How do RSocket issue lease to multiple clients?

155 views Asked by At

I could create a server lease to a single client as follows:

    @Slf4j
    public class LeaseServer {
    
        private static final String SERVER_TAG = "server";
    
        public static void main(String[] args) throws InterruptedException {
            // Queue for incoming messages represented as Flux
            // Imagine that every fireAndForget that is pushed is processed by a worker
            int queueCapacity = 50;
            BlockingQueue<String> messagesQueue = new ArrayBlockingQueue<>(queueCapacity);
            // emulating a worker that process data from the queue
            Thread workerThread =
                    new Thread(
                            () -> {
                                try {
                                    while (!Thread.currentThread().isInterrupted()) {
                                        String message = messagesQueue.take();
                                        System.out.println("consume message:" + message);
                                        Thread.sleep(100000); // emulating processing
                                    }
                                } catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                            });
            workerThread.start();
            CloseableChannel server = getFireAndForgetServer(messagesQueue, workerThread);
            TimeUnit.MINUTES.sleep(10);
            server.dispose();
        }
    
        private static CloseableChannel getFireAndForgetServer(BlockingQueue<String> messagesQueue, Thread workerThread) {
            CloseableChannel server =
                    RSocketServer.create((setup, sendingSocket) ->
                            Mono.just(new RSocket() {
                                @Override
                                public Mono<Void> fireAndForget(Payload payload) {
                                    // add element. if overflows errors and terminates execution
                                    // specifically to show that lease can limit rate of fnf requests in
                                    // that example
                                    try {
                                        if (!messagesQueue.offer(payload.getDataUtf8())) {
                                            System.out.println("Queue has been overflowed. Terminating execution");
                                            sendingSocket.dispose();
                                            workerThread.interrupt();
                                        }
                                    } finally {
                                        payload.release();
                                    }
                                    return Mono.empty();
                                }
                            }))
                            .lease(() -> Leases.create().sender(new LeaseCalculator(SERVER_TAG, messagesQueue)))
                            .bindNow(TcpServerTransport.create("localhost", 7000));
            return server;
        }
    }

But how do I issue a lease to multiple clients connected to that server?

Otherwise my queue will be written multiple times by multiple clients, resulting in an overflow of the service.

I can't find the details in the public documents and materials.

Your help was very much appreciated.

0

There are 0 answers