How to write transactional spout for storm trident

177 views Asked by At

I have written a demo about trident. In the demo, I want to generate a series batches of data in a transaction spout and then do persistentAggregate operation for them.

The normal processing like this:

step1: txid=1, {"aaa 3", "bbb 2"}==>persist to DB(OK)

step2: txid=2, {"ccc 6", "ddd 7"}==>persist to DB(OK)

In the case that the operation of persist to DB is abnormal in step1, I suppose the process should be

step1: txid=1, {"aaa 3", "bbb 2"}==>persist to DB(NG)

step2: txid=1, {"aaa 3", "bbb 2"}==>persist to DB(OK)

But the test result is as following:

step1: txid=1, {"aaa 3", "bbb 2"}==>persist to DB(NG)

step2: txid=1, {"ccc 6", "ddd 7"}==>persist to DB(OK)

I think this process is not correct.But I do not know why it happen. I have checked the zookeeper, in the path "meta/1, the value is null.

I wrote a transactional spout myself implement the interface ITridentSpout, I think it might be a problem with the spout I have written.

Anyone can give me some advice? Or give me a implemented transactional spout example for me to reference.

Thanks!

0

There are 0 answers