stateStore.get() returns inconsistent results, when used from transform() on KStream. It returns null, even though corresponding key-value has been put() into the store.
Can someone explain this behavior of KeyValueStore<>?
@Component
public class StreamProcessor {
    @StreamListener
    public void process(@Input(KStreamBindings.INPUT_STREAM) KStream<String, JsonNode> inputStream) {
        KStream<String, JsonNode> joinedEvents = inputStream
           .selectKey((key, value) -> computeKey(value))
           .transform(
               () -> new SelfJoinTransformer((v1, v2) -> join(v1, v2), "join_store"),
               "join_store"
            );
        
        joinedEvents
               .foreach((key, value) -> System.out.format("%s,joined=%b\n",key, value.has("right")));
    }
    private JsonNode join(JsonNode left, JsonNode right) {
        ((ObjectNode) left).set("right", right);
        return left;
    }
}
public class SelfJoinTransformer implements Transformer<String, JsonNode, KeyValue<String, JsonNode>> {
  private KeyValueStore<String, JsonNode> stateStore;
  private ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner;
  private String storeName;
  public SelfJoinTransformer(ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner, String storeName) {
    this.storeName = storeName;
    this.valueJoiner = valueJoiner;
  }
  @Override
  public void init(ProcessorContext context) {
     this.stateStore = (KeyValueStore<String, JsonNode>) context.getStateStore(storeName);
  }
  @Override
  public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
    JsonNode oldValue = stateStore.get(key);
    if (oldValue != null) { //this condition rarely holds true
        stateStore.delete(key);
        System.out.format("%s,joined\n", key);
        return KeyValue.pair(key, valueJoiner.apply(oldValue, value));
    }
    stateStore.put(key, value);
    return null;
  }
}
 
                        
The reason, that it seems messages are disappearing (assuming, that punctuator doesn't remove them) is that you use KStream::selectKey(...), it change key, but doesn't do repartitioning And you might look for the key in wrong partitions.
Look at following scenarion:
k1,v1(partition0)k2,v2(partition1)Assumption messages are put in different partition (because of key) After selectKey:
k1 -> k,k2 -> kk,v1k,v2Operation
selectKeyis stateless so messages are not sent to downstream (topic) and repartition doesn't happen. For first message: value is put for key - k in the store (partition0) When second message arrives: for key - k there is no message, because it is different partition (partition1)