Kafka Streams has an interface, Processor, whose implementations are stateful. An example implementation given in the developer guide is:
```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;

        // call this processor's punctuate() method every 1000 time units.
        this.context.schedule(1000);

        // retrieve the key-value store named "Counts"
        kvStore = (KeyValueStore) context.getStateStore("Counts");
    }

    @Override
    public void process(String dummy, String line) {
        String[] words = line.toLowerCase().split(" ");

        for (String word : words) {
            Long oldValue = kvStore.get(word);

            if (oldValue == null) {
                kvStore.put(word, 1L);
            } else {
                kvStore.put(word, oldValue + 1L);
            }
        }
    }

    @Override
    public void punctuate(long timestamp) {
        KeyValueIterator<String, Long> iter = this.kvStore.all();

        while (iter.hasNext()) {
            KeyValue<String, Long> entry = iter.next();
            context.forward(entry.key, entry.value.toString());
        }

        iter.close();
        // commit the current processing progress
        context.commit();
    }

    @Override
    public void close() {
        // close the key-value store
        kvStore.close();
    }
}
```
The init method initializes WordCountProcessor's internal state, such as retrieving a key-value store. Other methods, like process and close, make use of this state.
It's not clear to me how to reify such an interface in Clojure. How would we pass the state retrieved by init on to process, close, etc.?
Using a closure?
One idea I have is to use a closure:
```clojure
(import 'org.apache.kafka.streams.processor.Processor)

(let [ctx (atom nil)]
  (reify Processor
    (close [this]
      ;; Do something w/ ctx
      )
    (init [this context]
      (reset! ctx context))
    (process [this k v]
      ;; Do something w/ ctx
      )
    (punctuate [this timestamp]
      ;; Do something w/ ctx
      )))
```
Annoyingly, every method would have to start from the ProcessorContext object, so the code that retrieves the key-value store would be repeated in every method that needs the store.
I don't see a (general) way around that, though on a case-by-case basis we can replace the ctx atom with the more specific state that the methods need.
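To make the case-by-case idea concrete, here is a minimal sketch in which the atom holds the key-value store itself instead of the context, so process never has to go back through the context. It runs without Kafka on the classpath because Proc, get-state-store, and the map-based "context" are hypothetical stand-ins for Kafka's Processor interface and ProcessorContext.getStateStore, and the "store" is an atom holding a Clojure map rather than a real KeyValueStore.

```clojure
(require '[clojure.string :as str])

;; Hypothetical stand-in for org.apache.kafka.streams.processor.Processor
;; (old punctuate-based API), so the sketch runs without Kafka.
(definterface Proc
  (init [context])
  (process [k v])
  (punctuate [timestamp])
  (close []))

;; Hypothetical stand-in for ProcessorContext.getStateStore: the "context"
;; is a map from store name to an atom holding a Clojure map.
(defn get-state-store [context store-name]
  (get-in context [:stores store-name]))

(defn word-count-processor []
  ;; Only the store is kept; the context itself is not needed afterwards.
  (let [store (atom nil)]
    (reify Proc
      (init [_ context]
        (reset! store (get-state-store context "Counts")))
      (process [_ _k line]
        ;; Increment the count for each word, starting from 0 if absent.
        (doseq [word (str/split (str/lower-case line) #" ")]
          (swap! @store update word (fnil inc 0))))
      (punctuate [_ _ts] nil)
      (close [_] nil))))
```

This only works when no method needs anything else from the context; the Java example's punctuate also calls forward and commit on the context, which is why a single store atom is not a general answer.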
Is there a better way?
Closing over an atom would be the main way to do it. Your original class has two fields, so you can close over two atoms to get the same effect.
If that's still too tedious, you can add some convenience functions that also close over the atoms.
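A minimal sketch of that suggestion, mirroring the Java WordCountProcessor: one atom per Java field, plus local helper functions that close over the atoms so the reify methods never deal with the context directly. To keep it self-contained, Proc is a hypothetical stand-in for Kafka's (old, punctuate-based) Processor interface, and the context is a hypothetical map with a :stores entry and a :forward function rather than a real ProcessorContext; the helper names are illustrative too.

```clojure
(require '[clojure.string :as str])

;; Hypothetical stand-in for Kafka's Processor interface.
(definterface Proc
  (init [context])
  (process [k v])
  (punctuate [timestamp])
  (close []))

(defn word-count-processor []
  (let [ctx      (atom nil)   ; mirrors the Java field `context`
        kv-store (atom nil)   ; mirrors the Java field `kvStore`
        ;; Convenience helpers that also close over the atoms.
        store-get  (fn [k] (get @@kv-store k))
        store-put! (fn [k v] (swap! @kv-store assoc k v))
        forward!   (fn [k v] ((:forward @ctx) k v))]
    (reify Proc
      (init [_ context]
        (reset! ctx context)
        ;; Hypothetical: the "store" is an atom found under :stores.
        (reset! kv-store (get-in context [:stores "Counts"])))
      (process [_ _k line]
        (doseq [word (str/split (str/lower-case line) #" ")]
          (store-put! word (inc (or (store-get word) 0)))))
      (punctuate [_ _ts]
        ;; Forward every (word, count) pair, count as a string.
        (doseq [[k v] @@kv-store]
          (forward! k (str v))))
      (close [_]
        ;; Drop the reference to the store.
        (reset! kv-store nil)))))
```

With the real library you would reify org.apache.kafka.streams.processor.Processor instead, fetch the store in init with (.getStateStore context "Counts"), and have the helpers call the store's get and put methods; the shape of the closure stays the same.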
The alternative would be to use gen-class, but I think you'll be better off with reify.