SpringBoot to Flutter Stream using application/x-ndjson

643 views Asked by At

I am trying to get a Flutter app to process a NDJSON stream properly, but cannot get it to work. I have a SpringBoot server that the Flutter app requests a stream from.

SpringBoot side:

@GetMapping(value = "/streaming", produces = {
//            MediaType.APPLICATION_JSON_VALUE,
            MediaType.APPLICATION_NDJSON_VALUE
    })
    public Flux<String> getItemsStream(){
        FluxSinkImpl<String> fluxSinkConsumer = new FluxSinkImpl<>();
        try {
            logger.info("streaming New Stream!");
            synchronized (fluxSinkConsumers) {
                fluxSinkConsumers.add(fluxSinkConsumer);
            }
            return Flux.create(fluxSinkConsumer).doOnNext(s -> {
                logger.info("streaming doOnNext ["+s+"]");
            }).doFinally(signalType -> {
                logger.info("streaming Done ["+signalType+"]");
                synchronized (fluxSinkConsumers) {
                    fluxSinkConsumers.remove(fluxSinkConsumer);
                }
            });
        } catch (Exception e) {
            logger.error("Failed to register Stream",e);
            throw e;
        }
    }
private synchronized void publishEvent(String jsonStr) {
        Set<FluxSinkImpl> fluxSinkConsumersCopy;
        synchronized (fluxSinkConsumers) {
            fluxSinkConsumersCopy = new HashSet<>(fluxSinkConsumers);
        }
        fluxSinkConsumersCopy.forEach(fluxSink -> {
            try {
                logger.info("streaming publishEvent");
                fluxSink.publishEvent(jsonStr);
            } catch (Throwable t) {
                logger.info("streaming Failed to publish");
                t.printStackTrace();
                synchronized (fluxSinkConsumers) {
                    fluxSinkConsumers.remove(fluxSink);
                }
            }
        });
    }

The flutter side:

void startStreamListener2() async {
    try {
      final client = new http.Client();
      http.Request request = http.Request("GET", Uri.parse(host+'streaming'));
      request.headers['Content-type'] = 'application/x-ndjson';
      request.headers['Accept'] = 'application/x-ndjson';
      Future<http.StreamedResponse> streamedResponseFuture = client.send(request);
      Stream<http.StreamedResponse> asStream = streamedResponseFuture.asStream();
      print('startStreamListener1 [${host+'streaming'}]');
      final StreamController<http.StreamedResponse> controller2 = new StreamController<http.StreamedResponse>();
      asStream.asyncMap((event) {
        event.stream.listen((value) {
          print('startStreamListener4 listen [${utf8.decode(value)}]');
        });
      });
      controller2.stream.listen((http.StreamedResponse event) {
        event.stream.listen((value) {
          print('startStreamListener3 listen [${utf8.decode(value)}]');
        });
      });
      StreamSubscription<http.StreamedResponse> listen = asStream.listen((http.StreamedResponse event) {
        event.stream.listen((value) {
          print('startStreamListener2 listen [${utf8.decode(value)}]');
        });
      });
      listen.onDone(() {
        print('startStreamListener2 Done');
      });
      listen.onError((error) {
        print('startStreamListener2 Error[$error] runtimeType[${error.runtimeType}]');
        if (error is ClientException) {
          print('ClientException [${(error as ClientException).message}]');
        }
      });
    } catch (error) {
      print('startStreamListener error [$error]');
    }
  }

When I use a Browser to connect directly to the stream, it works just fine. I see pushed messages as they are generated. The Stream is supposed to be open for a long time with asynchronous messages being pushed towards listeners (Flutter in this case). Flutter does register, but does not trigger onData on single message. It does register the SpringBoot server restarting.

0

There are 0 answers