Kafka Connect to Cassandra Mapping Problem


I am pushing Zeek logs to Kafka (3.1.0) topics, which works as intended. However, when I try to write them from Kafka to Cassandra (4.0.1) via the DataStax Apache Kafka® Connector (kafka-connect-cassandra-sink-1.4.0), I get a weird mapping error (see below).

My connect-standalone.properties:

bootstrap.servers=localhost:9092
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/kafka/plugins/kafka-connect-cassandra-sink-1.4.0.jar

My Cassandra connector properties cassandra-sink.properties:

name=cassandra-sink
connector.class=com.datastax.oss.kafka.sink.CassandraSinkConnector
tasks.max=1
topics=dns, http
#rename the Zeek field names so that no error occurs
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceFiel>
"transforms.RenameField.renames": "id.orig_h:id_orig_h,id.orig_p:id_orig_p,id.resp_h:id_resp_h,id.resp_p:id_resp_p,AA:aa,Z:z,RA:ra,TC:tc,TS:ts,RD:rd"
#Mapping
topic.dns.zeek.dns.mapping=  ts=value,uid=key,id_orig_h=value,id_orig_p=value,id_resp_h=value,id_resp_p=value,proto=value,trans_id=value,rtt=value,query=value,qclass=value,qclass_name=value,qtype=value,qtype_name=value,rcode=value,rcode_name=value,aa=value,tc=value,rd=value,ra=value,z=value,answers=value,rejected=value
topic.http.zeek.http.mapping= ts=value,uid=key,id_orig_h=value,id_orig_p=value,id_resp_h=value,id_resp_p=value,trans_depth=value,method=value,host=value,uri=value,version=value,user_agent=value,request_body_len=value,response_body_len=value,status_code=value,status_msg=value,tags=value,resp_fuids=value
topic.dns.zeek.dns.ttlTimeUnit=SECONDS
topic.http.zeek.http.ttlTimeUnit=SECONDS
topic.dns.zeek.dns.timestampTimeUnit=MICROSECONDS
topic.http.zeek.http.timestampTimeUnit=MICROSECONDS
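One thing worth checking in cassandra-sink.properties: the three transforms lines are written in JSON syntax ("key": "value",) inside a Java properties file. The sketch below is a simplified Python model of how java.util.Properties splits a line into key and value. It deliberately ignores escapes and backslash line continuations, so it is an approximation, not the real loader. Under those rules the key of the first transforms line comes out as "transforms" with the quote characters included, so Kafka Connect would not see a plain transforms setting.

```python
"""Simplified model of how java.util.Properties splits a line into key/value.

Assumption: escapes, unicode escapes and backslash line continuations are
ignored; only the key/separator rules are modelled.
"""


def split_property(line):
    """Return (key, value) for a properties line, or None for blank/comment lines."""
    stripped = line.lstrip(" \t\f")
    if not stripped or stripped[0] in "#!":
        return None
    # The key runs up to the first '=', ':' or whitespace character.
    end = 0
    while end < len(stripped) and stripped[end] not in "=: \t\f":
        end += 1
    key = stripped[:end]
    rest = stripped[end:].lstrip(" \t\f")
    # At most one '=' or ':' after the key is treated as the separator.
    if rest[:1] in ("=", ":"):
        rest = rest[1:].lstrip(" \t\f")
    return key, rest


print(split_property('tasks.max=1'))                    # ('tasks.max', '1')
print(split_property('"transforms": "RenameField",'))   # ('"transforms"', '"RenameField",')
print(split_property('transforms=RenameField'))         # ('transforms', 'RenameField')
```

In plain properties syntax the same setting would be written transforms=RenameField, with the other two transforms lines following the same key=value pattern.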

Cassandra DB:

cqlsh> DESCRIBE KEYSPACE zeek

CREATE KEYSPACE zeek WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

CREATE TABLE zeek.dns (
    uid text PRIMARY KEY,
    aa text,
    answers text,
    id_orig_h text,
    id_orig_p double,
    id_resp_h text,
    id_resp_p double,
    proto text,
    qclass int,
    qclass_name text,
    qtype int,
    qtype_name text,
    query text,
    ra text,
    rcode double,
    rcode_name text,
    rd text,
    rejected text,
    rtt int,
    tc text,
    trans_id double,
    ts text,
    ttls text,
    z double
) WITH additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND cdc = false
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND extensions = {}
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99p';

CREATE TABLE zeek.http (
    uid text PRIMARY KEY,
    host text,
    id_orig_h text,
    id_orig_p double,
    id_resp_h text,
    id_resp_p double,
    method text,
    orig_fuids text,
    request_body_len int,
    resp_fuids text,
    resp_mime_types text,
    response_body_len int,
    status_code int,
    status_msg text,
    tags text,
    trans_depth int,
    ts text,
    uri text,
    user_agent text,
    version text
) WITH additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND cdc = false
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND extensions = {}
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99p';
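Separately from the uid error, some zeek.dns column types do not line up with the JSON types in the sample dns record from the console-consumer output in this question: ts arrives as a JSON number but the column is text, and rtt is a fraction (0.0173...) while the column is int. The sketch below compares a few sample values with the declared column types. The NATURAL_TYPES table is a simplifying assumption of this sketch; whether the connector converts a given value anyway depends on its codec settings, so the output is a list of columns to double-check rather than a list of certain failures.

```python
"""Compare sample JSON values with the declared CQL column types of zeek.dns.

Assumption: NATURAL_TYPES is a simplified, hypothetical table of which Python
(JSON-decoded) types fit a CQL type without any conversion. The real connector
may still convert some of these values, depending on its codec settings.
"""

# A subset of the zeek.dns columns and their CQL types (from DESCRIBE KEYSPACE).
DNS_COLUMN_TYPES = {
    "uid": "text",
    "ts": "text",
    "rtt": "int",
    "qclass": "int",
    "z": "double",
    "aa": "text",
    "answers": "text",
}

# Values from the sample dns record (field names already renamed to column names).
SAMPLE = {
    "uid": "CUXnim2Q50AeUZoZXc",
    "ts": 1644391084.805351,
    "rtt": 0.01738905906677246,
    "qclass": 1,
    "z": 0,
    "aa": False,
    "answers": ["ec2-34-213-247-36.us-west-2.compute.amazonaws.com"],
}

NATURAL_TYPES = {
    "text": (str,),
    "int": (int,),
    "double": (float, int),
}


def type_mismatches(column_types, sample):
    """Return {column: (cql_type, python_type_name)} for values that do not fit naturally."""
    result = {}
    for column, cql_type in column_types.items():
        value = sample[column]
        # bool is a subclass of int in Python, so never let it count as a number here.
        fits = isinstance(value, NATURAL_TYPES[cql_type]) and not isinstance(value, bool)
        if not fits:
            result[column] = (cql_type, type(value).__name__)
    return result


for column, (cql_type, py_type) in sorted(type_mismatches(DNS_COLUMN_TYPES, SAMPLE).items()):
    print(f"{column}: column is {cql_type}, sample value is {py_type}")
```

For this sample the flagged columns are aa, answers, rtt and ts; rtt is the one where a conversion would lose information.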

An example Zeek log record that I am trying to write to Cassandra looks like this:

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --from-beginning --property print.key=true --max-messages 1 \
> --topic dns
null    {"dns": {"ts":1644391084.805351,"uid":"CUXnim2Q50AeUZoZXc","id.orig_h":"192.168.2.35","id.orig_p":55882,"id.resp_h":"192.168.2.1","id.resp_p":53,"proto":"udp","trans_id":8173,"rtt":0.01738905906677246,"query":"36.247.213.34.in-addr.arpa","qclass":1,"qclass_name":"C_INTERNET","qtype":12,"qtype_name":"PTR","rcode":0,"rcode_name":"NOERROR","AA":false,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["ec2-34-213-247-36.us-west-2.compute.amazonaws.com"],"TTLs":[300.0],"rejected":false}}
Processed a total of 1 messages
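The record value is not flat: every field sits inside a top-level "dns" object, and several names (id.orig_h, AA, TTLs, ...) differ from the zeek.dns column names. The sketch below uses Python's json module on the sample record above to unwrap the "dns" envelope, apply the same renames as transforms.RenameField.renames, and compare the resulting field names with the table's columns. It is only an offline check, not a Kafka Connect transform, and the helper names are hypothetical. My assumption (worth confirming in the Kafka documentation) is that ReplaceField renames only top-level fields, which for this record means only "dns".

```python
import json

# The sample record value printed by kafka-console-consumer.sh.
RAW_VALUE = (
    '{"dns": {"ts":1644391084.805351,"uid":"CUXnim2Q50AeUZoZXc",'
    '"id.orig_h":"192.168.2.35","id.orig_p":55882,"id.resp_h":"192.168.2.1",'
    '"id.resp_p":53,"proto":"udp","trans_id":8173,"rtt":0.01738905906677246,'
    '"query":"36.247.213.34.in-addr.arpa","qclass":1,"qclass_name":"C_INTERNET",'
    '"qtype":12,"qtype_name":"PTR","rcode":0,"rcode_name":"NOERROR","AA":false,'
    '"TC":false,"RD":true,"RA":true,"Z":0,'
    '"answers":["ec2-34-213-247-36.us-west-2.compute.amazonaws.com"],'
    '"TTLs":[300.0],"rejected":false}}'
)

# Same value as transforms.RenameField.renames in cassandra-sink.properties.
RENAMES_SPEC = ("id.orig_h:id_orig_h,id.orig_p:id_orig_p,id.resp_h:id_resp_h,"
                "id.resp_p:id_resp_p,AA:aa,Z:z,RA:ra,TC:tc,TS:ts,RD:rd")

# Column names of zeek.dns from DESCRIBE KEYSPACE zeek.
DNS_COLUMNS = {
    "uid", "aa", "answers", "id_orig_h", "id_orig_p", "id_resp_h", "id_resp_p",
    "proto", "qclass", "qclass_name", "qtype", "qtype_name", "query", "ra",
    "rcode", "rcode_name", "rd", "rejected", "rtt", "tc", "trans_id", "ts",
    "ttls", "z",
}


def parse_renames(spec):
    """Turn 'old:new,old2:new2' into {'old': 'new', 'old2': 'new2'} (hypothetical helper)."""
    return dict(pair.split(":", 1) for pair in spec.split(","))


def unwrap_and_rename(value, envelope, renames):
    """Take the nested object under `envelope` and rename its keys (hypothetical helper)."""
    inner = value[envelope]
    return {renames.get(name, name): field for name, field in inner.items()}


value = json.loads(RAW_VALUE)
print("top-level fields:", list(value))        # ['dns']

flat = unwrap_and_rename(value, "dns", parse_renames(RENAMES_SPEC))
print("uid:", flat["uid"])
print("fields without a column:", sorted(set(flat) - DNS_COLUMNS))
print("columns without a field:", sorted(DNS_COLUMNS - set(flat)))
```

After unwrapping, only TTLs and ttls fail to line up, because TTLs is not in the rename list; the mapping in cassandra-sink.properties does not reference ttls either.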

With Zeek, ZooKeeper, Kafka, and Cassandra running, I start Kafka Connect with:

bin/connect-standalone.sh config/connect-standalone.properties config/cassandra-sink.properties

This runs without interruption but logs countless warnings like this one:

[2022-02-09 13:43:50,615] WARN [cassandra-sink|task-0] Error decoding/mapping Kafka record SinkRecord{kafkaOffset=301, timestampType=CreateTime} ConnectRecord{topic='dns', kafkaPartition=0, key=null, keySchema=Schema{STRING}, value={dns={AA=false, qclass_name=C_INTERNET, id.orig_p=22793, qtype_name=AAAA, qtype=28, rejected=false, id.resp_p=53, query=connectivity-check.ubuntu.com.sphairon.box, trans_id=17766, rcode=3, rcode_name=NXDOMAIN, TC=false, RA=false, uid=Ckb97wAbf0MDgGVW7, RD=true, proto=udp, id.orig_h=192.168.2.35, Z=0, qclass=1, ts=1.644393059147673E9, id.resp_h=192.168.2.1}}, valueSchema=null, timestamp=1644393059667, headers=ConnectHeaders(headers=)}: Primary key column uid cannot be mapped to null. Check that your mapping setting matches your dataset contents. (com.datastax.oss.kafka.sink.CassandraSinkTask:305)

I don't know why it says "Primary key column uid cannot be mapped to null". I tried changing the mapping to ts=dns.value, uid=dns.value, etc., but it didn't help.
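The warning shows key=null, and the mapping contains uid=key, which binds the primary key column to the Kafka record key rather than to the uid field inside the value. The toy model below illustrates that reading. parse_mapping and resolve are hypothetical helpers, not the DataStax connector's code; the assumptions are that key refers to the whole record key, value to the whole record value, and value.dns.uid to a nested field. How the connector handles nested fields, and field names that themselves contain dots such as id.orig_h, is worth confirming in its mapping documentation.

```python
"""Toy model of resolving a mapping like 'uid=key,qtype=value' for one record.

Hypothetical helpers for illustration only; this is not the DataStax
connector's implementation. Assumptions: 'key' is the whole record key,
'value' is the whole record value, and 'value.a.b' is a nested field lookup.
"""
from typing import Any, Dict, Optional


def parse_mapping(spec: str) -> Dict[str, str]:
    """Split 'col1=expr1,col2=expr2' into {col: expr}."""
    mapping = {}
    for pair in spec.split(","):
        column, expr = pair.split("=", 1)
        mapping[column.strip()] = expr.strip()
    return mapping


def resolve(expr: str, key: Any, value: Any) -> Optional[Any]:
    """Resolve one mapping expression against a record; None if a field is missing."""
    root, *path = expr.split(".")
    if root == "key":
        current = key
    elif root == "value":
        current = value
    else:
        raise ValueError(f"expression must start with key or value: {expr!r}")
    for part in path:
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


# Values from the warning above: the key is null and the fields sit under "dns".
record_key = None
record_value = {"dns": {"uid": "Ckb97wAbf0MDgGVW7", "qtype": 28}}

original = parse_mapping("uid=key,qtype=value")
print(resolve(original["uid"], record_key, record_value))    # None
print(resolve(original["qtype"], record_key, record_value))  # the whole value dict

nested = parse_mapping("uid=value.dns.uid,qtype=value.dns.qtype")
print(resolve(nested["uid"], record_key, record_value))      # Ckb97wAbf0MDgGVW7

try:
    resolve("dns.value", record_key, record_value)
except ValueError as err:
    print(err)
```

In this model, uid=key yields None because the key is null, and uid=dns.value is rejected because the expression does not start with key or value; value.dns.uid is the form that reaches the nested field.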
