KeyValueStore.get() returns inconsistent results

stateStore.get() returns inconsistent results when called from transform() on a KStream. It returns null even though the corresponding key-value pair has been put() into the store.

Can someone explain this behavior of KeyValueStore<>?

@Component
public class StreamProcessor {

    @StreamListener
    public void process(@Input(KStreamBindings.INPUT_STREAM) KStream<String, JsonNode> inputStream) {
        KStream<String, JsonNode> joinedEvents = inputStream
            .selectKey((key, value) -> computeKey(value))
            .transform(
                () -> new SelfJoinTransformer((v1, v2) -> join(v1, v2), "join_store"),
                "join_store"
            );

        joinedEvents
            .foreach((key, value) -> System.out.format("%s,joined=%b\n", key, value.has("right")));
    }

    private JsonNode join(JsonNode left, JsonNode right) {
        ((ObjectNode) left).set("right", right);
        return left;
    }
}

public class SelfJoinTransformer implements Transformer<String, JsonNode, KeyValue<String, JsonNode>> {
  private KeyValueStore<String, JsonNode> stateStore;
  private ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner;
  private String storeName;

  public SelfJoinTransformer(ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner, String storeName) {
    this.storeName = storeName;
    this.valueJoiner = valueJoiner;
  }

  @Override
  public void init(ProcessorContext context) {
    this.stateStore = (KeyValueStore<String, JsonNode>) context.getStateStore(storeName);
  }

  @Override
  public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
    JsonNode oldValue = stateStore.get(key);
    if (oldValue != null) { //this condition rarely holds true
        stateStore.delete(key);
        System.out.format("%s,joined\n", key);
        return KeyValue.pair(key, valueJoiner.apply(oldValue, value));
    }
    stateStore.put(key, value);
    return null;
  }
}

Bartosz Wardziński On BEST ANSWER

The reason messages seem to disappear (assuming no punctuator removes them) is that you use KStream::selectKey(...), which changes the key but does not repartition the data. As a result, you may be looking up the key in the wrong partition's store.

Consider the following scenario:

  • Msg1: k1, v1 (partition0)
  • Msg2: k2, v2 (partition1)

Assume the messages land in different partitions (because of their keys). After selectKey, k1 -> k and k2 -> k:

  • Msg1: k, v1
  • Msg2: k, v2

selectKey is a stateless operation, so the messages are not sent to a downstream topic and no repartitioning happens. For the first message, the value is put into the store under key k (partition0). When the second message arrives, there is no entry for key k, because it is processed against a different partition's store (partition1).
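
To make the scenario concrete, here is a minimal plain-Java simulation with no Kafka dependency. It is only a sketch: PartitionedStoreDemo, Rec, partitionFor and selfJoin are hypothetical names invented for illustration, one HashMap per partition stands in for the per-partition state store, and String.hashCode() stands in for the hash Kafka's default partitioner applies to the serialized key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation only: models one state store per partition, as each stream task sees it.
final class PartitionedStoreDemo {

    // A record after selectKey: the partition it was read from, its NEW key, and its value.
    static final class Rec {
        final int partition;
        final String key;
        final String value;

        Rec(int partition, String key, String value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }
    }

    // Assumption: a simple stand-in for the default partitioner (not Kafka's real hash).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Mirrors the logic of SelfJoinTransformer.transform(): join if the key is
    // already stored, otherwise remember the value. With repartition == false the
    // record stays on the partition it arrived on, even though its key changed.
    static List<String> selfJoin(List<Rec> records, int numPartitions, boolean repartition) {
        List<Map<String, String>> stores = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            stores.add(new HashMap<>());
        }
        List<String> joined = new ArrayList<>();
        for (Rec r : records) {
            int p = repartition ? partitionFor(r.key, numPartitions) : r.partition;
            Map<String, String> store = stores.get(p);
            String oldValue = store.get(r.key);
            if (oldValue != null) {
                store.remove(r.key);
                joined.add(oldValue + "+" + r.value);
            } else {
                store.put(r.key, r.value);
            }
        }
        return joined;
    }

    // Msg1 arrived on partition0 and Msg2 on partition1; selectKey mapped both keys to "k".
    static List<Rec> scenario() {
        return Arrays.asList(new Rec(0, "k", "v1"), new Rec(1, "k", "v2"));
    }

    public static void main(String[] args) {
        System.out.println(selfJoin(scenario(), 2, false)); // prints []
        System.out.println(selfJoin(scenario(), 2, true));  // prints [v1+v2]
    }
}
```

Without repartitioning, each message only ever sees its own partition's store, so the join never fires. In Kafka Streams itself, the counterpart of the repartition flag would be to send the re-keyed stream through an intermediate topic between selectKey() and transform(), for example with KStream#through(topic) on older releases or KStream#repartition() from 2.6 on; which of these applies depends on the version in use.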