I have a question which is a variation of this question: Flink: how to store state and use in another stream?
I have two streams:
val ipStream: DataStream[IPAddress] = ???
val routeStream: DataStream[RoutingTable] = ???
I want to find out which route each packet uses. For a single address and a single table, the lookup looks like this:
val ip = IPAddress("10.10.10.10")
val table = RoutingTable(Seq("10.10.10.0/24", "5.5.5.0/24"))
val route = table.lookup(ip) // == "10.10.10.0/24"
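For context, here is a minimal sketch of what such a lookup might do internally. The definitions of IPAddress and RoutingTable are not shown in the question, so the fields, the IPv4-only parsing, and the longest-prefix-match rule below are all assumptions. This version also returns an Option[String] instead of a bare route so that "no matching route" is explicit.

```scala
// Hypothetical model types; the question does not show their real definitions.
final case class IPAddress(value: String) {
  // Pack the four octets of a dotted-quad IPv4 address into one Long.
  def toLong: Long =
    value.split('.').foldLeft(0L)((acc, octet) => (acc << 8) | octet.toLong)
}

final case class RoutingTable(routes: Seq[String]) {
  // Number of network bits in a CIDR string, e.g. 24 for "10.10.10.0/24".
  private def prefixLength(cidr: String): Int = cidr.split('/')(1).toInt

  // True if `ip` falls inside the CIDR block.
  private def contains(cidr: String, ip: IPAddress): Boolean = {
    val network = cidr.split('/')(0)
    val mask = (0xFFFFFFFFL << (32 - prefixLength(cidr))) & 0xFFFFFFFFL
    (IPAddress(network).toLong & mask) == (ip.toLong & mask)
  }

  // Longest-prefix match: of all routes containing `ip`, pick the most specific.
  def lookup(ip: IPAddress): Option[String] = {
    val matching = routes.filter(contains(_, ip))
    if (matching.isEmpty) None else Some(matching.maxBy(prefixLength))
  }
}
```

The point of the sketch is that lookup needs the whole table at once, which is exactly what makes keying the streams awkward.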
The problem is that I cannot key the streams in the obvious way: computing the route requires both the complete table and the IP address, while a key must be computed from each element in isolation.
For every element from the ipStream, I need the latest routeStream element. Right now I'm using a hack that forces everything to be processed non-parallel:
ipStream
  .connect(routeStream)
  .keyBy(_ => 0, _ => 0)
  .flatMap(new MyRichCoFlatMapFunction) // with ValueState[RoutingTable]
This sounds like the use case for a broadcast strategy. However, the routeStream will be updated and is not fixed in a file. The question remains: Is there a way to have two streams, one of which contains changing control data for the other stream?
Since I solved the issue, I might as well write an answer here :)
I keyed the two streams like this:
This works under the condition that IP packets are generally routed within the same /8 prefix, which can be assumed for most traffic.
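As an illustration only, keying by the /8 prefix could look roughly like the sketch below. The PrefixKeys object and both function names are hypothetical, and the sketch assumes that addresses and routes are available as dotted-quad strings and that routing updates arrive as individual route entries rather than as one complete table.

```scala
// Hypothetical key extractors: both sides are keyed by their first octet,
// i.e. by the /8 prefix they belong to.
object PrefixKeys {
  // "10.10.10.10" -> 10
  def ipKey(address: String): Int = address.takeWhile(_ != '.').toInt

  // "10.10.10.0/24" -> 10
  def routeKey(cidr: String): Int = cidr.takeWhile(_ != '.').toInt
}

// Possible use on the connected streams (field names are assumptions):
// ipStream
//   .connect(routeEntryStream)
//   .keyBy(ip => PrefixKeys.ipKey(ip.value), route => PrefixKeys.routeKey(route.cidr))
```

Because both extractors only look at a single element, they satisfy the requirement that keys are computed in isolation. Routes with a prefix shorter than /8 would span several keys and are not covered by this sketch.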
Then, with a stateful RichCoFlatMapFunction, one can build up the routing table as keyed state. When a new IP packet arrives, do a lookup in the routing table. Now there are two possible scenarios:
This way, we have two streams where one of them has changing control data for the other stream.
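A minimal sketch of such a stateful function, assuming the Flink 1.x DataStream API, might look like this. The class name RouteLookup, the output type, the stand-in IPAddress and RoutingTable types, and the choice to drop packets when no table or route is known are all assumptions of this sketch, not necessarily what was done in the original solution.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

// Hypothetical stand-ins for the question's types, kept minimal on purpose.
case class IPAddress(value: String)
trait RoutingTable extends Serializable {
  def lookup(ip: IPAddress): Option[String]
}

class RouteLookup
    extends RichCoFlatMapFunction[IPAddress, RoutingTable, (IPAddress, String)] {

  // Keyed state: Flink keeps one RoutingTable per key of the connected streams.
  @transient private var tableState: ValueState[RoutingTable] = _

  override def open(parameters: Configuration): Unit = {
    tableState = getRuntimeContext.getState(
      new ValueStateDescriptor[RoutingTable]("routing-table", classOf[RoutingTable]))
  }

  // Data side: look the packet up in the latest table seen for this key.
  override def flatMap1(ip: IPAddress, out: Collector[(IPAddress, String)]): Unit = {
    val table = tableState.value() // null until the first table arrives
    // Assumption: packets without a known table or route are silently dropped.
    if (table != null) table.lookup(ip).foreach(route => out.collect((ip, route)))
  }

  // Control side: every update replaces the stored table for this key.
  override def flatMap2(table: RoutingTable, out: Collector[(IPAddress, String)]): Unit =
    tableState.update(table)
}
```

The function would be applied with .flatMap(new RouteLookup) on the connected and keyed streams. Since the state is keyed, each parallel instance only holds the tables for the keys assigned to it, which is what lets the job run with a parallelism greater than one.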