Apache Flink - use values from a data stream to dynamically create a streaming data source


I'm trying to build a sample application using Apache Flink that does the following:

  1. Reads a stream of stock symbols (e.g. 'CSCO', 'FB') from a Kafka queue.
  2. For each symbol performs a real-time lookup of current prices and streams the values for downstream processing.

* Update to original post *

I moved the map function into a separate class and no longer get the run-time error message "The implementation of the MapFunction is not serializable any more. The object probably contains or references non serializable fields".

The issue I'm facing now is that the Kafka topic "stockprices", which I'm trying to write the prices to, is not receiving them. I'm troubleshooting and will post any updates.

public class RetrieveStockPrices { 
    @SuppressWarnings("serial") 
    public static void main(String[] args) throws Exception { 
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 

        Properties properties = new Properties(); 
        properties.setProperty("bootstrap.servers", "localhost:9092"); 
        properties.setProperty("zookeeper.connect", "localhost:2181"); 
        properties.setProperty("group.id", "stocks"); 

        DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties)); 

        DataStream<String> stockPrice = 
            streamOfStockSymbols 
            //get unique keys 
            .keyBy(new KeySelector<String, String>() { 
                @Override 
                public String getKey(String trend) throws Exception {
                    return trend; 
                }
            })
            //collect events over a window 
            .window(TumblingEventTimeWindows.of(Time.seconds(60))) 
            //return the last event from the window...all elements are the same "Symbol" 
            .apply(new WindowFunction<String, String, String, TimeWindow>() {
                @Override 
                public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception { 
                    out.collect(input.iterator().next().toString()); 
                }
            })
            .map(new StockSymbolToPriceMapFunction());

        streamExecEnv.execute("Retrieve Stock Prices"); 
    }
}

public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {
    @Override
    public String map(String stockSymbol) throws Exception {
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol);

        DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
        stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));

        return "100000";
    }

    private static class CustomKeySelector implements KeySelector<String, String> {
        @Override
        public String getKey(String arg0) throws Exception {
            return arg0.trim();
        }
    }
}


public class LookupStockPrice extends RichSourceFunction<String> { 
    public String stockSymbol = null; 
    public boolean isRunning = true; 

    public LookupStockPrice(String inSymbol) { 
            stockSymbol = inSymbol; 
    } 

    @Override 
    public void open(Configuration parameters) throws Exception { 
            isRunning = true; 
    } 


    @Override 
    public void cancel() { 
            isRunning = false; 
    } 

    @Override 
    public void run(SourceFunction.SourceContext<String> ctx) 
                    throws Exception { 
            String stockPrice = "0";
            while (isRunning) { 
                //TODO: query Google Finance API 
                stockPrice = Integer.toString((new Random()).nextInt(100)+1);
                ctx.collect(stockPrice);
                Thread.sleep(10000);
            } 
    } 
}

Answer by Fabian Hueske (best answer)

A StreamExecutionEnvironment is not intended to be used inside the operators of a streaming application. "Not intended" means that this usage is neither tested nor encouraged. It might work and do something, but it will most likely not behave well and will probably kill your application.

The StockSymbolToPriceMapFunction in your program defines, for each incoming record, a completely new and independent streaming application. However, since you never call streamExecEnv.execute() on that inner environment, these programs are never started, and the map method returns without looking up or emitting any prices. That is why nothing arrives in the "stockprices" topic.

If you did call streamExecEnv.execute(), the function would start a new local Flink cluster in the worker's JVM and run the application on that local cluster. Each local Flink instance takes up a lot of heap space, so after a few clusters have been started the worker will probably die with an OutOfMemoryError, which is not what you want to happen.
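One possible way to avoid the nested environment is to do the lookup directly inside map() and attach the Kafka sink to the outer job. The sketch below is only an illustration under assumptions: StockPriceLookup, its seed parameter, and the "symbol,price" output format are hypothetical names introduced here, and the random price merely mirrors the placeholder in the question. The Flink wiring is shown in comments and uses only calls that already appear in the question's code.

```java
import java.io.Serializable;
import java.util.Random;

/**
 * Hypothetical helper (not part of Flink): performs the per-symbol lookup so
 * that map() can call it directly instead of building a nested streaming job.
 * It is Serializable because Flink ships function instances to the workers.
 */
final class StockPriceLookup implements Serializable {
    private static final long serialVersionUID = 1L;

    // java.util.Random is itself Serializable, so it can be a plain field.
    private final Random random;

    StockPriceLookup(long seed) {
        this.random = new Random(seed);
    }

    /**
     * Placeholder for a real price query. Returns "SYMBOL,price" with a
     * random price between 1 and 100, as in the question's TODO stub.
     */
    String lookup(String stockSymbol) {
        String symbol = stockSymbol.trim();
        int price = random.nextInt(100) + 1;
        return symbol + "," + price;
    }
}

// Assumed wiring in the existing job (comments only, not compiled here):
//
//   public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {
//       private final StockPriceLookup lookup = new StockPriceLookup(42L);
//
//       @Override
//       public String map(String stockSymbol) throws Exception {
//           return lookup.lookup(stockSymbol); // no StreamExecutionEnvironment here
//       }
//   }
//
//   // In main(), before streamExecEnv.execute(...):
//   stockPrice.addSink(new FlinkKafkaProducer08<String>(
//           "localhost:9092", "stockprices", new SimpleStringSchema()));
```

With this shape, the job emits one price per symbol for each 60-second window rather than a continuous feed per symbol. If continuous updates are needed, the design would have to change further, for example by polling for a set of symbols held in operator state.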