I'm trying to write some data to Cassandra. A connection is established, and this is my database schema:
CREATE TYPE IF NOT EXISTS UDTVehicle ( // must exist before the table that references it
    num_vehicles INT,
    speed LIST<FLOAT>
);
CREATE TABLE IF NOT EXISTS road_traffic (
    road_id INT,
    timestamp TIMESTAMP,
    radar_id INT,
    vehicles MAP<INT, FROZEN<UDTVehicle>>,
    PRIMARY KEY (road_id, timestamp) // partition key and clustering key
);
And this is my Python code:
def write_to_cassandra(self, session, record):
    insert_query = """
        INSERT INTO road_traffic (road_id, timestamp, radar_id, vehicles)
        VALUES (?, ?, ?, ?);
    """
    prepared_insert = session.prepare(insert_query)
    batch = BatchStatement()
    batch_size = 0
    batches = 0
    print(record)
    if not all(record.get(field) is not None for field in list(record.keys())):
        logger.warning("Record is missing a required field")
        return
    else:
        vehicles_map = {record.get('road_id'): record.get('Vehicles')}
        print(record.get('road_id'))
        batch.add(prepared_insert, (record.get('road_id'), record.get('timestamp'), record.get('radar_id'), vehicles_map))
        batch_size += 1
As far as I understand, vehicles_map must map a key (which is the partition key) to a value (my UDTVehicle object). This is what I get when I print the record:
{'timestamp': datetime.datetime(2022, 1, 1, 8, 0, 30), 'num_Vehicles': 7, 'road_id': 9, 'radar_id': 11, 'Vehicles': {'num_Vehicles': 7, 'speed': [72.78, 62.67, 85.15, 75.51, 83.95, 76.39, 57.92]}}
The error I get is as follows:
Traceback (most recent call last):
File "cassandra_kafka/ingest_cassandra.py", line 135, in <module>
Data_ingest.consume_store()
File "cassandra_kafka/ingest_cassandra.py", line 113, in consume_store
self.write_to_cassandra(session, value)
File "cassandra_kafka/ingest_cassandra.py", line 66, in write_to_cassandra
batch.add(prepared_insert, (record.get('road_id'), record.get('timestamp'), record.get('radar_id'), vehicles_map))
File "cassandra/query.py", line 827, in cassandra.query.BatchStatement.add
File "cassandra/query.py", line 506, in cassandra.query.PreparedStatement.bind
File "cassandra/query.py", line 636, in cassandra.query.BoundStatement.bind
File "cassandra/cqltypes.py", line 799, in cassandra.cqltypes._ParameterizedType.serialize
File "cassandra/cqltypes.py", line 909, in cassandra.cqltypes.MapType.serialize_safe
File "cassandra/cqltypes.py", line 324, in cassandra.cqltypes._CassandraType.to_binary
File "cassandra/cqltypes.py", line 799, in cassandra.cqltypes._ParameterizedType.serialize
File "cassandra/cqltypes.py", line 1030, in cassandra.cqltypes.UserType.serialize_safe
KeyError: 0
What am I doing wrong here? Any help would be appreciated ^^.
The proper way to insert rows containing UDTs with the Python driver, when using prepared statements, is to define a simple class with the same structure as the UDT and use instances of it when building the values for the insert statement.
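A minimal sketch of that idea for this schema, assuming the record shape printed in the question. UdtVehicle and build_insert_args are hypothetical names, not part of the driver's API. As far as I can tell from the traceback (the KeyError: 0 is raised inside UserType.serialize_safe), the driver first tries to read UDT fields positionally (value[0], value[1], ...) and only falls back to looking them up as attributes by field name when the value is not subscriptable. A dict is subscriptable, so value[0] raises KeyError: 0. An object whose attribute names match the UDT fields avoids this; note that those names must be num_vehicles and speed, while the incoming record spells the count num_Vehicles.

```python
import datetime


class UdtVehicle:
    """Plain Python holder for the CQL type UDTVehicle (num_vehicles INT, speed LIST<FLOAT>).

    Attribute names must match the UDT field names: for objects that cannot be
    indexed, the driver looks the fields up by name.
    """

    def __init__(self, num_vehicles, speed):
        self.num_vehicles = num_vehicles
        self.speed = speed


def build_insert_args(record):
    """Turn one incoming record into the four bind values for
    INSERT INTO road_traffic (road_id, timestamp, radar_id, vehicles) VALUES (?, ?, ?, ?).
    """
    raw = record["Vehicles"]
    # The record spells the count 'num_Vehicles'; the UDT field is 'num_vehicles'.
    vehicle = UdtVehicle(num_vehicles=raw["num_Vehicles"], speed=list(raw["speed"]))
    # MAP<INT, FROZEN<UDTVehicle>>: keyed by road_id, as in the question's code.
    vehicles_map = {record["road_id"]: vehicle}
    return (record["road_id"], record["timestamp"], record["radar_id"], vehicles_map)


# The record printed in the question.
record = {
    "timestamp": datetime.datetime(2022, 1, 1, 8, 0, 30),
    "num_Vehicles": 7,
    "road_id": 9,
    "radar_id": 11,
    "Vehicles": {
        "num_Vehicles": 7,
        "speed": [72.78, 62.67, 85.15, 75.51, 83.95, 76.39, 57.92],
    },
}
args = build_insert_args(record)
```

With the driver, this tuple would replace the one built from the raw dict, e.g. batch.add(prepared_insert, build_insert_args(record)).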
I have prepared some simple code demonstrating what I mean: check the UdtVehicle class and how it is instantiated when building the arguments to the insert statement in the batch. The sample code goes on to demonstrate a few other things, depending on the command-line argument passed:
- read shows what you get when "just" reading the rows as they are,
- read_udt shows how you can register a UDT with your Cluster and have the returned rows nicely cast into your Python class,
- insert is a sanity check for a single-row (i.e. non-batch) prepared insert statement, with the UDT properly handled as explained above, and
- insertb exemplifies use of the previous case within a batch.

For more information on handling UDTs, please check this page: https://docs.datastax.com/en/developer/python-driver/latest/user_defined_types . Note that unprepared statements would need a slightly different approach (but you probably want to use prepared statements in a production application anyway).
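A minimal sketch of the insert and insertb cases, assuming a connected session from the DataStax driver and rows that are already 4-tuples of bind values (road_id, timestamp, radar_id, vehicles_map). TrafficWriter and its batch_factory parameter are hypothetical: in real code batch_factory would be cassandra.query.BatchStatement, and it is injected only so the logic can be exercised without a cluster. The sketch prepares the statement once, builds one batch per road_id partition, and executes every batch explicitly. For the read_udt case, the driver's cluster.register_user_type(keyspace, "udtvehicle", UdtVehicle) maps returned rows back to the class; the name is lower-case because unquoted CQL identifiers are case-folded.

```python
from collections import defaultdict

INSERT_CQL = (
    "INSERT INTO road_traffic (road_id, timestamp, radar_id, vehicles) "
    "VALUES (?, ?, ?, ?)"
)


class TrafficWriter:
    """Hypothetical helper around a driver Session.

    batch_factory is expected to be cassandra.query.BatchStatement in real code;
    it is a parameter here only so the class can be exercised without a cluster.
    """

    def __init__(self, session, batch_factory):
        self.session = session
        self.batch_factory = batch_factory
        # Prepare once and reuse, instead of calling session.prepare() per record.
        self.prepared_statement = session.prepare(INSERT_CQL)

    def insert(self, args):
        """Single-row (non-batch) insert of one (road_id, timestamp, radar_id, vehicles) tuple."""
        self.session.execute(self.prepared_statement, args)

    def insert_batches(self, rows):
        """Group rows by road_id (the partition key) and send one batch per partition."""
        by_partition = defaultdict(list)
        for args in rows:
            by_partition[args[0]].append(args)
        for partition_rows in by_partition.values():
            batch = self.batch_factory()
            for args in partition_rows:
                batch.add(self.prepared_statement, args)
            # Nothing is sent to Cassandra until the batch is executed.
            self.session.execute(batch)
```

With the real driver this would be created once at startup, e.g. TrafficWriter(session, BatchStatement), and then fed groups of rows. Keeping each batch to a single road_id is a deliberate choice that follows the batching remarks below.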
Looking at your code above, you might want to convert the vehicles into their UDT class right within the function you posted (depending on what exactly you are receiving through the Kafka stream). Just be aware that a batch will not be executed until you explicitly invoke session.execute(batch), which is not shown in your code. A couple of other remarks for your awareness:
- Prepare the insert statement only once (storing it in an attribute such as self.prepared_statement is a natural candidate) and then just reuse it, for greater performance.
- Your model does not enforce num_vehicles == len(speed). If that should be enforced, a different model would probably be better (but then again, this depends on your use case).
- If a batch spans several partitions (i.e. different road_id values), then a single batch should probably be avoided. Read more here: https://batey.info/cassandra-anti-pattern-cassandra-logged.html, https://www.batey.info/cassandra-anti-pattern-misuse-of.html

And now the sample code you can start from (tested on Cassandra 4.1):