How to make RSocket read a data stream from a Java 8 Stream backed by a BlockingQueue


My program uses a blocking queue to process messages asynchronously, and multiple RSocket clients want to receive these messages. The design is that whenever a message arrives in the blocking queue, the Stream bound to the Flux emits it. I tried to implement this as shown below, but the client never receives a response, even though I can see the Stream supplier being triggered correctly.

Can someone please help?

@MessageMapping("addListenerHook")
public Flux<QueryResult> addListenerHook(String clientName) {
    System.out.println("Adding Listener:" + clientName);
    BlockingQueue<QueryResult> listenerQ = new LinkedBlockingQueue<>();
    Datalistener.register(clientName, listenerQ);

    return Flux.fromStream(() -> Stream.generate(() -> streamValue(listenerQ)))
            .map(q -> {
                System.out.println("I got an event : " + q.getResult());
                return q;
            });
}

private QueryResult streamValue(BlockingQueue<QueryResult> inStream) {
    try {
        return inStream.take();
    } catch (Exception e) {
        return null;
    }
}

There is 1 answer

Yuri Schimke (best answer)

This is tough to solve simply and cleanly because of the blocking API, which I think is why there are no simple bridge APIs to help you implement it. You should first come up with a clean way to turn the BlockingQueue into a Flux. After that, the Spring Boot part becomes a non-event.

This is why the correct solution probably involves a custom BlockingQueue implementation, like ObservableQueue in https://www.nurkiewicz.com/2015/07/consuming-javautilconcurrentblockingque.html
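
The idea behind such a queue can be sketched without any reactive library: rather than storing elements until a consumer calls take(), the queue hands each inserted element straight to a callback. The class name NotifyingQueue and its listener parameter are hypothetical and are not the linked article's implementation. This is only a minimal illustration of the push-instead-of-poll approach, assuming producers use offer, add, or put.

```java
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch: a queue that pushes every inserted element to a
// listener instead of storing it for a blocked consumer to take().
final class NotifyingQueue<T> extends LinkedBlockingQueue<T> {
    private final Consumer<? super T> listener;

    NotifyingQueue(Consumer<? super T> listener) {
        this.listener = Objects.requireNonNull(listener);
    }

    // add(e) is inherited from AbstractQueue and delegates to offer(e).
    @Override
    public boolean offer(T item) {
        listener.accept(Objects.requireNonNull(item));
        return true;
    }

    @Override
    public void put(T item) {
        listener.accept(Objects.requireNonNull(item));
    }

    // Note: the timed offer(e, timeout, unit) is not overridden in this
    // sketch and would still store the element in the underlying queue.
}
```

Because the producer's thread runs the callback directly, no thread ever blocks in take(). In a reactive setup the callback would be whatever emits into the stream.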

An alternative approach is described in "How can I create reactor Flux from a blocking queue?"

If you need to retain the LinkedBlockingQueue, a starting solution might be something like the following.

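  import java.util.concurrent.LinkedBlockingQueue
  import kotlinx.coroutines.reactor.flux
  import reactor.core.scheduler.Schedulers

  val f = flux<QueryResult> {
    // Register inside the builder so nothing happens until someone subscribes.
    val listenerQ = LinkedBlockingQueue<QueryResult>()
    Datalistener.register(clientName, listenerQ)

    while (true) {
      send(listenerQ.take()) // blocks this thread until a result arrives
    }
  }.subscribeOn(Schedulers.boundedElastic()) // replaces the deprecated Schedulers.elastic()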

With an API like flux, you should definitely avoid any side effects before subscription, so don't register your listener until you are inside the body of the builder. You will still need to improve this example to handle cancellation, that is, however you deregister the listener and interrupt the thread doing the take.
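
The cancellation part can be sketched in plain Java, independent of Reactor: a dedicated thread forwards elements from the queue to a callback, and cancelling means interrupting that thread so the blocked take() returns. The class QueuePump and its method names are hypothetical. With Reactor, the callback could be a FluxSink's next and cancel() could be wired in through FluxSink.onDispose, but that wiring, and deregistering from Datalistener, are left out here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical helper: forwards queue elements to a callback until cancelled.
final class QueuePump<T> {
    private final Thread worker;

    QueuePump(BlockingQueue<T> queue, Consumer<? super T> onNext) {
        worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    onNext.accept(queue.take()); // blocks until an element arrives
                }
            } catch (InterruptedException e) {
                // cancel() interrupted the blocking take(); fall through and exit.
            }
        }, "queue-pump");
        worker.setDaemon(true); // do not keep the JVM alive for this thread
    }

    void start() {
        worker.start();
    }

    // Interrupts the blocked take() and waits up to one second for the thread to exit.
    void cancel() {
        worker.interrupt();
        try {
            worker.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
        }
    }

    boolean isRunning() {
        return worker.isAlive();
    }
}
```

A caller would create one pump per subscriber, call start() on subscription, and call cancel() when the subscriber goes away, so that no thread stays parked in take() for a client that has disconnected.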