My use case is to run a query that fetches records from a database with different input parameters, do some processing on the fetched records, and finally write them to a file. The input parameter values for each query depend on the complete processing of the previous query. My problem is: how will the spout know that processing of the previous query has finished, i.e. that its records have been successfully written to the file?
I tried implementing ITridentSpout, but I still have not found a solution. Below is my code for ITridentSpout:
TridentCoordinator.java
package com.TransactionlTopology;

import java.util.concurrent.ConcurrentHashMap;

import storm.trident.spout.ITridentSpout;

public class TridentCoordinator implements ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long, String>> {

    // Tracks each transaction's state ("STARTED" or "SUCCESS") keyed by txid.
    ConcurrentHashMap<Long, String> prevMetadata = new ConcurrentHashMap<Long, String>();

    @Override
    public void success(long txid) {
        System.out.println("inside success method with txid as " + txid);
        if (prevMetadata.containsKey(txid)) {
            prevMetadata.replace(txid, "SUCCESS");
        }
    }

    @Override
    public boolean isReady(long txid) {
        boolean result;
        if (prevMetadata.isEmpty()) {
            // First transaction: nothing to wait for.
            prevMetadata.put(txid, "STARTED");
            result = true;
        } else {
            // Only start the next transaction once every previous one has succeeded.
            result = true;
            for (Long prevTxId : prevMetadata.keySet()) {
                System.out.println("txId: " + prevTxId + " value: " + prevMetadata.get(prevTxId));
                if (!prevMetadata.get(prevTxId).equalsIgnoreCase("SUCCESS")) {
                    result = false;
                }
            }
            if (result) {
                prevMetadata.put(txid, "STARTED");
            }
        }
        System.out.println("inside isReady with txid: " + txid + " result: " + result);
        return result;
    }

    @Override
    public void close() {
    }

    @Override
    public ConcurrentHashMap<Long, String> initializeTransaction(long txid,
            ConcurrentHashMap<Long, String> prevMetadata,
            ConcurrentHashMap<Long, String> currMetadata) {
        System.out.println("inside initializeTransaction with values: "
                + txid + " " + prevMetadata + " " + currMetadata);
        return prevMetadata;
    }
}
TridentEmitterImpl.java
package com.TransactionlTopology;

import java.util.concurrent.ConcurrentHashMap;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import backtype.storm.tuple.Values;

public class TridentEmitterImpl implements ITridentSpout.Emitter<ConcurrentHashMap<Long, String>> {

    @Override
    public void emitBatch(TransactionAttempt tx, ConcurrentHashMap<Long, String> coordinatorMeta,
            TridentCollector collector) {
        System.out.println("inside emitBatch of emitter class with values as: " + coordinatorMeta);
        System.out.println("tx.getAttemptId() " + tx.getAttemptId()
                + " tx.getTransactionId() " + tx.getTransactionId()
                + " tx.getId() " + tx.getId().toString());
        // Emit a single-tuple batch for this transaction.
        collector.emit(new Values("preeti"));
    }

    @Override
    public void success(TransactionAttempt tx) {
        System.out.println("inside success of emitter with txid as " + tx.getTransactionId());
    }

    @Override
    public void close() {
    }
}
TridentSpoutImpl.java
package com.TransactionlTopology;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import storm.trident.spout.ITridentSpout;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;

public class TridentSpoutImpl implements ITridentSpout<ConcurrentHashMap<Long, String>> {

    @Override
    public ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long, String>> getCoordinator(
            String txStateId, Map conf, TopologyContext context) {
        return new TridentCoordinator();
    }

    @Override
    public ITridentSpout.Emitter<ConcurrentHashMap<Long, String>> getEmitter(
            String txStateId, Map conf, TopologyContext context) {
        return new TridentEmitterImpl();
    }

    @Override
    public Map getComponentConfiguration() {
        Map<String, String> newMap = new HashMap<String, String>();
        newMap.put("words", "preeti");
        return newMap;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("word");
    }
}
I am also not able to understand what values will come into initializeTransaction as prevMetadata and currMetadata. Please suggest a solution.
You have a variety of options available to you. Perhaps the easiest, though, is to have the final bolt in your topology, after writing the file, notify the spout via a messaging queue that it is safe to start the next query, as sketched below. When the spout picks up this notification, it can process the next query.
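Here is a minimal sketch of that pattern. It assumes the coordinator and the final bolt run in the same JVM (e.g. under LocalCluster), so a shared in-memory queue is visible to both; CompletionSignal and its methods are hypothetical names. In a real multi-worker cluster you would replace the in-memory queue with an external broker such as Kafka, JMS, or Redis, since workers run in separate JVMs.

package com.TransactionlTopology;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-JVM channel between the final bolt and the spout's coordinator.
public class CompletionSignal {

    private static final BlockingQueue<Long> COMPLETED = new LinkedBlockingQueue<Long>();

    // Called by the final bolt once the file for a transaction has been written.
    public static void markDone(long txid) {
        COMPLETED.offer(txid);
    }

    // Polled by the coordinator's isReady(); true once the previous
    // transaction's file write has been confirmed.
    public static boolean previousDone() {
        return COMPLETED.poll() != null;
    }
}

With this in place, isReady(txid) would return true unconditionally for the very first transaction and CompletionSignal.previousDone() afterwards, so a new batch is emitted only after the previous batch's file is on disk.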
However, speaking more generally, this seems like a questionable use case for Storm. Since only one transaction at a time runs through the topology, much of its capacity will sit idle much of the time. Obviously I don't know all the details of your problem, but this kind of dependency between transactions makes the added complexity of Storm hard to justify.