I have the most straightforward of use cases for the Kafka Streams DSL: read in CSV sensor data, group it by timestamp, and write the result out. The following code does not compile:
public static void main(String[] args) {
    StreamsConfig streamingConfig = new StreamsConfig(getProperties());

    Serde<String> stringSerde = Serdes.String();

    CSVDeserializer<SensorData> sensorDataDeserializer = new CSVDeserializer<>(SensorData.class);
    JsonSerializer<SensorData> sensorDataSerializer = new JsonSerializer<>();
    Serde sensorDataSerde = Serdes.serdeFrom(sensorDataSerializer, sensorDataDeserializer);

    JsonDeserializer<SensorData> sensorDataJsonDeserializer = new JsonDeserializer<>(SensorData.class);
    Serde sensorDataJSONSerde = Serdes.serdeFrom(sensorDataSerializer, sensorDataJsonDeserializer);

    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();
    WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
    WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
    Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

    JsonSerializer<SensorDataAccumulator> accSerializer = new JsonSerializer<>();
    JsonDeserializer accDeserializer = new JsonDeserializer<>(SensorDataAccumulator.class);
    Serde<SensorDataAccumulator> accSerde = Serdes.serdeFrom(accSerializer, accDeserializer);

    KStreamBuilder kStreamBuilder = new KStreamBuilder();

    KStream<String, SensorData> initialStream = kStreamBuilder.stream(stringSerde, sensorDataSerde, "e40_orig");

    final KStream<String, SensorData> sensorDataKStream = initialStream
            .filter((k, v) -> (v != null))
            .map((k, v) -> new KeyValue<>(v.getMeasurementDateTime().toString(), v));

    sensorDataKStream
            .filter((k, v) -> (v != null))
            .groupBy((k, v) -> k, stringSerde, sensorDataJSONSerde)
            .aggregate(SensorDataAccumulator::new,
                    (k, v, list) -> list.add(v), // ==> error; casting "fixes" it: ((SensorDataAccumulator) list).add((SensorData) v)
                    TimeWindows.of(10000),
                    accSerde, "acc")
            .to(windowedSerde, accSerde, "out");

    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamingConfig);
    kafkaStreams.start();
}
due to
Error:(90, 45) java: cannot find symbol
  symbol:   method add(java.lang.Object)
  location: variable list of type java.lang.Object
Weird.
public class SensorDataAccumulator {

    ArrayList list = new ArrayList<SensorData>();

    public SensorDataAccumulator add(SensorData s) {
        list.add(s);
        return this;
    }
}
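The add method clearly exists, so the compiler must be resolving the lambda against something else. Writing it out as an anonymous class is my best guess at what it sees (my reconstruction, not code from the project): the Aggregator's type parameters are apparently gone, leaving every parameter a plain Object.

// Reconstruction of what javac seems to be checking the lambda against:
// a raw Aggregator, whose apply() parameters all erase to Object.
new Aggregator() {
    @Override
    public Object apply(Object key, Object value, Object aggregate) {
        // With the generics erased, aggregate is just an Object,
        // hence "cannot find symbol: method add(java.lang.Object)".
        return aggregate.add(value);
    }
};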
Casting as commented leads to the following runtime exception (right before outputting the windowed accumulation).
[2017-01-02 13:00:45,614] INFO task [1_0] Initializing processor nodes of the topology (org.apache.kafka.streams.processor.internals.StreamTask:123)
[2017-01-02 13:01:04,173] WARN Error while fetching metadata with correlation id 779 : {out=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:600)
[2017-01-02 13:01:04,662] INFO stream-thread [StreamThread-1] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:268)
[2017-01-02 13:01:04,663] INFO stream-thread [StreamThread-1] Committing consumer offsets of task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:358)
[2017-01-02 13:01:04,666] INFO stream-thread [StreamThread-1] Committing consumer offsets of task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:358)
[2017-01-02 13:01:04,668] INFO stream-thread [StreamThread-1] Closing a task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:751)
[2017-01-02 13:01:04,668] INFO stream-thread [StreamThread-1] Closing a task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:751)
[2017-01-02 13:01:04,668] INFO stream-thread [StreamThread-1] Flushing state stores of task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:368)
[2017-01-02 13:01:04,669] INFO stream-thread [StreamThread-1] Flushing state stores of task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:368)
Exception in thread "StreamThread-1" java.lang.NoSuchMethodError: org.rocksdb.RocksIterator.close()V
at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.close(RocksDBStore.java:468)
at org.apache.kafka.streams.state.internals.RocksDBStore.closeOpenIterators(RocksDBStore.java:411)
at org.apache.kafka.streams.state.internals.RocksDBStore.close(RocksDBStore.java:397)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.close(RocksDBWindowStore.java:276)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.close(MeteredWindowStore.java:109)
at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:125)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:349)
at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:120)
at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:348)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:328)
at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:344)
at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:305)
at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:269)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:252)
[2017-01-02 13:01:05,316] INFO stream-thread [StreamThread-1] Closing the state manager of task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:347)
[2017-01-02 13:01:05,316] INFO stream-thread [StreamThread-1] Closing the state manager of task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:347)
Debugging the add method of SensorDataAccumulator gives a clue. So, if I understand correctly: I declare ArrayList list = new ArrayList<SensorData>(), but somewhere along the way its members are changed to LinkedTreeMap instances. The typechecker lost me here...
Ok, the LinkedTreeMap is the underlying data structure Gson uses in my JsonDeserializer and JsonSerializer classes, so I'll add those further down for completeness.
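First, a quick standalone round trip outside Kafka seems to confirm what the debugger shows (my sketch, not code from the app; it assumes SensorData has a no-arg constructor):

import com.google.gson.Gson;

public class GsonRoundTrip {
    public static void main(String[] args) {
        // Because SensorDataAccumulator.list is declared as a raw ArrayList,
        // Gson has no element type to target on the way back in and rebuilds
        // each entry as a com.google.gson.internal.LinkedTreeMap.
        Gson gson = new Gson();
        SensorDataAccumulator acc = new SensorDataAccumulator()
                .add(new SensorData()); // assuming a no-arg constructor
        String json = gson.toJson(acc);
        SensorDataAccumulator back = gson.fromJson(json, SensorDataAccumulator.class);
        System.out.println(back.list.get(0).getClass()); // class com.google.gson.internal.LinkedTreeMap
    }
}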
Currently I'm not sure what I'm doing wrong or where to fix it. Should I use different serializers? Different data structures? A different language ;)?
Any input is welcome.
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.Charset;
import java.util.Map;

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String topic, T t) {
        return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {
    }
}
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    public JsonDeserializer() {
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {
        if (deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        return gson.fromJson(new String(bytes), deserializedClass);
    }

    @Override
    public void close() {
    }
}
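For what it's worth, round-tripping a single SensorData through this serde pair in isolation works, since the target class is passed in explicitly; it only goes wrong for the elements of the accumulator's raw inner list (again a sketch assuming a no-arg constructor):

// Isolated check of the serializer/deserializer pair: a top-level SensorData
// survives the round trip because deserializedClass is SensorData.class,
// unlike the untyped elements inside SensorDataAccumulator.list.
JsonSerializer<SensorData> ser = new JsonSerializer<>();
JsonDeserializer<SensorData> de = new JsonDeserializer<>(SensorData.class);
byte[] bytes = ser.serialize("e40_orig", new SensorData()); // assuming a no-arg constructor
SensorData back = de.deserialize("e40_orig", bytes);        // comes back as a real SensorData
System.out.println(back.getClass());                        // class SensorData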