Why does Dart StreamController.addStream not work expectedly when joining streams

2.9k views Asked by At

Given the following Dart code snippet:

Stream stream1 = new Stream.periodic(new Duration(seconds: 1), (n) => n)
                           .take(10)
                           .asBroadcastStream();
stream1.listen((n) => print("stream1 : $n"),
               onError : (err) => print("stream1 : $err"),
               onDone : () => print("stream1 : done"),
               cancelOnError : false);

Stream stream2 = stream1.where((n) => n % 2 == 0).take(2);
stream2.listen((n) => print("stream2 : $n"),
               onError : (err) => print("stream2 : $err"),
               onDone : () => print("stream2 : done"),
               cancelOnError : false);

Stream stream3 = stream1.where((n) => n % 2 != 0).take(2);
stream3.listen((n) => print("stream3 : $n"),
               onError : (err) => print("stream3 : $err"),
               onDone : () => print("stream3 : done"),
               cancelOnError : false);

StreamController controller = new StreamController.broadcast();
controller.addStream(stream2)
  .then((_) => controller.addStream(stream3));
controller.stream.listen((n) => print("composite stream : $n"),
                         onError : (err) => print("composite stream : $err"),
                         onDone : () => print("composite stream : done"),
                         cancelOnError : false);

I get the following output:

stream1 : 0
stream2 : 0
composite stream : 0
stream1 : 1
stream3 : 1
composite stream : 1
stream1 : 2
stream1 : 3
stream1 : 4
stream1 : 5
stream1 : 6
stream1 : 7
stream1 : 8
stream1 : 9
stream1 : done
stream2 : done
stream3 : done

There are a couple of things I don't understand from this output:

  1. why is there only one entry each for stream2 and stream3 when there should be 2 of each? does the composite stream I created with the StreamController consume one of the events from stream2 and stream3? This behaviour seems odd to me, am I missing something?

  2. why does stream2 and stream3 only complete when stream1 complete? this is not the natural behaviour I'd expect when both are bounded, and contradicts with the behaviour of .take(10) on stream1. If I remove the .take(10) on stream1 then stream2 and stream3 in fact never completes.

If I modify controller to also add the source stream1 (see snippet and output below), then the stream2 and stream3 in fact completes at their natural positions when their 2 elements are up, but I then also get an exception because it tried to listen to one of the streams twice.

StreamController controller = new StreamController.broadcast();
controller.addStream(stream1)
  .then((_) => controller.addStream(stream2))
  .then((_) => controller.addStream(stream3));
controller.stream.listen((n) => print("composite stream : $n"),
                         onError : (err) => print("composite stream : $err"),
                         onDone : () => print("composite stream : done"),
                         cancelOnError : false);

stream1 : 0
stream2 : 0
composite stream : 0
stream1 : 1
stream3 : 1
composite stream : 1
stream1 : 2
stream2 : 2
stream2 : done
composite stream : 2
stream1 : 3
stream3 : 3
stream3 : done
composite stream : 3
stream1 : 4
composite stream : 4
stream1 : 5
composite stream : 5
stream1 : 6
composite stream : 6
stream1 : 7
composite stream : 7
stream1 : 8
composite stream : 8
stream1 : 9
stream1 : done
composite stream : 9
Uncaught Error: Bad state: Stream has already been listened to.
Stack Trace:
#0      _StreamController._subscribe (dart:async/stream_controller.dart:151:7)
#1      _ControllerStream._createSubscription (dart:async/stream_controller.dart:259:157)
#2      _StreamImpl.listen (dart:async/stream_impl.dart:260:58)
#3      _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#4      _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#5      _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#6      _AsBroadcastStream.listen (dart:async/stream_impl.dart:466:37)
#7      _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#8      _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#9      _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#10     _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#11     _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#12     _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#13     _AddStreamState._AddStreamState (dart:async/stream_controller.dart:300:133)
#14     _BroadcastStreamController.addStream (dart:async/broadcast_stream_controller.dart:140:27)
#15     main.<anonymous closure> (file:///C:/SourceCode/personal/SteamTest/lib/streamtest.dart:38:38)
#16     _ThenFuture._zonedSendValue (dart:async/future_impl.dart:371:24)
#17     _TransformFuture._sendValue.<anonymous closure> (dart:async/future_impl.dart:348:48)
#18     _ZoneBase._runInZone (dart:async/zone.dart:82:17)
#19     _ZoneBase._runUnguarded (dart:async/zone.dart:102:22)
#20     _ZoneBase.executeCallback (dart:async/zone.dart:58:23)
#21     _TransformFuture._sendValue (dart:async/future_impl.dart:348:26)
#22     _FutureImpl._setValueUnchecked (dart:async/future_impl.dart:184:26)
#23     _FutureImpl._asyncSetValue.<anonymous closure> (dart:async/future_impl.dart:218:25)
#24     _asyncRunCallback (dart:async/event_loop.dart:9:15)
#25     _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:8:13)
#26     _Timer._createTimerHandler._handleTimeout (timer_impl.dart:95:21)
#27     _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:111:23)
#28     _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)


Unhandled exception:
Bad state: Stream has already been listened to.
#0      _DefaultZone.handleUncaughtError.<anonymous closure> (dart:async/zone.dart:146:7)
#1      _asyncRunCallback (dart:async/event_loop.dart:9:15)
#2      _asyncRunCallback (dart:async/event_loop.dart:13:7)
#3      _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:8:13)
#4      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:95:21)
#5      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:103:7)
#6      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:103:7)
#7      _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:111:23)
#8      _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)

Can someone help me make sense of what is going on here?

Thanks,

1

There are 1 answers

2
Krustie101 On

After debugging the dart code for your example, It seems to me this is a bug.

The take call generates a Take-stream that has a reference to a Where-stream that has a reference to the broadcastStream stream1. By the way, because stream1 is a broadcastStream, this also holds for the Where- and Take-stream (checked during debugging).

The counter for the number of events is a state variable of the Take-stream, but each time you subscribe to the Take-Stream (by calling listen), this creates a subscription to the Where-stream which in turns creates a subscription to stream1. In this case you have 2 subscriptions to stream2 (1 via listen, 1 via the controller) resulting in two 2 subscriptions to stream1 (so 4 in total because of stream3 but this is not important for the discussion).

As a consequence, when stream1 fires event 0, it passes twice through the Take-stream in stream2 decreasing the counter of the Take-stream twice and causing the onDone-event to pass to the second subscription only. Because the counter is decreased by 2 and it is a state variable of the Take-stream, the Take-Stream will not fire anymore when stream1 fires event 2.

So this looks like a bug to me. Two observations.

  1. Should the take-stream counter be a state variable in the subscription to the take-stream instead of the take-stream itself?

  2. Should every subscription to the Take-Stream result in a new subscription to the source stream? (see also ForwardingStream in the Dart code). This might depend on the broadcast-context.

Maybe they should have a further look at the Rx-code.