issue on implementing transactional topology in trident

508 views Asked by At

My use case is to call a query to fetch records from db with different input parameters. After fetching records, do some processing, and then finally write it into a file. My input parameter values depend on the complete processing of the previous query. My problem is, how will i know in a spout that processing of previous query has been completed, i.e. records has been successfully written into the file.

I tried implementing ITridentSpout but still not getting any solution. Below is my code for ITridentSpout:

TridentCoordinator.java

package com.TransactionlTopology;

import java.util.concurrent.ConcurrentHashMap;

import storm.trident.spout.ITridentSpout;

public class TridentCoordinator implements ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>>{

    ConcurrentHashMap<Long,String> prevMetadata=new ConcurrentHashMap<Long, String>();
    boolean result=true;

    @Override
    public void success(long txid) {
        System.out.println("inside success mehod with txid as  "+txid);
        if(prevMetadata.containsKey(txid)){
            prevMetadata.replace(txid, "SUCCESS");
        }
    }

    @Override
    public boolean isReady(long txid) {
        if(!prevMetadata.isEmpty()){
            result=true;
        for(Long txId:prevMetadata.keySet()){
            System.out.println("txId:---- "+txId +"    value"+prevMetadata.get(txId) );
            if(prevMetadata.get(txId).equalsIgnoreCase("SUCESS")){
                prevMetadata.put(txid, "STARTED");
                result= true;
            }
        }
        }
        else{
            prevMetadata.put(txid, "STARTED");
            result= true;
        }

        System.out.println("inside isReady function with txid as:---- "+txid+"result value:--  "+result);

        return result;
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }

    @Override
    public ConcurrentHashMap<Long,String> initializeTransaction(long txid, ConcurrentHashMap<Long,String> prevMetadata, ConcurrentHashMap<Long,String> currMetadata) {
        System.out.println("inside initialize transaction method with values as:----- "+txid+"   "+prevMetadata+"   "+currMetadata);

        return prevMetadata;
    }
}

TridentEmitterImpl.java

package com.TransactionlTopology;

import java.util.concurrent.ConcurrentHashMap;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import backtype.storm.tuple.Values;

public class TridentEmitterImpl implements ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> {

    @Override
    public void emitBatch(TransactionAttempt tx, ConcurrentHashMap<Long,String> coordinatorMeta,TridentCollector collector) {
        System.out.println("inside emitbatch of emitter class with values as:--- "+coordinatorMeta);
        System.out.println("tx.getAttemptId()   "+tx.getAttemptId()+"tx.getTransactionId()  "+tx.getTransactionId()+"tx.getId()  "+tx.getId().toString());
        collector.emit(new Values("preeti"));
    }

    @Override
    public void success(TransactionAttempt tx) {
        System.out.println("inside success of emitter with tx id as   "+tx.getTransactionId());

    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }
}

TridentSpoutImpl.java

package com.TransactionlTopology;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import storm.trident.spout.ITridentSpout;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;

public class TridentSpoutImpl implements ITridentSpout<ConcurrentHashMap<Long,String>> {

    @Override
    public storm.trident.spout.ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>> getCoordinator(String txStateId, Map conf, TopologyContext context) {

        return new TridentCoordinator();
    }

    @Override
    public storm.trident.spout.ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> getEmitter(String txStateId, Map conf, TopologyContext context) {

        return new TridentEmitterImpl();
    }

    @Override
    public Map getComponentConfiguration() {

        Map<String,String> newMap=new HashMap<String, String>();
        newMap.put("words","preeti");
        return newMap;
    }

    @Override
    public Fields getOutputFields() {

        return new Fields("word");
    }

}

Also not able to understand what values will come in initializeTransaction as prevMetaData and curMetada. Please provide some solution

1

There are 1 answers

3
Gordon Seidoh Worley On

You have a variety of options available to you. Perhaps the easiest one, though, is to have the final bolt in your topology, after writing the file, notify the spout it is good to start the next query via a messaging queue that your spout can watch. When the spout picks up this notification, it can then process the next query.

However, speaking more generally, this seems like a questionable use case for Storm. A lot of your topology's resources will likely be idle a lot of the time since you only have one transaction at a time running through it. Obviously I don't know all the details of your problem, but this kind of dependency between transactions limits the value of the added complexity of using Storm.