Does Spark Structured Streaming support chained flatMapGroupsWithState calls on different keys?


By "chained", I mean something like the following:

val ds1 = dataset.groupByKey(r => r.Key1)
    .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout())(...)
val ds2 = ds1.groupByKey(r => r.Key2)
    .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout())(...)

I ask because when I try this, I get the following error:

Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operation which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded. Please refer the programming guide doc for more details. If you understand the possible risk of correctness issue and still need to run the query, you can disable this check by setting the config spark.sql.streaming.statefulOperator.checkCorrectness.enabled to false.
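
The check named in the message can be turned off through the session configuration. A minimal sketch of that step, assuming a local SparkSession (the object name, application name, and master setting are placeholders, not taken from my actual job):

```scala
import org.apache.spark.sql.SparkSession

object DisableCorrectnessCheck {
  def main(args: Array[String]): Unit = {
    // Placeholder session setup; appName and master are assumptions for illustration.
    val spark = SparkSession.builder()
      .appName("chained-state-example")
      .master("local[*]")
      .getOrCreate()

    // Config key copied from the error message above.
    // Setting it to false only skips the check; it does not change how the
    // global watermark treats rows emitted by the first stateful operator.
    spark.conf.set(
      "spark.sql.streaming.statefulOperator.checkCorrectness.enabled",
      "false"
    )

    spark.stop()
  }
}
```

The same key can also be passed at submit time with `--conf`, which avoids changing application code.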

After I disable the check, the Spark app runs, but the second state function receives no input. This is unexpected, because the first state function does produce output when I remove the second one.
