Apache Flink Wikipedia edit analytics with Scala


I'm trying to rewrite the Wikipedia edit stream analytics example from the Apache Flink tutorials in Scala. The original is at https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html

The code from the tutorial is:

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public class WikipediaAnalysis {

  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
        @Override
        public String getKey(WikipediaEditEvent event) {
          return event.getUser();
        }
      });

    DataStream<Tuple2<String, Long>> result = keyedEdits
      .timeWindow(Time.seconds(5))
      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
          acc.f0 = event.getUser();
          acc.f1 += event.getByteDiff();
          return acc;
        }
      });

    result.print();

    see.execute();
  }
}

Below is my attempt in Scala:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}
import org.apache.flink.streaming.api.windowing.time.Time


object WikipediaAnalytics extends App{

  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val edits = env.addSource(new WikipediaEditsSource());

  val keyedEdits = edits.keyBy(event => event.getUser)

  val result = keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L), (we: WikipediaEditEvent, t: (String, Long)) =>
    (we.getUser, t._2 + we.getByteDiff))

}

This is more or less a word-for-word conversion to Scala, so the type of the val result should be DataStream[(String, Long)], but the type actually inferred after fold() is nowhere close.

Please help me identify what is wrong with the Scala code.

EDIT1: I made the changes below, using the curried signature of fold[R], and the type now conforms to the expected type, but I still don't understand the reason:

  val result_1: (((String, Long), WikipediaEditEvent) => (String, Long)) => DataStream[(String, Long)] =
    keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L))

  val result_2: DataStream[(String, Long)] = result_1((t: (String, Long), we: WikipediaEditEvent ) =>
    (we.getUser, t._2 + we.getByteDiff))
1 answer

Answer by AudioBubble (best answer):

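The problem is in the call to fold. In the Scala API, fold takes two parameter lists, fold(initialValue)(function), so you need a closing parenthesis after the accumulator's initial value, and the function takes the accumulator first and the event second. Once that is fixed, the code still fails to compile because no implicit TypeInformation is available for WikipediaEditEvent. The easiest way to resolve that is the wildcard import org.apache.flink.streaming.api.scala._, which brings the implicit TypeInformation providers into scope. See below for a full example: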

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource
import org.apache.flink.streaming.api.windowing.time.Time

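object WikipediaAnalytics extends App {
  val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val edits = see.addSource(new WikipediaEditsSource())
  // The seed ("", 0) makes the accumulator a (String, Int);
  // use ("", 0L) and DataStream[(String, Long)] to match the question's types.
  val userEditsVolume: DataStream[(String, Int)] = edits
    .keyBy(_.getUser)             // group edits by user
    .timeWindow(Time.seconds(5))  // 5-second windows per user
    // fold is curried: initial value first, then (accumulator, event) => accumulator
    .fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff))
  userEditsVolume.print()
  see.execute("Wikipedia User Edit Volume")
}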
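
To see why the extra closing parenthesis matters, and why EDIT1 in the question type-checks, here is a minimal sketch of the same curried shape in plain Scala with no Flink dependency. MiniWindow, Edit and CurriedFoldSketch are hypothetical stand-ins invented for illustration; only the two-parameter-list shape of fold mirrors the Flink call discussed above, and the real method additionally requires implicit TypeInformation.

```scala
// Hypothetical stand-in for a windowed stream; not a Flink class.
final class MiniWindow[T](events: Seq[T]) {
  // Two parameter lists: the initial value, then the folding function.
  // The function takes the accumulator first and the element second.
  def fold[R](initialValue: R)(function: (R, T) => R): R =
    events.foldLeft(initialValue)(function)
}

// Hypothetical stand-in for WikipediaEditEvent.
final case class Edit(user: String, byteDiff: Int)

object CurriedFoldSketch {
  val window = new MiniWindow(Seq(Edit("alice", 10), Edit("alice", -3), Edit("alice", 5)))

  // R is fixed to (String, Long) by the first parameter list, so the
  // lambda in the second list needs no type annotations.
  val total: (String, Long) =
    window.fold(("", 0L))((acc, event) => (event.user, acc._2 + event.byteDiff))

  // Supplying only the first list yields a function that still expects
  // the folding function. This is the intermediate step from EDIT1.
  val awaiting: (((String, Long), Edit) => (String, Long)) => (String, Long) =
    window.fold(("", 0L))

  def main(args: Array[String]): Unit = {
    println(total)
    println(awaiting((acc, event) => (event.user, acc._2 + event.byteDiff)))
  }
}
```

Passing the tuple and the lambda inside a single pair of parentheses, as in the original attempt, does not match either parameter list of fold, which is presumably why the inferred type looked nothing like DataStream[(String, Long)].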