Apache Storm: can't receive tuples from multiple bolts

31 views Asked by At

I'm creating a topology that reads a sample video, applies some conversions to it, and saves the result as output. I'm using Apache Storm to apply different filters simultaneously. After reading the video, I send its frames to two bolts: one applies a Gaussian blur effect and the other sharpens every frame it receives. Both the sharpener and the Gaussian blur bolt emit their tuples to the same destination successfully. Now I want to merge the resulting frames emitted by these two bolts, but the aggregator bolt keeps receiving the values of only one tuple at a time. How can I fix it?

Topology.java:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import java.io.*;

/**
 * Entry point that wires the spout and bolts together and runs the topology
 * on an in-process {@link LocalCluster} for a fixed amount of time.
 *
 * <p>While the topology runs, {@code System.out} and {@code System.err} are
 * redirected to {@code topology.log}; the original streams are restored before
 * the log file is closed so that later output is not written to a closed stream.
 */
public class Topology {
    /** How long the local cluster is kept alive before it is shut down, in milliseconds. */
    private static final long RUN_DURATION_MS = 100000;

    /**
     * Builds the topology, submits it to a local cluster, and waits for
     * {@link #RUN_DURATION_MS} before shutting down.
     *
     * @param args unused
     * @throws Exception if the log file cannot be opened, the cluster cannot be
     *                   started, or the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Create a log file for stdout and stderr
        File logFile = new File("topology.log");

        // Remember the real streams so they can be put back afterwards.
        PrintStream originalOut = System.out;
        PrintStream originalErr = System.err;

        try (PrintStream printStream = new PrintStream(new FileOutputStream(logFile))) {
            // Redirect stdout and stderr to the log file
            System.setOut(printStream);
            System.setErr(printStream);
            try {
                Config config = new Config();                      // Cluster configuration
                config.setDebug(true);
                TopologyBuilder builder = new TopologyBuilder();

                // Set the spout and bolts in the topology
                Spout spout = new Spout();
                BoltFrameAnalyzer boltFrameAnalyzer = new BoltFrameAnalyzer();
                BoltAnalysisSaver boltAnalysisSaver = new BoltAnalysisSaver();
                BoltImageProcessor boltImageProcessor = new BoltImageProcessor();
                BoltGaussianBlur boltGaussianBlur = new BoltGaussianBlur();
                BoltSharpener boltSharpener = new BoltSharpener();
                BoltFrameAggregator boltFrameAggregator = new BoltFrameAggregator();
                BoltOutputGenerator boltOutputGenerator = new BoltOutputGenerator();

                // Define the data flow by connecting the spout and bolts
                builder.setSpout("spout", spout, 1);
                builder.setBolt("bolt-frame-analyzer", boltFrameAnalyzer, 1).shuffleGrouping("spout");
                builder.setBolt("bolt-analysis-saver", boltAnalysisSaver, 1).shuffleGrouping("bolt-frame-analyzer");
                builder.setBolt("bolt-image-processor", boltImageProcessor, 1).shuffleGrouping("spout");
                builder.setBolt("bolt-gaussian-blur", boltGaussianBlur, 1).shuffleGrouping("bolt-image-processor");
                builder.setBolt("bolt-sharpener", boltSharpener, 1).shuffleGrouping("bolt-image-processor");
                // The aggregator subscribes to two streams; each tuple it receives comes from
                // exactly one of them. With a parallelism of 1 every tuple reaches the same task.
                // NOTE(review): if the parallelism is ever raised, shuffle grouping will no longer
                // guarantee that both versions of a frame reach the same task.
                builder.setBolt("bolt-frame-aggregator", boltFrameAggregator, 1)
                        .shuffleGrouping("bolt-sharpener")
                        .shuffleGrouping("bolt-gaussian-blur");
                builder.setBolt("bolt-output-generator", boltOutputGenerator, 1).shuffleGrouping("bolt-frame-aggregator");

                // try-with-resources shuts the cluster down when the block exits
                try (LocalCluster cluster = new LocalCluster()) {
                    cluster.submitTopology("Topology", config, builder.createTopology());
                    Thread.sleep(RUN_DURATION_MS);  // Adjust RUN_DURATION_MS as needed
                }
            } finally {
                // Restore the real streams before the log stream is closed.
                System.setOut(originalOut);
                System.setErr(originalErr);
            }
        }
    }
}

BoltGaussianBlur.java:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.opencv.core.Mat;
import org.opencv.core.Size;
import org.opencv.imgcodecs.Imgcodecs;
import org.opencv.imgproc.Imgproc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class BoltGaussianBlur extends BaseBasicBolt {
    private final String framesGaussianBlurFilePath;

    public BoltGaussianBlur() {
        try {
            Properties prop = new Properties();
            prop.load(new FileInputStream("config.ini"));
            framesGaussianBlurFilePath = prop.getProperty("framesGaussianBlurFilePath");
        } catch (IOException e) {
            LOG.error("BoltImageProcessor: Error occurred while reading config.ini file", e);
            throw new RuntimeException(e);
        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(BoltGaussianBlur.class);
    public void execute(Tuple input, BasicOutputCollector collector) {
        int gaussianBlurFrameNumber = input.getIntegerByField("frameNumber");
        LOG.info("BoltGaussianBlur: Frame #" + gaussianBlurFrameNumber + " has been received successfully.");
        Mat receivedFrame = (Mat) input.getValueByField("resizedFrame");
        Mat gaussianBlurFrame = new Mat();
        Imgproc.GaussianBlur(receivedFrame, gaussianBlurFrame, new Size(9, 9), 2, 2);
        String gaussianBlurFileName = framesGaussianBlurFilePath + "/frame_" + gaussianBlurFrameNumber + "_gaussian_blur.png";
        Imgcodecs.imwrite(gaussianBlurFileName, gaussianBlurFrame);
        LOG.info("BoltGaussianBlur: Frame #" + gaussianBlurFrameNumber + " has been converted to Gaussian blur successfully.");
        collector.emit(new Values(gaussianBlurFrame, gaussianBlurFrameNumber));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("gaussianBlurFrame", "gaussianBlurFrameNumber"));
    }
}

BoltSharpener.java:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.opencv.core.Core;
import org.opencv.core.Mat;
import org.opencv.core.Size;
import org.opencv.imgproc.Imgproc;
import org.opencv.imgcodecs.Imgcodecs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class BoltSharpener extends BaseBasicBolt {
    private final String framesSharpenedFilePath;

    public BoltSharpener() {
        try {
            Properties prop = new Properties();
            prop.load(new FileInputStream("config.ini"));
            framesSharpenedFilePath = prop.getProperty("framesSharpenedFilePath");
        } catch (IOException e) {
            LOG.error("BoltSharpener: Error occurred while reading config.ini file", e);
            throw new RuntimeException(e);
        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(BoltSharpener.class);
    public void execute(Tuple input, BasicOutputCollector collector) {
        int sharpenedFrameNumber = input.getIntegerByField("frameNumber");
        LOG.info("BoltSharpener: Frame #" + sharpenedFrameNumber + " has been received successfully.");
        Mat receivedFrame = (Mat) input.getValueByField("resizedFrame");
        Mat sharpenedFrame = new Mat();
        Imgproc.GaussianBlur(receivedFrame, sharpenedFrame, new Size(0, 0), 10);
        Core.addWeighted(receivedFrame, 1.5, sharpenedFrame, -0.5, 0, sharpenedFrame);
        String sharpenedFileName = framesSharpenedFilePath + "/frame_" + sharpenedFrameNumber + "_sharpened.png";
        Imgcodecs.imwrite(sharpenedFileName, sharpenedFrame);
        LOG.info("BoltSharpener: Frame #" + sharpenedFrameNumber + " has been sharpened successfully.");
        collector.emit(new Values(sharpenedFrame, sharpenedFrameNumber));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sharpenedFrame", "sharpenedFrameNumber"));
    }
}

BoltFrameAggregator.java:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.opencv.core.Core;
import org.opencv.core.Mat;
import org.opencv.imgcodecs.Imgcodecs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class BoltFrameAggregator extends BaseBasicBolt {
    private final String framesAggregated;
    public BoltFrameAggregator() {
        try {
            Properties prop = new Properties();
            prop.load(new FileInputStream("config.ini"));
            framesAggregated = prop.getProperty("framesAggregatedFilePath");
        } catch (IOException e) {
            LOG.error("BoltAggregator: Error occurred while reading config.ini file", e);
            throw new RuntimeException(e);
        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(BoltFrameAggregator.class);
    public void execute(Tuple input, BasicOutputCollector collector) {
        Mat receivedGaussianBlurFrame = (Mat) input.getValueByField("gaussianBlurFrame");
        int gaussianBlurFrameNumber = input.getIntegerByField("gaussianBlurFrameNumber");
        Mat receivedSharpenedFrame = (Mat) input.getValueByField("sharpenedFrame");
        int sharpenedFrameNumber = input.getIntegerByField("sharpenedFrameNumber");
        int aggregatedFrameNumber = input.getIntegerByField("sharpenedFrameNumber");
        Mat aggregatedFrame = new Mat();
        if (sharpenedFrameNumber == gaussianBlurFrameNumber) {
            Core.addWeighted(receivedSharpenedFrame, 1, receivedGaussianBlurFrame, 1, 0, aggregatedFrame);
            String aggregatedFileName = framesAggregated + "/frame_" + sharpenedFrameNumber + "_aggregated.png";
            Imgcodecs.imwrite(aggregatedFileName, aggregatedFrame);
            LOG.info("BoltAggregator: Frame #" + sharpenedFrameNumber + " has been aggregated successfully.");
            collector.emit(new Values(aggregatedFrame, aggregatedFrameNumber));
        } else {
            LOG.warn("BoltAggregator: Frame numbers for Gaussian blur and sharpened frames do not match.");
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("aggregatedFrame", "aggregatedFrameNumber"));
    }
}

I checked for typos and tried assigning a name to the output streams of gaussian-blur and sharpener. I even tried storing the received tuples to see whether the sharpener tuples would arrive after the first 4 or 5 tuples from gaussian-blur, but that didn't work.

Error Log:

[Thread-42-bolt-frame-aggregator-executor[3, 3]] INFO  o.a.s.e.Executor - Processing received TUPLE: source: bolt-gaussian-blur:5, stream: default, id: {}, [Mat [ 720*1280*CV_8UC1, isCont=true, isSubmat=false, nativeObj=0x1fc231bd4a0, dataAddr=0x1fc258f0f60 ], 0] PROC_START_TIME(sampled): null EXEC_START_TIME(sampled): null for TASK: 3 
...
[Thread-42-bolt-frame-aggregator-executor[3, 3]] ERROR o.a.s.u.Utils - Async loop died!
java.lang.RuntimeException: java.lang.IllegalArgumentException: sharpenedFrame does not exist
    at org.apache.storm.executor.Executor.accept(Executor.java:301) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.utils.Utils$1.run(Utils.java:398) ~[storm-client-2.6.0.jar:2.6.0]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: java.lang.IllegalArgumentException: sharpenedFrame does not exist
    at org.apache.storm.tuple.Fields.fieldIndex(Fields.java:98) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.tuple.TupleImpl.fieldIndex(TupleImpl.java:101) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.tuple.TupleImpl.getValueByField(TupleImpl.java:161) ~[storm-client-2.6.0.jar:2.6.0]
    at BoltFrameAggregator.execute(BoltFrameAggregator.java:33) ~[classes/:?]
    at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:48) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.6.0.jar:2.6.0]
    at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.6.0.jar:2.6.0]
    ... 6 more
1

There are 1 answers

0
moosehead42 On

As the error message indicates, your variable sharpenedFrame seems to be nonexistent. Can you double-check that it is properly initialized? You might need to apply extensive logging, or go for a pseudo-cluster on your local machine, which makes things easier to debug overall.