Apache Flink KeyedStream after window operator behavior clarification


I'm requesting clarification on exactly how Apache Flink (1.6.0) handles events from KeyedStreams after the events have been sent through a window and some operator (such as reduce() or process()) has been applied.

Assuming a single-node cluster, after an operator on a keyed windowed stream has been executed, is one left with exactly one DataStream or exactly k DataStreams (where k is the number of unique values for the key)?

For clarification, consider needing to read in events from some source, key by some k, send the keyed events into some windowed stream, reduce, and then do pretty much anything else. Which of the two graphs below will actually be constructed?

Graph A

                     |--------------|
                     |    source    |
                     | (DataStream) |
                     |--------------|
                            |
                       [all events]
                            |
                            v
                     |--------------|
                     |  key by( k ) |
                     | (KeyedStream)|
                     |--------------|
                   /         |        \
                 /           |          \
            [ k = 1 ]    [ k = 2 ]    [ k = 3 ]
             /               |               \
           /                 |                 \
         v                   v                   v
|------------------||------------------||------------------|
|  sliding window  ||  sliding window  ||  sliding window  |
| (WindowedStream) || (WindowedStream) || (WindowedStream) |
|------------------||------------------||------------------|
         |                   |                   |
     [ k = 1 ]           [ k = 2 ]           [ k = 3 ]
         |                   |                   |
         v                   v                   v
   |----------|        |----------|        |----------|
   |  reduce  |        |  reduce  |        |  reduce  |
   |----------|        |----------|        |----------|
         |                   |                   |
     [ k = 1 ]           [ k = 2 ]           [ k = 3 ]
         |                   |                   |
         v                   v                   v
  |--------------|    |--------------|    |--------------|
  |     foo      |    |     foo      |    |     foo      |
  | (DataStream) |    | (DataStream) |    | (DataStream) |
  |--------------|    |--------------|    |--------------|

Graph B

                     |--------------|
                     |    source    |
                     | (DataStream) |
                     |--------------|
                            |
                       [all events]
                            |
                            v
                     |--------------|
                     |  key by( k ) |
                     | (KeyedStream)|
                     |--------------|
                   /         |        \
                 /           |          \
            [ k = 1 ]    [ k = 2 ]    [ k = 3 ]
             /               |               \
           /                 |                 \
         v                   v                   v
|------------------||------------------||------------------|
|  sliding window  ||  sliding window  ||  sliding window  |
| (WindowedStream) || (WindowedStream) || (WindowedStream) |
|------------------||------------------||------------------|
         |                   |                   |
     [ k = 1 ]           [ k = 2 ]           [ k = 3 ]
         |                   |                   |
         v                   v                   v
   |----------|        |----------|        |----------|
   |  reduce  |        |  reduce  |        |  reduce  |
   |----------|        |----------|        |----------|
         \                   |                  /
            \                |                /
               \             |             /
                  \          |          /
                     \       |       /
                        \    |    /
                           \ | /
                       [all products]
                             |
                             v
                      |--------------|
                      |     foo      |
                      | (DataStream) |
                      |--------------|

Edit (2018-09-22)

Based on David's answer, I think I've misinterpreted how exactly KeyedStreams work in combination with a window or other stream. Somehow, I got the impression that a KeyedStream partitioned the incoming stream by creating multiple streams behind the scenes rather than just grouping objects together by some value using the same stream.

I thought Flink was doing the equivalent of:

List<Foo> eventsForKey1 = ...;
List<Foo> eventsForKey2 = ...;
List<Foo> eventsForKey3 = ...;
...
List<Foo> eventsForKeyN = ...;

I now see that Flink is actually doing the equivalent of:

Map<Key, List<Foo>> events = ...;

There are 2 answers

David Anderson On BEST ANSWER

The best way to explore what the job graph will look like for various scenarios is to write some simple applications and examine their job graphs in the dashboard that comes with Flink.

I'm not sure how to interpret the fan-out you show after the keyBy, which makes responding to your question difficult. If you are asking about the parallelism of the resulting foo DataStream, it can be whatever you want it to be.

If the parallelism is 1 both before and after the keyBy, then the stream won't be split the way you've shown. Instead there will be a single Window operator that handles all the keys. (The parallelism is independent of the number of keys, though a keyed operator, such as your sliding window and its reduce function, cannot take advantage of a parallelism greater than the number of keys.)
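To make the "single operator that handles all the keys" idea concrete, here is a minimal plain-Java sketch. It is not Flink code: the Event class, reduceWindow, and the sample values are hypothetical, and it models only one window with parallelism 1. The point is that the operator keeps a separate accumulator per key, yet every result lands in the same output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of one keyed reduce operator instance; not Flink code.
class KeyedReduceSketch {

    // Hypothetical event type: a key plus a value to be combined.
    static final class Event {
        final int key;
        final long value;

        Event(int key, long value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + key + "," + value + ")";
        }
    }

    // Reduces the contents of one window. State is kept per key in a map,
    // but all per-key results are emitted into the same output list.
    static List<Event> reduceWindow(List<Event> window, BinaryOperator<Event> reducer) {
        Map<Integer, Event> statePerKey = new LinkedHashMap<>();
        for (Event e : window) {
            // First event for a key is stored as-is; later ones are reduced into it.
            statePerKey.merge(e.key, e, reducer);
        }
        return new ArrayList<>(statePerKey.values());
    }

    public static void main(String[] args) {
        List<Event> window = Arrays.asList(
                new Event(1, 10), new Event(2, 5), new Event(1, 7),
                new Event(3, 1), new Event(2, 2));

        List<Event> foo = reduceWindow(window, (a, b) -> new Event(a.key, a.value + b.value));

        // One output collection holding one result per key.
        System.out.println(foo); // prints [(1,17), (2,7), (3,1)]
    }
}
```

The map here plays the role of keyed state, and the single returned list plays the role of the one foo stream, which matches Graph B rather than Graph A.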

But even in a single node you could have multiple cores and set the parallelism of the window operator to 3. And the result of the reduce function could be processed in parallel by subsequent operators, if that's what you want. But regardless of the parallelism, that part of your job will only have one DataStream (foo).
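A rough way to picture how parallelism relates to the number of keys is the sketch below. It is a simplification, not Flink's actual assignment: subtaskFor is a hypothetical helper that uses a plain hash modulo, whereas Flink routes keys through key groups and will generally pick different subtasks. What it does show is that each key always maps to exactly one parallel instance, so with three keys no more than three instances ever receive data.

```java
import java.util.Set;
import java.util.TreeSet;

// Simplified model of routing keys to parallel operator instances; not Flink code.
class KeyRoutingSketch {

    // Assumption: plain hash-modulo routing stands in for Flink's key-group assignment.
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Returns the indices of the parallel instances that receive at least one key.
    static Set<Integer> busySubtasks(int[] keys, int parallelism) {
        Set<Integer> busy = new TreeSet<>();
        for (int key : keys) {
            busy.add(subtaskFor(key, parallelism));
        }
        return busy;
    }

    public static void main(String[] args) {
        int[] keys = {1, 2, 3};
        System.out.println(busySubtasks(keys, 1)); // prints [0]
        System.out.println(busySubtasks(keys, 3)); // prints [0, 1, 2]
        System.out.println(busySubtasks(keys, 8)); // prints [1, 2, 3]
    }
}
```

With parallelism 8, five of the eight instances stay idle because there are only three keys. In every case the instances are parallel slices of the same operator, and their combined output is still one logical stream.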

Please update your question if I've misinterpreted it and I'll try again.

kkrugler On

I think what you're really asking is whether you wind up with a KeyedStream following the reduce operation. If so, then the answer is no, you wind up with a regular DataStream.

It is possible, though, to cast it back to a KeyedStream via DataStreamUtils.reinterpretAsKeyedStream(DataStream, KeySelector), provided you are careful not to have changed the field value(s) used to create the key for the window.
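The reason for that caveat can be sketched in plain Java. This is not the Flink API: partitionFor is a hypothetical hash-modulo stand-in for the real partitioning. Reinterpreting a stream as keyed does not move any records, so each record must already sit on the partition that its current key would be routed to.

```java
// Illustrates why key fields must stay unchanged before reinterpreting a stream as keyed.
// Plain-Java model with simplified hash-modulo partitioning; not Flink code.
class ReinterpretPreconditionSketch {

    // Assumption: hash modulo stands in for the real key-to-partition assignment.
    static int partitionFor(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    // True if a record partitioned under keyBefore is still on the partition
    // where a fresh keyBy on keyAfter would have sent it.
    static boolean stillCorrectlyPartitioned(int keyBefore, int keyAfter, int parallelism) {
        return partitionFor(keyBefore, parallelism) == partitionFor(keyAfter, parallelism);
    }

    public static void main(String[] args) {
        // Key untouched after the window: the record is where keyed state expects it.
        System.out.println(stillCorrectlyPartitioned(2, 2, 3)); // prints true
        // Key field modified downstream: the record is now on the wrong partition.
        System.out.println(stillCorrectlyPartitioned(2, 3, 3)); // prints false
    }
}
```

In the second case a keyed operator would look up state for key 3 on an instance that is not responsible for it, which is why the reinterpretation is only safe when the key is preserved.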