I'd expect the store in the following code to return a continuously growing sequence of numbers, but it does not:
import org.apache.kafka.streams.processor.api.Processor
import org.apache.kafka.streams.processor.api.ProcessorContext
import org.apache.kafka.streams.processor.api.Record
import org.apache.kafka.streams.state.KeyValueStore
// The supertype is fully qualified because the class name shadows the interface
class Processor : org.apache.kafka.streams.processor.api.Processor<String, Event, String, Event> {
    private lateinit var context: ProcessorContext<String, Event>
    private lateinit var store: KeyValueStore<Int, Int>

    override fun init(context: ProcessorContext<String, Event>) {
        super.init(context)
        this.context = context
        store = this.context.getStateStore(Store.NAME)
    }

    override fun process(record: Record<String, Event>) {
        val event = record.value()
        val key = event.key
        val lastValue = store[key]
        val currentValue = lastValue?.inc() ?: 1
        store.put(key, currentValue)
        store.flush()
    }
}
Instead, it produces seemingly random values such as 1, 2, 1, 4, 2, 7, 6, 5, 2, 4, 3, ...
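To be clear about what I expect: the counter should grow monotonically per key. Here is a sketch of that intended logic with a plain HashMap standing in for the state store (hypothetical, no Kafka involved):

```kotlin
// Hypothetical stand-in for the KeyValueStore: a plain in-memory map.
val store = HashMap<Int, Int>()

// Mirrors the intended process() logic: read last count, increment, write back.
fun count(key: Int): Int {
    val current = (store[key] ?: 0) + 1
    store[key] = current
    return current
}

fun main() {
    // Same key repeated should yield a growing sequence; a new key restarts at 1.
    val results = listOf(1, 1, 1, 2, 1).map { count(it) }
    println(results) // expected: [1, 2, 3, 1, 4]
}
```

That is the behavior I assumed a single persistent store would give me across all records.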
The process step is as follows:
.process(object : ProcessorSupplier<String, Event, String, Event> {
    override fun get() = Processor()

    override fun stores(): MutableSet<StoreBuilder<*>> {
        return mutableSetOf(
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(Store.NAME),
                Serdes.Integer(),
                Serdes.Integer()
            )
        )
    }
}, Store.NAME)
Even setting num.stream.threads: 1 makes no difference.
What's going on...?