When I unsubscribe the last Sub and sub again, I expect the Sink/Flux to work or throw an Exception.
See my Testcode:
void playground() {
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
var disposer = sink.asFlux().subscribe(s -> System.out.println("1: "+s));
System.out.println("CurrentSubs:" + sink.currentSubscriberCount());
sink.tryEmitNext("Test1");
var disposer2 = sink.asFlux().subscribe(s -> System.out.println("2: "+s));
System.out.println("CurrentSubs:" + sink.currentSubscriberCount());
sink.tryEmitNext("Test2");
disposer.dispose();
System.out.println("CurrentSubs:" + sink.currentSubscriberCount());
sink.tryEmitNext("Test3");
disposer2.dispose();
System.out.println("CurrentSubs:" + sink.currentSubscriberCount());
var disposer3 = sink.asFlux().subscribe(s -> System.out.println("3: "+s));
System.out.println("CurrentSubs:" + sink.currentSubscriberCount());
sink.tryEmitNext("Test4");
disposer3.dispose();
System.out.println("CurrentSubs:" + sink.currentSubscriberCount());
}
And the Output:
1: Test1
CurrentSubs:2
1: Test2
2: Test2
CurrentSubs:1
2: Test3
CurrentSubs:0
CurrentSubs:0
CurrentSubs:0
This behaviour seems weird to me, I would at least expect an Exception when I try to sub again and fail.
Is there any reason behind this behaviour.
I think this is kind of a gray area and not documented well. The
onBackpressureBuffer()Sinks.Manydoes by defaultautoCancel, so once the subscriber count drops to zero, the operator stops emitting values to new subscribers. Since it is now in cancelled state, all new subscriptions will immediately complete without emitting a value right upon subscription. Therefore the following is trueIn reactor, cancellation per se is not an error. Therefore, it is kind of OKish when you don't get an exception when subscribing to an
autoCancelledSinks.Many.What bothers me more is that even though the
SinkwasautoCanceled, it still accepts emissions viatryEmitNext. So following istrueeven after theautoCancellation took place.So the API will let you keep emitting values even though the
Sinkis pretty much dead. Only after you fill the internalqueuein theSinkyou getEmitResult.FAIL_OVERFLOWresult. TheSink"knew" it has beenautoCanceled and will still let you emit values which the new subscribers will never see... I can imagine a scenario where this could lead to an intermittent memory leak or such.Anyway, if you want your example to work more intuitively just disable the
autoCancelbehavior ofonBackpressureBuffer()like so