Flink Streaming: a data stream controlled by a control stream


I have a question which is a variation of this question: Flink: how to store state and use in another stream?

I have two streams:

  1. val ipStream: DataStream[IPAddress] = ???
  2. val routeStream: DataStream[RoutingTable] = ???

I want to find out which route each packet uses. For a single packet, this can be done with:

val ip = IPAddress("10.10.10.10")
val table = RoutingTable(Seq("10.10.10.0/24", "5.5.5.0/24"))
val route = table.lookup(ip) // == "10.10.10.0/24"
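The question does not show `IPAddress` or `RoutingTable`. Here is a minimal sketch of what they might look like. It assumes IPv4 addresses and CIDR route strings, and it has `lookup` do a longest-prefix match, which is the usual routing rule. Both types are hypothetical. Unlike the snippet above, this `lookup` returns an `Option[String]` so that "no route" is explicit.

```scala
// Hypothetical IPv4 address type; `bits` is the address as an unsigned 32-bit value in a Long.
final case class IPAddress(value: String) {
  val bits: Long = IPAddress.toBits(value)
}

object IPAddress {
  // Parses a dotted quad, e.g. "10.10.10.10" -> 168430090L.
  def toBits(dotted: String): Long =
    dotted.split('.').map(_.toLong).foldLeft(0L)((acc, octet) => (acc << 8) | octet)
}

// Hypothetical routing table holding CIDR prefixes such as "10.10.10.0/24".
final case class RoutingTable(routes: Seq[String]) {

  private def prefixLength(route: String): Int = route.split('/')(1).toInt

  // True if the route's network equals the address in the top `prefixLength` bits.
  private def covers(route: String, ip: IPAddress): Boolean = {
    val network = IPAddress.toBits(route.split('/')(0))
    // Top `len` bits set within 32 bits; a /0 route yields mask 0 and matches everything.
    val mask = (0xFFFFFFFFL << (32 - prefixLength(route))) & 0xFFFFFFFFL
    (network & mask) == (ip.bits & mask)
  }

  // Longest-prefix match: the most specific covering route wins.
  def lookup(ip: IPAddress): Option[String] =
    routes.filter(covers(_, ip)).sortBy(r => prefixLength(r)).lastOption
}
```

With the table from the example, `lookup(IPAddress("10.10.10.10"))` yields `Some("10.10.10.0/24")`. An address that no route covers yields `None`.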

The problem is that I cannot really key the streams: the lookup needs both the complete table and the IP address, but a key must be computed from each element in isolation.

For every element of ipStream, I need the latest element of routeStream. Right now I'm using a hack in which everything is processed without any parallelism:

ipStream
  .connect(routeStream)
  // Every element of both streams gets key 0, so all of them end up on a single parallel instance.
  .keyBy(_ => 0, _ => 0)
  .flatMap(new MyRichCoFlatMapFunction) // keeps the latest table in a ValueState[RoutingTable]

This sounds like a use case for a broadcast strategy. However, routeStream keeps being updated; it is not a fixed table loaded from a file. So the question remains: is there a way to have two streams, one of which carries changing control data for the other?



Accepted answer by Jonas Gröger

Since I solved the issue, I might as well write an answer here :)

I keyed the two streams like this:

  1. The RoutingTable stream was keyed by the first byte of each network route.
  2. The IPAddress stream was likewise keyed by the first byte of the address.
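The two key selectors might look like the sketch below. It assumes the string formats from the question's example: `"10.10.10.10"` for addresses and `"10.10.10.0/24"` for routes. The function names are hypothetical. One addition goes beyond the answer. A route shorter than /8, such as a default route `0.0.0.0/0`, spans several first bytes, so this sketch returns every first byte it covers instead of just one.

```scala
// Hypothetical key selectors: both sides are keyed by the first octet (the /8 block).

// "10.10.10.10" -> 10
def ipKey(ip: String): Int = ip.takeWhile(_ != '.').toInt

// "10.10.10.0/24" -> Seq(10)
// Routes shorter than /8 span several first octets, so every /8 block they cover is
// returned, e.g. "0.0.0.0/0" -> 0 until 256.
def routeKeys(route: String): Seq[Int] = {
  val parts = route.split('/')
  val first = parts(0).takeWhile(_ != '.').toInt
  val len = parts(1).toInt
  if (len >= 8) Seq(first)
  else {
    val blockSize = 1 << (8 - len)       // number of /8 blocks the route covers
    val start = first & ~(blockSize - 1) // align down to the first covered block
    start until (start + blockSize)
  }
}
```

For routes of /8 or longer this is exactly "key by the first byte". In a Flink job, the route stream could first be `flatMap`ped into `(key, route)` pairs with `routeKeys` and then keyed on the pair's first field. That way a short route would be stored under every /8 it covers.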

This works as long as a packet is routed via a route that shares its /8 prefix, which can be assumed for most traffic.

Then, with a stateful RichCoFlatMap, one can build up the routing table as keyed state. When a new IP packet arrives, look it up in the routing table for its key. There are two possible outcomes:

  1. No matching route is found. We could buffer the packet for later, but discarding it works as well.
  2. A route is found. Output the tuple [IPAddress, RoutingTableEntry].
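Outside Flink, the per-key logic of that RichCoFlatMap can be modelled with a plain map from key to routing table, which stands in for Flink's keyed state. This is a hypothetical sketch, not the author's code. `onRoute` and `onIp` play the roles of the co-flatmap's two `flatMap` methods, routes are CIDR strings keyed by their first byte, unmatched addresses are discarded (outcome 1), and route withdrawals are not modelled.

```scala
import scala.collection.mutable

// Hypothetical model of the keyed co-flatmap: one routing table per first octet.
final class KeyedRouter {
  // Stands in for Flink keyed state: key (first octet) -> routes received for that key.
  private val tables = mutable.Map.empty[Int, Vector[String]]

  private def toBits(dotted: String): Long =
    dotted.split('.').map(_.toLong).foldLeft(0L)((acc, octet) => (acc << 8) | octet)

  private def firstOctet(dotted: String): Int = dotted.takeWhile(_ != '.').toInt

  private def prefixLength(route: String): Int = route.split('/')(1).toInt

  // Route stream input: append the route to the table of its key.
  def onRoute(route: String): Unit = {
    val key = firstOctet(route)
    tables(key) = tables.getOrElse(key, Vector.empty) :+ route
  }

  // IP stream input: look the address up in its key's table only.
  // Returns (address, route) if a route matches; None means the packet is discarded.
  def onIp(ip: String): Option[(String, String)] = {
    val ipBits = toBits(ip)
    val matching = tables.getOrElse(firstOctet(ip), Vector.empty).filter { route =>
      val mask = (0xFFFFFFFFL << (32 - prefixLength(route))) & 0xFFFFFFFFL
      (toBits(route.split('/')(0)) & mask) == (ipBits & mask)
    }
    // Longest-prefix match among this key's routes.
    matching.sortBy(r => prefixLength(r)).lastOption.map(route => (ip, route))
  }
}
```

Because each key holds only the routes of its own /8 block, different keys can live on different parallel instances. This removes the single-instance bottleneck of `keyBy(_ => 0, _ => 0)`.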

This way, we have two streams where one of them has changing control data for the other stream.