How to synchronize DataStreams in a BroadcastProcessFunction in Flink?


I'm using the Flink 1.18.1 API on Java 11, and I'm trying to use a BroadcastProcessFunction to filter a products DataStream against an authorized-brands DataStream used as the broadcast side.

So my first DataStream contains products, each with a brand field, and my second DataStream contains only the brands that should be allowed.

The problem is that when products reach processElement of the BroadcastProcessFunction, the brandState has not yet been filled with all the records from the brands DataStream. For example, I have 4800 brands in my brands DataStream, but when the products reach processElement, the brandState only contains a few of them (around 200). This causes products to be rejected because their brands have not yet been loaded into the brandState.

Here is my BroadcastProcessFunction

public class GateCoProcess extends BroadcastProcessFunction<CrawlData, Brand, CrawlData> {

    private final MapStateDescriptor<String, Boolean> broadcastStateDescriptor;

    public GateCoProcess(MapStateDescriptor<String, Boolean> broadcastStateDescriptor) {
        this.broadcastStateDescriptor = broadcastStateDescriptor;
    }

    @Override
    public void processElement(CrawlData value, ReadOnlyContext ctx, Collector<CrawlData> out) throws Exception {
        // Products side: read-only view of the broadcast state
        ReadOnlyBroadcastState<String, Boolean> brandState = ctx.getBroadcastState(broadcastStateDescriptor);

        if (brandState.contains(value.data.product.brand)) {
            out.collect(value);
        }
    }

    @Override
    public void processBroadcastElement(Brand brand, Context ctx, Collector<CrawlData> out) throws Exception {
        // Brands side: store each active brand in the broadcast state
        BroadcastState<String, Boolean> brandState = ctx.getBroadcastState(broadcastStateDescriptor);
        if (brand.active) {
            brandState.put(brand.getName(), true);
        }
    }
}

and here are my DataStreams and the call to the function:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Brand> brands = env.fromSource(KafkaSources.brandsSource, WatermarkStrategy.noWatermarks(), "gatebrand-cdc-records");

MapStateDescriptor<String, Boolean> broadcastStateDescriptor = new MapStateDescriptor<>(
        "broadcastState",
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.BOOLEAN_TYPE_INFO);

BroadcastStream<Brand> broadcastStream = brands.broadcast(broadcastStateDescriptor);

// integration is the products DataStream
DataStream<CrawlData> integration = ExtractData.extractProducts(env);

DataStream<CrawlData> filtered = integration.connect(broadcastStream).process(new GateCoProcess(broadcastStateDescriptor));

env.execute("mon job de products");


What should I do to get around this problem? Thanks.

I tried using watermarks, but with no result; my classes don't carry timestamps.

1 answer

Answered by kkrugler:

This is the cold-start problem, where you can't control the order in which records from different streams reach an operator.

One simple approach is to add a --coldstart flag to your workflow. This assumes that you are correctly saving the brands data in your broadcast state.

When the flag is set, you use a dummy source (one that generates no data) for your products DataStream. Start the workflow and wait until all of your brands stream has been stored in state. Then stop the workflow with a savepoint, and restart it from that savepoint without the --coldstart flag.
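A minimal sketch of that switch, with the Flink wiring left in comments since it depends on your job's classes (the flag name follows the answer; `ColdStart` and `EmptySource` are hypothetical names):

```java
import java.util.Arrays;

// Sketch: detect the --coldstart flag; the Flink parts are comments only.
class ColdStart {
    static boolean isColdStart(String[] args) {
        return Arrays.asList(args).contains("--coldstart");
    }

    // In the job's main() you would then pick the products source:
    //   DataStream<CrawlData> integration = isColdStart(args)
    //       ? env.addSource(new EmptySource())        // emits nothing, never finishes
    //       : ExtractData.extractProducts(env);
    // Run once with --coldstart until the broadcast state holds all brands,
    // stop with a savepoint, then restart from it without the flag.
}
```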

Another approach, which can be a bit harder and less deterministic, is that when your broadcast function gets a product record that doesn't have a matching brand (which means it would be filtered out), you save it in state along with a timer, so that you wait some amount of time in case there's a late-arriving brand that means it shouldn't be filtered. If that happens, you emit it (don't filter it) and clear it from state. When the timer fires, you purge that record from state, on the assumption that no matching brand is coming and the product should therefore be filtered. Note that timers require keyed state, so for this you'd key the products stream (e.g. by brand) and use a KeyedBroadcastProcessFunction instead.
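The decision logic of this buffer-with-timeout approach can be sketched framework-free; in Flink the maps below would be keyed/list state and the deadlines would be registered timers in a KeyedBroadcastProcessFunction. All names here (`BrandGate`, `onProduct`, etc.) are hypothetical:

```java
import java.util.*;

// Framework-free sketch of "buffer unmatched products, release on late brand,
// drop on timeout". Plain collections stand in for Flink state and timers.
class BrandGate {
    private final Set<String> allowedBrands = new HashSet<>();
    // brand -> deadline for products buffered under that brand (~ a timer)
    private final Map<String, Long> pendingDeadlines = new HashMap<>();
    // brand -> buffered product ids (~ ListState on the keyed stream)
    private final Map<String, List<String>> pendingProducts = new HashMap<>();
    private final long graceMillis;

    BrandGate(long graceMillis) { this.graceMillis = graceMillis; }

    /** Product side: emit immediately if the brand is known, otherwise buffer. */
    Optional<String> onProduct(String productId, String brand, long now) {
        if (allowedBrands.contains(brand)) return Optional.of(productId);
        pendingDeadlines.putIfAbsent(brand, now + graceMillis); // ~ register timer
        pendingProducts.computeIfAbsent(brand, b -> new ArrayList<>()).add(productId);
        return Optional.empty();
    }

    /** Broadcast side: a late-arriving brand releases everything buffered under it. */
    List<String> onBrand(String brand) {
        allowedBrands.add(brand);
        pendingDeadlines.remove(brand);                         // ~ delete timer
        List<String> released = pendingProducts.remove(brand);
        return released == null ? Collections.<String>emptyList() : released;
    }

    /** Timer side: drop buffered products whose grace period has expired. */
    void onTimer(long now) {
        for (Iterator<Map.Entry<String, Long>> it = pendingDeadlines.entrySet().iterator(); it.hasNext();) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= now) {      // assume no matching brand is coming
                pendingProducts.remove(e.getKey());
                it.remove();
            }
        }
    }
}
```

The grace period is the knob that makes this less deterministic: too short and you still filter products whose brand was merely late; too long and unauthorized products sit in state.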