Flux.create() not generating events


I'm trying to use Flux.create to generate asynchronous server-sent events. When my client connects, the request eventually times out without ever receiving an event. I hard-coded an event into the Flux.create callback just to see data flow, but still nothing is received on the client side.

@GetMapping(path = "/stream", headers = "Accept=*/*", consumes = MediaType.ALL_VALUE, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<PricingDTO>> getEventStream() {

  final Flux<ServerSentEvent<PricingDTO>> flux = Flux.<ServerSentEvent<PricingDTO>>create(emitter -> {
    final PricingDTO pricing = new PricingDTO();
    pricing.setId(99L);
    emitter.next(ServerSentEvent.builder(pricing).build());
  });

  return flux;
}

Client side (Angular) code:

const eventSource = new EventSource(url);
eventSource.onmessage = (event) => {
  console.debug('Received event: ' + event);
  const json = JSON.parse(event.data);
  // Should be PricingDTO record here
};

eventSource.onerror = (error) => {
  if (eventSource.readyState === EventSource.CLOSED) {
    console.log('The stream has been closed by the server.');
    eventSource.close();
  } else {
    console.log('Error here: ' + error);
  }
};

I never see an event come through the EventSource. Eventually the request times out and I see the error: net::ERR_EMPTY_RESPONSE

I'm new to WebFlux and I suspect I'm missing some initialization on the Flux before I return it. I have debugged and can see the request being received by my web service and the Flux object being returned. Any idea why I'm not receiving my events?

There is 1 answer

p.streef

Your WebFlux code seems fine. I tested it with the following simplified example (without your custom classes).

@SpringBootApplication
@RestController
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    @GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> getEventStream() {
        return Flux.create(emitter -> emitter.next("hi").next("hi2"));
    }
}

When connecting to the stream in Chrome, you can see the events coming in just fine:

data:hi

data:hi2

The problem lies either in your Accept header filter or on the client side. You could of course validate this by connecting to your stream in a browser (or better, a test).
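If you want to take the Angular client out of the picture, a small stand-alone probe could look roughly like the sketch below. It uses only the JDK's java.net.http client (Java 11+), sends the same Accept header that EventSource sends, and prints every data: line it receives. The class name SseProbe, the helper dataPayloads and the URL http://localhost:8080/stream are assumptions for illustration, so adjust them to your setup.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class, not part of the original question or answer.
public class SseProbe {

    // Keeps only the "data:" lines of a raw SSE stream and returns their
    // payloads with surrounding whitespace trimmed.
    static List<String> dataPayloads(Stream<String> lines) {
        return lines
                .filter(line -> line.startsWith("data:"))
                .map(line -> line.substring("data:".length()).strip())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws Exception {
        // Assumed endpoint; pass your own URL as the first argument.
        String url = args.length > 0 ? args[0] : "http://localhost:8080/stream";

        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Accept", "text/event-stream") // what EventSource sends
                .GET()
                .build();

        // ofLines() exposes the response body as a lazily read stream of lines.
        HttpResponse<Stream<String>> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofLines());

        System.out.println("Status: " + response.statusCode());
        response.body()
                .filter(line -> line.startsWith("data:"))
                .forEach(System.out::println);
    }
}
```

Because the Flux in the examples never calls complete(), the probe keeps the connection open and blocks until you stop it or the server closes the stream. If it prints the events while the browser does not, the server side is fine and the remaining suspects are the client or something between the two.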