I ran into an interesting problem while writing unit tests for my project.

Here is a map on which the user can place markers:

class DomainMap {
  static const _DEFAULT_COORDINATE = const Coordinate(40.73, -73.93);
  final ReverseGeocodingStrategy _geocodingStrategy;
  final RouteDefinitionStrategy _assemblyStrategy;
  final List<_IdentifiedCoordinate> _addressed = [];
  final List<Coordinate> _markers = [];
  final _Route _route = _Route();

  Coordinate get defaultCoordinate => _DEFAULT_COORDINATE;

  DomainMap(this._geocodingStrategy, this._assemblyStrategy);

  Stream<MarkersUpdateEvent> mark(Coordinate coordinate) async* {
    _markers.add(coordinate);
    yield _assembleMarkersUpdate();
    final Address address = await _geocodingStrategy.geocode(coordinate);
    _addressed.add(_IdentifiedCoordinate(coordinate, address));
    if (_addressed.length > 1) {
      final Iterable<Coordinate> assembledPolyline =
          await _assemblyStrategy.buildRoute(BuiltList(_addressed
              .map((identifiedCoordinate) => identifiedCoordinate.address)));
      assembledPolyline.forEach(_route.add);
      yield _assembleMarkersUpdate();
    }
  }

  MarkersUpdateEvent _assembleMarkersUpdate() =>
      MarkersUpdateEvent(BuiltList.from(_markers), _route.readOnly);
}

class _Route {
  final List<Coordinate> _points = [];

  Iterable<Coordinate> get readOnly => BuiltList(_points);

  void add(final Coordinate coordinate) => _points.add(coordinate);

  void addAll(final Iterable<Coordinate> coordinate) => _points.addAll(coordinate);
}

And here is a unit test for it, which checks that a route is returned on the second mark:

test("mark, assert that on second mark at first just markers update is published, and then the polyline update too", () async {
  final Coordinate secondCoordinate = plus(givenCoordinate, 1);
  final givenRoute = [
    givenCoordinate,
    minus(givenCoordinate, 1),
    plus(givenCoordinate, 1)
  ];
  when(geocodingStrategy.geocode(any)).thenAnswer((invocation) => Future.value(Address(invocation.positionalArguments[0].toString())));
  when(assemblyStrategy.buildRoute(any))
    .thenAnswer((_) => Future.value(givenRoute));
  final expectedFirstUpdate =
    MarkersUpdateEvent([givenCoordinate, secondCoordinate], []);
  final expectedSecondUpdate =
    MarkersUpdateEvent([givenCoordinate, secondCoordinate], givenRoute);
  final DomainMap map = domainMap();
  map.mark(givenCoordinate)
  //.forEach(print) //Important
  ;
  expect(map.mark(secondCoordinate),
    emitsInOrder([expectedFirstUpdate, expectedSecondUpdate]));
}, timeout: const Timeout(const Duration(seconds: 10)));

When I run it like that, the test fails and reports that the stream emitted only one value: an update event in which only the markers field is non-empty, and it contains only secondCoordinate. But when I uncomment forEach, the test passes.

As far as I understand, an async* method is not invoked until the stream's values are requested, so when forEach is invoked, the function runs to the end. So if I request all the values of the stream returned from the first call, the method gets executed, the markers list is populated, and the second call runs in the expected state.

Am I understanding async* semantics correctly? And is there a way to make this function eager instead of lazy (I don't want to request stream values I don't need)?

1 Answer

lrn On Best Solutions

Yes, async* lazily invokes the function after you have called listen on the returned stream. If you never listen, then nothing happens. It even does it asynchronously, and not directly in response to the listen call.
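
To make the timing concrete, here is a minimal sketch. The numbers generator and the log list are made-up names for illustration and are not part of the question's code. It records when the body of an async* function actually starts relative to the call and to the listen (done here through forEach).

```dart
// Records the order in which things happen.
final List<String> log = [];

// Hypothetical generator, used only to observe when its body starts.
Stream<int> numbers() async* {
  log.add('body started');
  yield 1;
  yield 2;
}

Future<void> main() async {
  final stream = numbers();
  log.add('called'); // The body has not run yet.

  // forEach listens to the stream synchronously.
  final done = stream.forEach((value) => log.add('got $value'));
  log.add('listening'); // Still not run: the body starts asynchronously.

  await done;
  print(log); // [called, listening, body started, got 1, got 2]
}
```

If the forEach call is removed, 'body started' never appears in the log, which is the situation in the failing test.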

So, if you definitely need something to happen, but only maybe need to look at the response, then you can't use an async* function to do that something.

What you probably want to do is to perform the operation unconditionally, but populate the stream only if it is actually listened to. That is an unconventional sequence of operations which doesn't match async* or even async semantics. You have to be prepared for the operation to complete before the stream is listened to. This suggests splitting the operation into two parts, one async for the request and one async* for the response, and sharing the future between the two, which means listening to the same future twice, a distinctly non-async behavior.
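
A rough sketch of that split, under simplifying assumptions: the request, response, and mark functions and the markers list below are hypothetical stand-ins, and a zero-length delay takes the place of the real geocoding call. It relies on an async function running synchronously up to its first await, so the side effect happens during the call, while the async* half stays lazy.

```dart
final List<String> markers = [];

// Eager half: an async function starts running as soon as it is called.
Future<String> request(String marker) async {
  markers.add(marker); // Runs synchronously, during the call.
  await Future<void>.delayed(Duration.zero); // Stands in for geocoding.
  return 'address of $marker';
}

// Lazy half: only runs if someone listens to the returned stream.
Stream<String> response(Future<String> pending) async* {
  yield await pending;
}

// The future is created eagerly and shared with the lazy half.
Stream<String> mark(String marker) => response(request(marker));

Future<void> main() async {
  mark('a'); // Never listened to, yet the marker is recorded.
  print(markers); // [a]

  final events = await mark('b').toList();
  print(markers); // [a, b]
  print(events); // [address of b]
}
```

One thing this sketch does not handle: if the shared future fails and nobody ever listens to the stream, the error is reported as unhandled.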

I'd recommend splitting the stream behavior out and using a StreamController for that.

Stream<MarkersUpdateEvent> mark(Coordinate coordinate) {
  // The controller buffers events until someone listens to the stream.
  var result = StreamController<MarkersUpdateEvent>();
  // Run the logic right away in an immediately invoked async closure.
  () async {
    _markers.add(coordinate);
    result.add(_assembleMarkersUpdate());
    final Address address = await _geocodingStrategy.geocode(coordinate);
    _addressed.add(_IdentifiedCoordinate(coordinate, address));
    if (_addressed.length > 1) {
      final Iterable<Coordinate> assembledPolyline =
          await _assemblyStrategy.buildRoute(BuiltList(_addressed
              .map((identifiedCoordinate) => identifiedCoordinate.address)));
      assembledPolyline.forEach(_route.add);
      result.add(_assembleMarkersUpdate());
    }
    result.close();
  }().catchError(result.addError); // Forward failures to the stream as errors.
  return result.stream;
}

This way the program logic runs independently of whether anyone listens to the stream. You still buffer all the stream events. There is no real way to avoid that, unless you can compute them at a later time, because you can't know when someone might listen to the returned stream; it doesn't have to happen immediately after the stream is returned.
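
The buffering can be seen in a small standalone sketch. The eager function and the sideEffects counter are invented for this illustration, and a zero-length delay stands in for real asynchronous work. The side effect happens during the call, and a listener that subscribes later still receives every event.

```dart
import 'dart:async';

int sideEffects = 0;

// Hypothetical eager producer following the StreamController pattern.
Stream<int> eager() {
  final controller = StreamController<int>();
  () async {
    sideEffects++; // Happens during the call, listener or not.
    controller.add(1); // Buffered until someone listens.
    await Future<void>.delayed(Duration.zero);
    controller.add(2);
    controller.close(); // Not awaited: it only completes once listened to.
  }();
  return controller.stream;
}

Future<void> main() async {
  final stream = eager();
  print(sideEffects); // 1

  // Listen well after the events were produced.
  await Future<void>.delayed(const Duration(milliseconds: 10));
  print(await stream.toList()); // [1, 2]
}
```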