Issue implementing a transactional topology in Trident


My use case is to run a query that fetches records from a database, with different input parameters each time. After fetching the records, I do some processing and finally write the results to a file. The input parameter values for each query depend on the complete processing of the previous query. My problem is: how can the spout know that processing of the previous query has completed, i.e. that its records have been successfully written to the file?

I tried implementing ITridentSpout but still haven't found a solution. Below is my code for ITridentSpout:

package com.TransactionlTopology;

import java.util.concurrent.ConcurrentHashMap;

import storm.trident.spout.ITridentSpout;

public class TridentCoordinator implements ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>> {

    // Status of each transaction id seen so far: "STARTED" or "SUCCESS"
    ConcurrentHashMap<Long,String> prevMetadata = new ConcurrentHashMap<Long,String>();

    public void success(long txid) {
        System.out.println("inside success method with txid as " + txid);
        prevMetadata.replace(txid, "SUCCESS");
    }

    public boolean isReady(long txid) {
        // Ready only when every earlier transaction has been marked SUCCESS
        boolean result = true;
        for (Long txId : prevMetadata.keySet()) {
            System.out.println("txId:---- " + txId + "    value " + prevMetadata.get(txId));
            if (txId < txid && !"SUCCESS".equals(prevMetadata.get(txId))) {
                result = false;
            }
        }
        if (result) {
            prevMetadata.put(txid, "STARTED");
        }

        System.out.println("inside isReady function with txid as:---- " + txid + " result value:-- " + result);

        return result;
    }

    public void close() {
        // nothing to release
    }

    public ConcurrentHashMap<Long,String> initializeTransaction(long txid, ConcurrentHashMap<Long,String> prevMetadata, ConcurrentHashMap<Long,String> currMetadata) {
        System.out.println("inside initialize transaction method with values as:----- " + txid + "   " + prevMetadata + "   " + currMetadata);

        return prevMetadata;
    }
}

package com.TransactionlTopology;

import java.util.concurrent.ConcurrentHashMap;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import backtype.storm.tuple.Values;

public class TridentEmitterImpl implements ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> {

    public void emitBatch(TransactionAttempt tx, ConcurrentHashMap<Long,String> coordinatorMeta, TridentCollector collector) {
        System.out.println("inside emitbatch of emitter class with values as:--- " + coordinatorMeta);
        System.out.println("tx.getAttemptId() " + tx.getAttemptId() + " tx.getTransactionId() " + tx.getTransactionId() + " tx.getId() " + tx.getId().toString());
        // Emits one hard-coded tuple per batch, matching the single "word" output field
        collector.emit(new Values("preeti"));
    }

    public void success(TransactionAttempt tx) {
        System.out.println("inside success of emitter with tx id as " + tx.getTransactionId());
    }

    public void close() {
        // nothing to release
    }
}

package com.TransactionlTopology;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import storm.trident.spout.ITridentSpout;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;

public class TridentSpoutImpl implements ITridentSpout<ConcurrentHashMap<Long,String>> {

    public ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>> getCoordinator(String txStateId, Map conf, TopologyContext context) {
        return new TridentCoordinator();
    }

    public ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new TridentEmitterImpl();
    }

    public Map getComponentConfiguration() {
        // No component-specific configuration
        return new HashMap<String,String>();
    }

    public Fields getOutputFields() {
        return new Fields("word");
    }
}


I also can't understand what values are passed to initializeTransaction as prevMetadata and currMetadata. Please suggest a solution.
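On the initializeTransaction question: as far as I understand Trident's `TridentSpoutCoordinator`, `prevMetadata` is whatever `initializeTransaction` returned for the previous transaction id (null for the very first batch), and `currMetadata` is the metadata already stored for the current txid, which is non-null only when a failed batch is being replayed. Because the coordinator above simply returns the `prevMetadata` it receives, the metadata would stay null for every batch. The sketch below simulates that contract in plain Java without any Storm dependency, assuming the metadata is a hypothetical `Long` query parameter derived from the previous batch's parameter; `ChainedCoordinator`, `nextParameter` and the starting value 100 are illustrative names, not part of the Storm API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java simulation (no Storm) of the initializeTransaction(txid, prevMetadata, currMetadata)
// contract. The Long "query parameter" metadata and nextParameter() are hypothetical.
class ChainedCoordinator {
    static final long FIRST_PARAMETER = 100L; // hypothetical input for the first query

    // Hypothetical rule deriving the next query's input from the previous one
    static long nextParameter(long previous) {
        return previous + 10L;
    }

    static Long initializeTransaction(long txid, Long prevMetadata, Long currMetadata) {
        if (currMetadata != null) {
            // Replay of a failed batch: reuse exactly the same metadata
            return currMetadata;
        }
        if (prevMetadata == null) {
            // Very first transaction: nothing to chain from
            return FIRST_PARAMETER;
        }
        return nextParameter(prevMetadata);
    }
}

class MetadataDemo {
    public static void main(String[] args) {
        // Stands in for the coordinator state Trident keeps per txid
        Map<Long, Long> stored = new LinkedHashMap<>();
        for (long txid = 1; txid <= 3; txid++) {
            Long prev = stored.get(txid - 1); // null for the first txid
            Long curr = stored.get(txid);     // null unless replaying
            stored.put(txid, ChainedCoordinator.initializeTransaction(txid, prev, curr));
        }
        // Simulated replay of txid 2: currMetadata is now non-null
        Long replayed = ChainedCoordinator.initializeTransaction(2, stored.get(1L), stored.get(2L));
        System.out.println(stored + " replay of 2 -> " + replayed); // prints {1=100, 2=110, 3=120} replay of 2 -> 110
    }
}
```

The point of returning new metadata, rather than echoing `prevMetadata`, is that it is persisted per txid, so a replay sees the same query parameter and the next batch can chain from it.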



Answer by Gordon Seidoh Worley

There are several options. Perhaps the easiest is to have the final bolt in your topology, after it writes the file, notify the spout that it can start the next query, via a message queue that the spout watches. When the spout picks up this notification, it can run the next query.
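A minimal sketch of that handshake, under stated assumptions: an in-process `BlockingQueue` stands in for the external message queue, and `CompletionChannel`, `QueryGate` and `notifyWritten` are hypothetical names. In Trident the gate's check would naturally sit in `BatchCoordinator.isReady(txid)`. An external channel also matters for another reason: as far as I can tell from Storm's source, `isReady` is called on a coordinator instance owned by `MasterBatchCoordinator`, while `success` and `initializeTransaction` go to a separate instance inside `TridentSpoutCoordinator`, so an in-memory map shared between those methods may never see the updates. That is worth checking against the Storm version in use.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-process stand-in for the external message queue (hypothetical). In a real
// cluster the bolt and spout usually run in different worker JVMs, so a broker is needed.
class CompletionChannel {
    private final BlockingQueue<Long> done = new LinkedBlockingQueue<>();

    // Called by the final, file-writing bolt once the file for txid is written
    void notifyWritten(long txid) {
        done.offer(txid);
    }

    // Returns the txid of a finished batch, or null if none has finished yet
    Long pollFinished() {
        return done.poll();
    }
}

// Hypothetical gate that a coordinator's isReady(txid) could delegate to
class QueryGate {
    private final CompletionChannel channel;
    private long lastFinished = 0L; // highest txid whose file has been written
    private long lastStarted = 0L;  // txid most recently allowed to start

    QueryGate(CompletionChannel channel) {
        this.channel = channel;
    }

    // Allows txid to start only when the previously started batch has been written
    boolean isReady(long txid) {
        Long finished;
        while ((finished = channel.pollFinished()) != null) {
            lastFinished = Math.max(lastFinished, finished);
        }
        if (lastStarted == lastFinished) {
            lastStarted = txid;
            return true;
        }
        return false;
    }
}

class HandshakeDemo {
    public static void main(String[] args) {
        CompletionChannel channel = new CompletionChannel();
        QueryGate gate = new QueryGate(channel);
        System.out.println(gate.isReady(1)); // true: nothing in flight
        System.out.println(gate.isReady(2)); // false: batch 1 not written yet
        channel.notifyWritten(1);            // final bolt finished writing batch 1
        System.out.println(gate.isReady(2)); // true
    }
}
```

Keeping `topology.max.spout.pending` at 1 additionally limits Trident to one batch in flight, which fits this one-query-at-a-time flow.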

More generally, though, this seems like a questionable use case for Storm. Much of your topology will likely sit idle most of the time, since only one transaction runs through it at a time. I don't know all the details of your problem, but this kind of dependency between transactions limits what you gain in return for the added complexity of using Storm.
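To make that point concrete: if each query truly has to wait for the previous one, the workflow reduces to a sequential loop outside Storm. The sketch below assumes hypothetical `fetchRecords`, `process`, `nextParameter` and `writeToFile` helpers; an in-memory list stands in for the file, and the rule for deriving the next parameter is made up. It only illustrates the shape of the dependency, not the asker's actual logic.

```java
import java.util.ArrayList;
import java.util.List;

// Sequential version of the chained workflow; all helpers are hypothetical stand-ins
class SequentialPipeline {
    // Stands in for the DB query run with the given input parameter
    static List<String> fetchRecords(int parameter) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            rows.add("row-" + parameter + "-" + i);
        }
        return rows;
    }

    // Stands in for the processing step
    static List<String> process(List<String> rows) {
        List<String> out = new ArrayList<>();
        for (String row : rows) {
            out.add(row.toUpperCase());
        }
        return out;
    }

    // Stands in for the file write
    static void writeToFile(List<String> rows, List<String> sink) {
        sink.addAll(rows);
    }

    // Hypothetical rule: the next input depends on the fully processed result
    static int nextParameter(int current, List<String> processed) {
        return current + processed.size();
    }

    // Each query starts only after the previous result is processed and written
    static int run(int firstParameter, int rounds, List<String> sink) {
        int parameter = firstParameter;
        for (int r = 0; r < rounds; r++) {
            List<String> processed = process(fetchRecords(parameter));
            writeToFile(processed, sink);
            parameter = nextParameter(parameter, processed);
        }
        return parameter;
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        System.out.println(run(1, 3, sink) + " " + sink.size()); // prints 10 9
    }
}
```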