Why is Flow created on ConflatedBroadcastChannel only able to receive last element?

947 views Asked by At

The following code only prints 10000 i.e. only the last element

val channel = BroadcastChannel<Int>(Channel.CONFLATED)
val flowJob = channel.asFlow().buffer(Channel.UNLIMITED).onEach {
    println(it)
}.launchIn(GlobalScope)

for (i in 0..100) {
    channel.offer(i*i)
}
flowJob.join()

Code can be ran in the playground.

But since the Flow is launched in separate dispatching thread, and value is sent to the Channel and since Flow has an unlimited buffer, it should receive each element till onEach is invoked. But why only the last element is able to get received?

Is this the expected behavior or some bug? If its expected behavior how would somebody try to push only the newest elements to the flow, but all the flow that has certain buffer can receive the element.

3

There are 3 answers

5
Adam Arold On

The problem here is the Channel.CONFLATED. Taken from the docs:

Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
so that the receiver always gets the most recently sent element.
Back-to-send sent elements are _conflated_ -- only the the most recently sent element is received,
while previously sent elements **are lost**.
Sender to this channel never suspends and [offer] always returns `true`.

This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.

This implementation is fully lock-free.

so this is why you only get the most recent (last) element. I'd use an UNLIMITED Channel instead:

val channel = Channel<Int>(Channel.UNLIMITED)
val flowJob = channel.consumeAsFlow().onEach {
    println(it)
}.launchIn(GlobalScope)

for (i in 0..100) {
    channel.offer(i*i)
}
flowJob.join()
0
Róbert Nagy On

As some of the comments stated, using Channel.CONFLATED will store only the last value, and you are offering to the channel, even if your flow has a buffer.

Also join() will suspend until the Job is not complete, in your case infinitely, that's why you needed the timeout.

 val channel = Channel<Int>(Channel.RENDEZVOUS)
 val flowJob = channel.consumeAsFlow().onEach {
     println(it)
 }.launchIn(GlobalScope)

GlobalScope.launch{
    for (i in 0..100) {
        channel.send(i * i)
    }
    channel.close()
}
flowJob.join()

Check out this solution (playground link), with the Channel.RENDEZVOUS your channel will accept new elements only if the others are already consumed. This is why we have to use send instead of offer, send suspends until it can send elements, while offer returns a boolean indicating if send was succesfull. At last, we have to close the channel, in order for join() not to suspend until eternity.

0
Xarybdis On

Actually, this is about the "Conflate" way of buffering. For buffering a flow you have a couple of ways such as using buffer() method or collectLatest() or conflate(). Each of them has their own way to buffer. So conflate() method's way is that when the flow emits values, it tries to collect but when the collector is too slow, then conflate() skips the intermediate values for the sake of the flow. And it's doing it even tho every time it's emitted in a separate coroutine. So in a channel, a similar thing is happening basically.

Here is the official doc explanation:

When a flow represents partial results of the operation or operation status updates, it may not be necessary to process each value, but instead, only most recent ones. In this case, the conflate operator can be used to skip intermediate values when a collector is too slow to process them.

Check out this link.

The explanation is for flow but you need to focus on the feature that you are using. And in this case, conflation is same for channel and flow.