Given the following Dart code snippet:
Stream stream1 = new Stream.periodic(new Duration(seconds: 1), (n) => n)
    .take(10)
    .asBroadcastStream();
stream1.listen((n) => print("stream1 : $n"),
    onError: (err) => print("stream1 : $err"),
    onDone: () => print("stream1 : done"),
    cancelOnError: false);

Stream stream2 = stream1.where((n) => n % 2 == 0).take(2);
stream2.listen((n) => print("stream2 : $n"),
    onError: (err) => print("stream2 : $err"),
    onDone: () => print("stream2 : done"),
    cancelOnError: false);

Stream stream3 = stream1.where((n) => n % 2 != 0).take(2);
stream3.listen((n) => print("stream3 : $n"),
    onError: (err) => print("stream3 : $err"),
    onDone: () => print("stream3 : done"),
    cancelOnError: false);

StreamController controller = new StreamController.broadcast();
controller.addStream(stream2)
    .then((_) => controller.addStream(stream3));
controller.stream.listen((n) => print("composite stream : $n"),
    onError: (err) => print("composite stream : $err"),
    onDone: () => print("composite stream : done"),
    cancelOnError: false);
I get the following output:
stream1 : 0
stream2 : 0
composite stream : 0
stream1 : 1
stream3 : 1
composite stream : 1
stream1 : 2
stream1 : 3
stream1 : 4
stream1 : 5
stream1 : 6
stream1 : 7
stream1 : 8
stream1 : 9
stream1 : done
stream2 : done
stream3 : done
There are a couple of things I don't understand from this output:
Why is there only one entry each for stream2 and stream3 when there should be two of each? Does the composite stream I created with the StreamController consume one of the events from stream2 and stream3? This behaviour seems odd to me; am I missing something?
Why do stream2 and stream3 only complete when stream1 completes? This is not the behaviour I'd expect when both are bounded, and it contradicts the behaviour of .take(10) on stream1. If I remove the .take(10) on stream1, then stream2 and stream3 in fact never complete.
If I modify controller to also add the source stream1 (see snippet and output below), then stream2 and stream3 do in fact complete at their natural positions once their 2 elements are up, but I then also get an exception because it tried to listen to one of the streams twice.
StreamController controller = new StreamController.broadcast();
controller.addStream(stream1)
    .then((_) => controller.addStream(stream2))
    .then((_) => controller.addStream(stream3));
controller.stream.listen((n) => print("composite stream : $n"),
    onError: (err) => print("composite stream : $err"),
    onDone: () => print("composite stream : done"),
    cancelOnError: false);
stream1 : 0
stream2 : 0
composite stream : 0
stream1 : 1
stream3 : 1
composite stream : 1
stream1 : 2
stream2 : 2
stream2 : done
composite stream : 2
stream1 : 3
stream3 : 3
stream3 : done
composite stream : 3
stream1 : 4
composite stream : 4
stream1 : 5
composite stream : 5
stream1 : 6
composite stream : 6
stream1 : 7
composite stream : 7
stream1 : 8
composite stream : 8
stream1 : 9
stream1 : done
composite stream : 9
Uncaught Error: Bad state: Stream has already been listened to.
Stack Trace:
#0 _StreamController._subscribe (dart:async/stream_controller.dart:151:7)
#1 _ControllerStream._createSubscription (dart:async/stream_controller.dart:259:157)
#2 _StreamImpl.listen (dart:async/stream_impl.dart:260:58)
#3 _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#4 _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#5 _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#6 _AsBroadcastStream.listen (dart:async/stream_impl.dart:466:37)
#7 _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#8 _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#9 _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#10 _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#11 _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#12 _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#13 _AddStreamState._AddStreamState (dart:async/stream_controller.dart:300:133)
#14 _BroadcastStreamController.addStream (dart:async/broadcast_stream_controller.dart:140:27)
#15 main.<anonymous closure> (file:///C:/SourceCode/personal/SteamTest/lib/streamtest.dart:38:38)
#16 _ThenFuture._zonedSendValue (dart:async/future_impl.dart:371:24)
#17 _TransformFuture._sendValue.<anonymous closure> (dart:async/future_impl.dart:348:48)
#18 _ZoneBase._runInZone (dart:async/zone.dart:82:17)
#19 _ZoneBase._runUnguarded (dart:async/zone.dart:102:22)
#20 _ZoneBase.executeCallback (dart:async/zone.dart:58:23)
#21 _TransformFuture._sendValue (dart:async/future_impl.dart:348:26)
#22 _FutureImpl._setValueUnchecked (dart:async/future_impl.dart:184:26)
#23 _FutureImpl._asyncSetValue.<anonymous closure> (dart:async/future_impl.dart:218:25)
#24 _asyncRunCallback (dart:async/event_loop.dart:9:15)
#25 _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:8:13)
#26 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:95:21)
#27 _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:111:23)
#28 _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)
Unhandled exception:
Bad state: Stream has already been listened to.
#0 _DefaultZone.handleUncaughtError.<anonymous closure> (dart:async/zone.dart:146:7)
#1 _asyncRunCallback (dart:async/event_loop.dart:9:15)
#2 _asyncRunCallback (dart:async/event_loop.dart:13:7)
#3 _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:8:13)
#4 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:95:21)
#5 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:103:7)
#6 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:103:7)
#7 _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:111:23)
#8 _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)
Can someone help me make sense of what is going on here?
Thanks,
After debugging the Dart code for your example, it seems to me that this is a bug.
The take call creates a Take-stream that holds a reference to a Where-stream, which in turn holds a reference to the broadcast stream stream1. Incidentally, because stream1 is a broadcast stream, the Where- and Take-streams are broadcast streams as well (I checked this while debugging).
The counter for the number of events is a state variable of the Take-stream, but each time you subscribe to the Take-stream (by calling listen), this creates a subscription to the Where-stream, which in turn creates a subscription to stream1. In this case you have 2 subscriptions to stream2 (1 via listen, 1 via the controller), resulting in 2 subscriptions to stream1 (4 in total once stream3 is counted, but that is not important for this discussion).
As a consequence, when stream1 fires event 0, it passes through the Take-stream in stream2 twice, decrementing the Take-stream's counter twice and causing the onDone event to be delivered to the second subscription only. Because the counter has been decremented by 2 and it is a state variable of the Take-stream, the Take-stream will not fire again when stream1 fires event 2.
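To make the effect concrete, here is a deliberately simplified model in plain Dart. It is not the dart:async implementation; the names SharedCounterTake, deliver and simulateSharedCounter are made up for illustration. It only mimics a take(2) whose counter lives on the stream object, with two subscribers both receiving the even events from stream1.

```dart
// Simplified model only: this is NOT the real dart:async code.
// It mimics a take(n) whose counter is stored on the stream object
// and is therefore shared by every subscriber.
class SharedCounterTake {
  int remaining;
  SharedCounterTake(this.remaining);

  // Called once per subscriber for each source event.
  // Returns true if that subscriber receives the event.
  bool deliver() {
    if (remaining <= 0) return false;
    remaining--;
    return true;
  }
}

List<String> simulateSharedCounter() {
  final take = SharedCounterTake(2); // models .take(2)
  final subscribers = ['stream2 listener', 'controller'];
  final log = <String>[];

  // 0, 2 and 4 stand in for the even values coming out of the Where-stream.
  for (final event in [0, 2, 4]) {
    for (final name in subscribers) {
      if (take.deliver()) log.add('$name got $event');
    }
  }
  return log;
}

void main() {
  simulateSharedCounter().forEach(print);
  // prints:
  // stream2 listener got 0
  // controller got 0
}
```

Both slots of the shared counter are used up by event 0, so neither subscriber ever sees event 2, which matches the single stream2 and composite stream entries in the first output.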
So this looks like a bug to me. It raises two questions:
Should the take-stream counter be a state variable in the subscription to the take-stream instead of the take-stream itself?
Should every subscription to the Take-Stream result in a new subscription to the source stream? (see also ForwardingStream in the Dart code). This might depend on the broadcast-context.
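As a rough sketch of what the first question is getting at, the counter could be held per subscription. Again, this is a hypothetical model in plain Dart and not a patch to dart:async; PerSubscriptionTake, TakeSubscription and simulatePerSubscription are names invented here.

```dart
// Hypothetical model: each subscription owns its own countdown,
// so one subscriber cannot use up another subscriber's events.
class TakeSubscription {
  final String name;
  int remaining;
  TakeSubscription(this.name, this.remaining);

  // Returns true if this subscription still accepts events.
  bool deliver() {
    if (remaining <= 0) return false;
    remaining--;
    return true;
  }
}

class PerSubscriptionTake {
  final int count;
  PerSubscriptionTake(this.count);

  // Models listen(): every call gets a fresh counter.
  TakeSubscription subscribe(String name) => TakeSubscription(name, count);
}

List<String> simulatePerSubscription() {
  final take = PerSubscriptionTake(2); // models .take(2)
  final subs = [
    take.subscribe('stream2 listener'),
    take.subscribe('controller'),
  ];
  final log = <String>[];

  // 0, 2 and 4 stand in for the even values coming out of the Where-stream.
  for (final event in [0, 2, 4]) {
    for (final s in subs) {
      if (s.deliver()) log.add('${s.name} got $event');
    }
  }
  return log;
}

void main() {
  simulatePerSubscription().forEach(print);
  // prints:
  // stream2 listener got 0
  // controller got 0
  // stream2 listener got 2
  // controller got 2
}
```

With this arrangement each subscriber sees exactly two events and event 4 is dropped for both, which is the behaviour the question expected from take(2).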
Maybe the Dart team should take a further look at the Rx code.