For example, I have these batches of tuples (batch size 5) containing impressions from users:
Batch 1:
[UUID1, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID3, clientId2]
Batch 2:
[UUID4, clientId1]
[UUID5, clientId1]
[UUID5, clientId1]
[UUID6, clientId2]
[UUID6, clientId2]
And this is how I save the count state:
TridentState ClientState = impressionStream
    .groupBy(new Fields("clientId"))
    .persistentAggregate(getCassandraStateFactory("users", "DataComputation",
        "UserImpressionCounter"), new Count(), new Fields("count"));
Stream ClientStream = ClientState.newValuesStream();
I start with an empty database and run my topology. After grouping the stream by clientId, I save the state with the persistentAggregate function and the Count aggregator.
For the first batch, the result after the newValuesStream method is: [clientId1, 4], [clientId2, 1].
For the second batch: [clientId1, 7], [clientId2, 3], as expected.
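To double-check the numbers above, here is a minimal plain-Java model of what grouping by clientId and counting into a persisted state does per batch. It is not Trident code: RunningCountModel and applyBatch are hypothetical names, and an in-memory map stands in for the Cassandra-backed state.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of groupBy("clientId") + Count() with persisted totals.
// Hypothetical illustration only; no Storm/Trident classes are used.
final class RunningCountModel {
    // Stands in for the persisted state: clientId -> total count so far.
    private final Map<String, Long> totals = new LinkedHashMap<>();

    // Applies one batch of clientIds and returns the updated totals
    // for the keys touched by this batch only.
    Map<String, Long> applyBatch(List<String> clientIds) {
        Map<String, Long> updated = new LinkedHashMap<>();
        for (String clientId : clientIds) {
            long newTotal = totals.merge(clientId, 1L, Long::sum);
            updated.put(clientId, newTotal);
        }
        return updated;
    }

    public static void main(String[] args) {
        RunningCountModel model = new RunningCountModel();
        // Batch 1: four impressions for clientId1, one for clientId2.
        System.out.println(model.applyBatch(Arrays.asList(
                "clientId1", "clientId1", "clientId1", "clientId1", "clientId2")));
        // Batch 2: three impressions for clientId1, two for clientId2.
        System.out.println(model.applyBatch(Arrays.asList(
                "clientId1", "clientId1", "clientId1", "clientId2", "clientId2")));
    }
}
```

Running main prints {clientId1=4, clientId2=1} and then {clientId1=7, clientId2=3}, matching the values emitted for the two batches.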
ClientStream is used in a couple of branches, and in one of these branches I need to know the count for each individual tuple, which would mean processing with a batch size of 1.
A batch size of 1 is obviously impractical, so I have to somehow find out the previous state of the counter before it is updated and emit that value together with the tuple that already carries the updated counter, e.g. for the second batch [clientId1, 7, 4].
Does anybody have an idea how to do that?
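To make the requirement concrete, here is a plain-Java sketch of the arithmetic: if the number of tuples per clientId in the current batch is known, the previous counter value is the updated total minus that per-batch count. This is not Trident code; PreviousCountSketch, countPerBatch and withPrevious are hypothetical names, and the sketch assumes every clientId in the batch is present in the updated totals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; no Storm/Trident classes are used.
final class PreviousCountSketch {
    // Counts how many tuples each clientId contributes to the current batch.
    static Map<String, Long> countPerBatch(List<String> clientIds) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String clientId : clientIds) {
            counts.merge(clientId, 1L, Long::sum);
        }
        return counts;
    }

    // Joins the per-batch counts with the updated totals on clientId and
    // derives the previous total as: previous = updated - batchCount.
    // Assumes every key in batchCounts also exists in updatedTotals.
    static List<String> withPrevious(Map<String, Long> updatedTotals,
                                     Map<String, Long> batchCounts) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            long updated = updatedTotals.get(e.getKey());
            long previous = updated - e.getValue();
            out.add("[" + e.getKey() + ", " + updated + ", " + previous + "]");
        }
        return out;
    }

    public static void main(String[] args) {
        // Updated totals after the second batch, as emitted by the state.
        Map<String, Long> updated = new LinkedHashMap<>();
        updated.put("clientId1", 7L);
        updated.put("clientId2", 3L);
        // clientIds of the second batch.
        Map<String, Long> counts = countPerBatch(Arrays.asList(
                "clientId1", "clientId1", "clientId1", "clientId2", "clientId2"));
        System.out.println(withPrevious(updated, counts));
    }
}
```

For the second batch this prints [[clientId1, 7, 4], [clientId2, 3, 1]], i.e. the updated counter followed by the value it had before the batch.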
I solved this issue by adding a new aggregator and joining its output with the persistentAggregate result:
SumCountAggregator: