State store in Kafka Streams processor returns random values

I'd expect the store in the following code to return a continuously growing sequence of numbers, but it does not:

import org.apache.kafka.streams.processor.api.Processor
import org.apache.kafka.streams.processor.api.ProcessorContext
import org.apache.kafka.streams.processor.api.Record
import org.apache.kafka.streams.state.KeyValueStore

class Processor : Processor<String, Event, String, Event> {
    private lateinit var context: ProcessorContext<String, Event>
    private lateinit var store: KeyValueStore<Int, Int>

    override fun init(context: ProcessorContext<String, Event>) {
        super.init(context)
        this.context = context
        store = this.context.getStateStore(Store.NAME)
    }

    override fun process(record: Record<String, Event>) {
        val event = record.value()

        // Read the previous counter for this event's key; start at 1 if the key is new.
        val key = event.key
        val lastValue = store[key]
        val currentValue = lastValue?.inc() ?: 1
        store.put(key, currentValue)
        store.flush()
    }
}

Instead, it returns seemingly random Int values such as 1, 2, 1, 4, 2, 7, 6, 5, 2, 4, 3, and so on.
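For reference, the counting logic in process() keeps one counter per event.key rather than a single global counter. The following is a minimal sketch of that logic in isolation, assuming a plain MutableMap in place of the state store; the function name countPerKey and the keys are made up for illustration and do not come from the application:

```kotlin
// Hypothetical stand-in for the processor's counting logic: a plain MutableMap
// replaces the KeyValueStore<Int, Int>, and the keys passed in are made up.
fun countPerKey(keys: List<Int>): List<Int> {
    val store = mutableMapOf<Int, Int>()
    return keys.map { key ->
        // Same expression as in process(): increment the last value, or start at 1.
        val currentValue = store[key]?.inc() ?: 1
        store[key] = currentValue
        currentValue
    }
}

fun main() {
    // Three distinct keys arriving interleaved.
    println(countPerKey(listOf(10, 10, 20, 10, 20, 30)))  // prints [1, 2, 1, 3, 2, 1]
}
```

Each key's counter grows steadily on its own, but the combined output does not look like one growing sequence. Whether this accounts for the values observed here is not established; it only shows what per-key counting looks like when keys interleave.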

The process step is as follows:

                .process(object : ProcessorSupplier<String, Event, String, Event> {
                    override fun get() = Processor()

                    override fun stores(): MutableSet<StoreBuilder<*>> {

                        return mutableSetOf(Stores.keyValueStoreBuilder(
                            Stores.persistentKeyValueStore(Store.NAME),
                            Serdes.Integer(),
                            Serdes.Integer()
                        ))
                    }
                }, Store.NAME)

I even set num.stream.threads to 1.

What's going on...?
