Output of Join in Apache Flink

In Apache Flink, if I join two data sets on a primary key, I get a Tuple2 containing the matching entry from each of the data sets.

The problem is that applying the map() method to the resulting data set of Tuple2s does not look very nice, especially if the entries of both data sets have a large number of fields.

Using tuples in both input data sets, I end up with code like this:

var in1: DataSet[(Int, Int, Int, Int, Int)] = /* */
var in2: DataSet[(Int, Int, Int, Int)] = /* */

val out = in1.join(in2).where(0, 1, 2).equalTo(0, 1, 2)
  .map(join => (join._1._1, join._1._2, join._1._3,
                    join._1._4, join._1._5, join._2._4))

I would not mind using POJOs or case classes, but I don't see how this would make it better.

Question 1: Is there a nice way to flatten that Tuple2, e.g. using another operator?

Question 2: How should I handle a join of three data sets on the same key? That would make the example code even messier.

Thanks for helping.

Best answer, by Fabian Hueske

You can directly apply a join function to each pair of joined elements, for example:

val leftData: DataSet[(String, Int, Int)] = ...
val rightData: DataSet[(String, Int)] = ...
// The join function receives each matching (left, right) pair and returns one flat tuple.
val joined: DataSet[(String, Int, Int)] = leftData
      .join(rightData).where(0).equalTo(0) { (l, r) => (l._1, l._2, l._3 + r._2) }
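A minimal sketch of the same idea applied to the question's join, assuming the Flink 1.x Scala DataSet API (`import org.apache.flink.api.scala._`). The case classes `LeftRow`, `RightRow` and `JoinedRow` and the sample data are hypothetical stand-ins for the question's 5-field and 4-field tuples. With case classes, both the join keys and the output fields can be referred to by name, and the join function builds the flat result directly, so no intermediate (left, right) pair has to be unpacked with a later map():

```scala
import org.apache.flink.api.scala._

// Hypothetical record types replacing the question's 5-field and 4-field tuples.
case class LeftRow(a: Int, b: Int, c: Int, d: Int, e: Int)
case class RightRow(a: Int, b: Int, c: Int, x: Int)
case class JoinedRow(a: Int, b: Int, c: Int, d: Int, e: Int, x: Int)

object FlatJoin {
  def run(): Seq[JoinedRow] = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical sample inputs; only the first rows share the key (1, 2, 3).
    val in1: DataSet[LeftRow] = env.fromElements(
      LeftRow(1, 2, 3, 4, 5), LeftRow(7, 8, 9, 10, 11))
    val in2: DataSet[RightRow] = env.fromElements(
      RightRow(1, 2, 3, 40), RightRow(0, 0, 0, 99))

    // Join on the three key fields by name. The function passed to the join
    // emits one flat JoinedRow per matching pair instead of a (left, right) tuple.
    val out: DataSet[JoinedRow] = in1.join(in2)
      .where("a", "b", "c").equalTo("a", "b", "c") { (l, r) =>
        JoinedRow(l.a, l.b, l.c, l.d, l.e, r.x)
      }

    // collect() triggers execution and returns the result to the client.
    out.collect()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Compared with join(...).map(...), the flattening happens inside the join operator itself, so the code that names the output fields lives in one place.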

To answer the second question: Flink supports only binary joins, so a join of three data sets has to be written as two consecutive joins. However, Flink's optimizer can avoid unnecessary shuffles if you give it a hint about the behavior of your function. Forwarded field annotations tell the optimizer that certain fields (such as the join key) are not modified by your join function, which enables it to reuse existing partitionings and sort orders.
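A minimal sketch of a three-way join on a shared key, written as two chained binary joins. It assumes the Flink 1.x Scala DataSet API, where the result of a join provides `withForwardedFieldsFirst` for declaring fields that are copied unchanged from the first input, and where `"_1"` is the field expression for the first element of a Scala tuple. The inputs `a`, `b`, `c` and their contents are hypothetical:

```scala
import org.apache.flink.api.scala._

object ThreeWayJoin {
  def run(): Seq[(Int, String, Double, Boolean)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Three hypothetical inputs that share the same Int key in field _1.
    val a: DataSet[(Int, String)] = env.fromElements((1, "x"), (2, "y"))
    val b: DataSet[(Int, Double)] = env.fromElements((1, 0.5), (3, 1.5))
    val c: DataSet[(Int, Boolean)] = env.fromElements((1, true), (2, false))

    // First binary join: a with b. The key _1 is copied unchanged from the
    // first input to field _1 of the output, which the annotation declares.
    val ab: DataSet[(Int, String, Double)] = a.join(b).where(0).equalTo(0) {
      (l, r) => (l._1, l._2, r._2)
    }.withForwardedFieldsFirst("_1")

    // Second binary join: (a join b) with c on the same key. Because of the
    // annotation above, the optimizer may reuse the partitioning of ab on _1
    // instead of shuffling it again.
    val abc: DataSet[(Int, String, Double, Boolean)] = ab.join(c).where(0).equalTo(0) {
      (l, r) => (l._1, l._2, l._3, r._2)
    }.withForwardedFieldsFirst("_1")

    abc.collect()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

The annotation is only a hint that the optimizer trusts: declaring a field as forwarded when the function actually changes it can produce incorrect results, so it should list only fields that are copied through as-is.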