stateStore.get() returns inconsistent results when used from transform() on a KStream: it returns null even though the corresponding key-value pair has been put() into the store. Can someone explain this behavior of KeyValueStore<>?
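import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

@Component
public class StreamProcessor {

    @StreamListener
    public void process(@Input(KStreamBindings.INPUT_STREAM) KStream<String, JsonNode> inputStream) {
        KStream<String, JsonNode> joinedEvents = inputStream
                // computeKey(...) is defined elsewhere (not shown)
                .selectKey((key, value) -> computeKey(value))
                .transform(
                        () -> new SelfJoinTransformer((v1, v2) -> join(v1, v2), "join_store"),
                        "join_store"
                );
        joinedEvents
                .foreach((key, value) -> System.out.format("%s,joined=%b\n", key, value.has("right")));
    }

    private JsonNode join(JsonNode left, JsonNode right) {
        ((ObjectNode) left).set("right", right);
        return left;
    }
}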
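import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class SelfJoinTransformer implements Transformer<String, JsonNode, KeyValue<String, JsonNode>> {
    private KeyValueStore<String, JsonNode> stateStore;
    private final ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner;
    private final String storeName;

    public SelfJoinTransformer(ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner, String storeName) {
        this.storeName = storeName;
        this.valueJoiner = valueJoiner;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.stateStore = (KeyValueStore<String, JsonNode>) context.getStateStore(storeName);
    }

    @Override
    public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
        JsonNode oldValue = stateStore.get(key);
        if (oldValue != null) { // this condition rarely holds true
            stateStore.delete(key);
            System.out.format("%s,joined\n", key);
            return KeyValue.pair(key, valueJoiner.apply(oldValue, value));
        }
        // No partner yet: buffer this value until the next record with the same key arrives
        stateStore.put(key, value);
        return null; // emit nothing downstream for this record
    }

    @Override
    public void close() {
        // Required by the Transformer interface; the store itself is managed by Kafka Streams
    }
}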
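Stripped of Kafka, the buffering logic of SelfJoinTransformer reduces to a map lookup. The plain-Java sketch below is a hypothetical model, not Kafka code: SelfJoinModel and the string-concatenating joiner stand in for the transformer and the JsonNode join, and a single HashMap plays the role of the KeyValueStore. With one shared store, the second record for a key always finds the first, which is the behavior the code above expects:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

public class SelfJoinModel {
    // Hypothetical stand-in for join_store: ONE map shared by every record
    private final Map<String, String> store = new HashMap<>();
    private final BinaryOperator<String> joiner;

    public SelfJoinModel(BinaryOperator<String> joiner) {
        this.joiner = joiner;
    }

    // Mirrors SelfJoinTransformer.transform(): returns the joined value,
    // or null when the record is only buffered
    public String transform(String key, String value) {
        String oldValue = store.remove(key); // get + delete in one step
        if (oldValue != null) {
            return joiner.apply(oldValue, value);
        }
        store.put(key, value);
        return null;
    }

    public static void main(String[] args) {
        SelfJoinModel model = new SelfJoinModel((left, right) -> left + "+" + right);
        System.out.println(model.transform("k", "v1")); // null: v1 is buffered
        System.out.println(model.transform("k", "v2")); // v1+v2: joined
    }
}
```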
The reason messages seem to disappear (assuming the punctuator doesn't remove them) is that you use KStream::selectKey(...): it changes the key but doesn't repartition the stream, so you may be looking up the key in the wrong partition.
Look at the following scenario. Assume the two messages are in different partitions (because of their original keys):
k1,v1 (partition0)
k2,v2 (partition1)
After selectKey, both keys map to k:
k1 -> k, k2 -> k
k,v1 (still in partition0)
k,v2 (still in partition1)
The selectKey operation is stateless, so it does not write the re-keyed records to a downstream (repartition) topic, and no repartitioning happens before transform(). For the first message, the value for key k is put into partition0's instance of the store. When the second message arrives, there is no value for key k, because it is processed by a different partition (partition1), which has its own store instance.
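To make the scenario concrete, here is a plain-Java toy model (no Kafka involved; the class PartitionedSelfJoin, the two-partition layout, the routeByNewKey flag, and the hashCode-based partitioner are all hypothetical). Each partition owns its own map, just as each stream task owns its own instance of join_store. Without repartitioning, a record stays in the partition it was consumed from, so k,v1 and k,v2 hit different stores and never join; routing by the new key, which is what a repartition topic achieves, sends both to the same store:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionedSelfJoin {
    private final int numPartitions;
    private final boolean routeByNewKey;
    // One store per partition, like one join_store instance per stream task
    private final List<Map<String, String>> stores = new ArrayList<>();

    public PartitionedSelfJoin(int numPartitions, boolean routeByNewKey) {
        this.numPartitions = numPartitions;
        this.routeByNewKey = routeByNewKey;
        for (int i = 0; i < numPartitions; i++) {
            stores.add(new HashMap<>());
        }
    }

    // Toy partitioner (assumption); Kafka's default partitioner uses murmur2 on the serialized key
    private int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // sourcePartition: where the record was consumed from (decided by its ORIGINAL key)
    // newKey: the key produced by selectKey()
    public String process(int sourcePartition, String newKey, String value) {
        int partition = routeByNewKey ? partitionFor(newKey) : sourcePartition;
        Map<String, String> store = stores.get(partition);
        String oldValue = store.remove(newKey);
        if (oldValue != null) {
            return oldValue + "+" + value;
        }
        store.put(newKey, value);
        return null;
    }

    public static void main(String[] args) {
        // k1,v1 came from partition0 and k2,v2 from partition1; selectKey maps both keys to k
        PartitionedSelfJoin noRepartition = new PartitionedSelfJoin(2, false);
        noRepartition.process(0, "k", "v1");
        System.out.println(noRepartition.process(1, "k", "v2")); // null: v1 sits in partition0's store

        PartitionedSelfJoin withRepartition = new PartitionedSelfJoin(2, true);
        withRepartition.process(0, "k", "v1");
        System.out.println(withRepartition.process(1, "k", "v2")); // v1+v2
    }
}
```

In the real topology, the corresponding fix would be to force a repartition between selectKey() and transform(), for example by writing the re-keyed stream through an intermediate topic with KStream#through() (or KStream#repartition() in Kafka Streams 2.6 and later), so that all records with key k are processed by the same task and its store instance.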