Neo4j Source Connector Fails to Build the Schema When the Source Query Returns Null for Some Fields


I am using the Neo4j Source Connector for Kafka (https://neo4j.com/docs/kafka/kafka-connect/source/) to publish changes to a Kafka topic. My config looks like this:

{
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "neo4j.authentication.basic.password": "*****",
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.source.query": "MATCH (c:Customer) WHERE c.timestamp > $lastCheck RETURN c.name as name, c.age as age, c.timestamp as timestamp",
    "neo4j.enforce.schema": "true",
    "name": "neo4j-source-connector",
    "neo4j.authentication.basic.username": "neo4j",
    "topic": "neo4j-test-AVRO",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "neo4j.streaming.poll.interval.msecs": "5000",
    "neo4j.streaming.from": "LAST_COMMITTED"
}

Whenever I insert a node into Neo4j with the following Cypher:

CREATE (c1:Customer {name: 'Test',age:null, timestamp: timestamp()})

I get an error when the connector tries to build the schema.

Any help will be appreciated. Thanks

I also tried the JSON Schema converter (io.confluent.connect.json.JsonSchemaConverter) and got the same error. Things work with the String converter (org.apache.kafka.connect.storage.StringConverter), though.
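One detail relevant to the CREATE statement above: Neo4j does not store properties whose value is null, so that node is created without any age property at all, and c.age evaluates to null in the source query. A quick way to check what was actually stored, as a sketch that assumes the example node is the only Customer named 'Test':

```cypher
// Inspect which properties were actually stored on the example node.
// Assumes name 'Test' identifies the node created in the question.
MATCH (c:Customer {name: 'Test'})
RETURN keys(c) AS storedProperties, c.age IS NULL AS ageIsNull
```

The storedProperties list should not contain age, and ageIsNull should be true, which is the null value the connector then sees in the age column.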

Answer by cybersam

Depending on your needs, here are two possible workarounds, both achieved by changing "neo4j.source.query":

  1. Filter out Customer nodes with no age property:

    MATCH (c:Customer) WHERE c.age IS NOT NULL AND c.timestamp > $lastCheck RETURN c.name AS name, c.age AS age, c.timestamp AS timestamp
    
  2. Use a sentinel value (e.g., -1) instead when age is NULL:

    MATCH (c:Customer) WHERE c.timestamp > $lastCheck RETURN c.name AS name, COALESCE(c.age, -1) AS age, c.timestamp AS timestamp
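For reference, here is a sketch of workaround 2 plugged into the connector config from the question. It assumes every other setting stays exactly as posted; only "neo4j.source.query" changes (the password remains masked):

```json
{
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "neo4j.authentication.basic.password": "*****",
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.source.query": "MATCH (c:Customer) WHERE c.timestamp > $lastCheck RETURN c.name AS name, COALESCE(c.age, -1) AS age, c.timestamp AS timestamp",
    "neo4j.enforce.schema": "true",
    "name": "neo4j-source-connector",
    "neo4j.authentication.basic.username": "neo4j",
    "topic": "neo4j-test-AVRO",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "neo4j.streaming.poll.interval.msecs": "5000",
    "neo4j.streaming.from": "LAST_COMMITTED"
}
```

With COALESCE, the age column always carries an integer, so consumers must treat -1 as "unknown" rather than as a real age.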