Unable to parse schemas received from Schema Registry while tracking Oracle database changes


I am using Confluent and kafka-connect-oracle (https://github.com/erdemcer/kafka-connect-oracle) to track changes in an Oracle Database 11g XE instance. I can see the schema content through the Schema Registry API, for example with "curl -X GET http://localhost:8081/schemas/ids/44":

{"subject":"TEST.KAFKAUSER.TEST-value","version":1,"id":44,"schema":"{"type":"record","name":"row","namespace":"test.kafkauser.test","fields":[{"name":"SCN","type":"long"},{"name":"SEG_OWNER","type":"string"},{"name":"TABLE_NAME","type":"string"},{"name":"TIMESTAMP","type":{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}},{"name":"SQL_REDO","type":"string"},{"name":"OPERATION","type":"string"},{"name":"data","type":["null",{"type":"record","name":"value","namespace":"","fields":[{"name":"ID","type":["null","double"],"default":null},{"name":"NAME","type":["null","string"],"default":null}],"connect.name":"value"}],"default":null},{"name":"before","type":["null","value"],"default":null}],"connect.name":"test.kafkauser.test.row"}","deleted":false}

However, this schema cannot be parsed by Confluent's Schema Registry client in Python:

from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer

schemaRegistryClientURL = "http://localhost:8081"
schema_registry_client = CachedSchemaRegistryClient(url=schemaRegistryClientURL)
schema_registry_client.get_by_id(44)

I get the following error:

Traceback (most recent call last):
  File "", line 1, in
  File "build/bdist.linux-x86_64/egg/confluent/schemaregistry/client/CachedSchemaRegistryClient.py", line 140, in get_by_id
confluent.schemaregistry.client.ClientError: Received bad schema from registry.

Does kafka-connect-oracle send an invalid schema to the Schema Registry? How can I get this schema into the proper format?

Thanks.

Answer by Bünyamin Şentürk:

It looks like the problem is in your schema: a JSON formatter reports the output as invalid. You can check whether your JSON is well formed here: https://jsonformatter.curiousconcept.com/#
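The same check can be done locally with Python's standard json module instead of a web formatter. This is only a minimal sketch: check_json and sample are hypothetical names, and sample is a shortened stand-in for the registry output with the same kind of stray quotes, not the full schema.

```python
import json


def check_json(text):
    """Return (True, None) if text parses as JSON, else (False, parser message)."""
    try:
        json.loads(text)
        return True, None
    except ValueError as exc:  # json.JSONDecodeError is a subclass of ValueError
        return False, str(exc)


# Shortened, hypothetical stand-in for the output shown in the question.
sample = '{"id":44,"schema":"{"type":"record","name":"row"}","deleted":false}'

ok, error = check_json(sample)
print(ok, error)
```

The parser message includes the position where parsing stopped, which points at the first offending quote.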

Looking at it, I see two extra quotation marks:

The first one is near the start, right after "schema":

The second one is near the end, between test.row"} and ,"deleted":false}

After deleting these two, the JSON is valid. If you are asking for a way to do this automatically, I don't know of one. Maybe you can search for some Python code that validates and fixes JSON.
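As a rough sketch of doing this automatically, the two quotes can be removed with plain string replacement and the result checked with json.loads. It assumes the text looks exactly like the output pasted in the question (inner quotes not escaped); repair_registry_text and sample are hypothetical names, and sample is a shortened stand-in for the real schema. If the raw HTTP response actually carries the schema as a backslash-escaped string (the escapes may have been lost when pasting), parsing the "schema" field a second time with json.loads would be the usual route instead.

```python
import json


def repair_registry_text(text):
    """Drop the quote after "schema": and the one before ,"deleted", then parse."""
    text = text.replace('"schema":"{', '"schema":{', 1)
    text = text.replace('}","deleted"', '},"deleted"', 1)
    return json.loads(text)  # raises ValueError if the text is still not valid JSON


# Shortened, hypothetical stand-in for the output shown in the question.
sample = ('{"subject":"TEST.KAFKAUSER.TEST-value","version":1,"id":44,'
          '"schema":"{"type":"record","name":"row","fields":[]}",'
          '"deleted":false}')

fixed = repair_registry_text(sample)
print(fixed["schema"]["name"])
```

The replacement is deliberately limited to the first match of each pattern, so quotes inside the schema itself are left alone.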

This is the valid format:

{
   "subject":"TEST.KAFKAUSER.TEST-value",
   "version":1,
   "id":44,
   "schema":{
      "type":"record",
      "name":"row",
      "namespace":"test.kafkauser.test",
      "fields":[
         {
            "name":"SCN",
            "type":"long"
         },
         {
            "name":"SEG_OWNER",
            "type":"string"
         },
         {
            "name":"TABLE_NAME",
            "type":"string"
         },
         {
            "name":"TIMESTAMP",
            "type":{
               "type":"long",
               "connect.version":1,
               "connect.name":"org.apache.kafka.connect.data.Timestamp",
               "logicalType":"timestamp-millis"
            }
         },
         {
            "name":"SQL_REDO",
            "type":"string"
         },
         {
            "name":"OPERATION",
            "type":"string"
         },
         {
            "name":"data",
            "type":[
               "null",
               {
                  "type":"record",
                  "name":"value",
                  "namespace":"",
                  "fields":[
                     {
                        "name":"ID",
                        "type":[
                           "null",
                           "double"
                        ],
                        "default":null
                     },
                     {
                        "name":"NAME",
                        "type":[
                           "null",
                           "string"
                        ],
                        "default":null
                     }
                  ],
                  "connect.name":"value"
               }
            ],
            "default":null
         },
         {
            "name":"before",
            "type":[
               "null",
               "value"
            ],
            "default":null
         }
      ],
      "connect.name":"test.kafkauser.test.row"
   },
   "deleted":false
}