I am using the Neo4j Source Connector for Kafka (https://neo4j.com/docs/kafka/kafka-connect/source/) to publish changes to a Kafka topic. My config is as follows:
{
  "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
  "neo4j.authentication.basic.password": "*****",
  "neo4j.server.uri": "bolt://neo4j:7687",
  "neo4j.source.query": "MATCH (c:Customer) WHERE c.timestamp > $lastCheck RETURN c.name as name, c.age as age, c.timestamp as timestamp",
  "neo4j.enforce.schema": "true",
  "name": "neo4j-source-connector",
  "neo4j.authentication.basic.username": "neo4j",
  "topic": "neo4j-test-AVRO",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "neo4j.streaming.poll.interval.msecs": "5000",
  "neo4j.streaming.from": "LAST_COMMITTED"
}
I get an error whenever I insert a node into Neo4j with the following statement:
CREATE (c1:Customer {name: 'Test', age: null, timestamp: timestamp()})
Any help will be appreciated. Thanks
I also tried the JSON Schema converter (io.confluent.connect.json.JsonSchemaConverter) but got the same error. Things work with the String converter (org.apache.kafka.connect.storage.StringConverter), though.

Depending on your needs, here are some possible workarounds achievable by changing "neo4j.source.query":
Filter out Customer nodes with no age property.
Use a special value (e.g., -1) instead if age is NULL.
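
As a minimal sketch, assuming the rest of the query from the question stays unchanged, the two workarounds might look like this in Cypher. The first adds an IS NOT NULL check to the WHERE clause; the second uses the built-in coalesce() function to substitute -1. Which one fits depends on whether downstream consumers can tolerate a sentinel value for age.

```cypher
// Workaround 1: skip Customer nodes that have no age property.
MATCH (c:Customer)
WHERE c.timestamp > $lastCheck AND c.age IS NOT NULL
RETURN c.name AS name, c.age AS age, c.timestamp AS timestamp

// Workaround 2: return -1 in place of a missing (NULL) age.
MATCH (c:Customer)
WHERE c.timestamp > $lastCheck
RETURN c.name AS name, coalesce(c.age, -1) AS age, c.timestamp AS timestamp
```

Either query would go into "neo4j.source.query" as a single-line string, since that setting takes one statement.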