How to create RSocket request endpoints without using Spring Library in Java?

I want to create RSocket request endpoints using the plain Java RSocket SDK, without the Spring Framework. I was able to create a sample server with the code snippet below. It handles requests only at tcp://localhost:7000, but I want to expose several distinct endpoints in a similar way.

public class ServerExample {
    public static void main(String[] args) {
        Hooks.onErrorDropped(e -> {});
        RSocket handler = new RSocket() {
            @Override
            public Mono<Payload> requestResponse(Payload payload) {
                System.out.println("RequestResponse: " + payload.getDataUtf8());
                return Mono.just(payload);
            }

            @Override
            public Flux<Payload> requestStream(Payload payload) {
                System.out.println("RequestStream: " + payload.getDataUtf8());
                return Flux.just("First", "Second").map(DefaultPayload::create);
            }

            @Override
            public Mono<Void> fireAndForget(Payload payload) {
                System.out.println("FireAndForget: " + payload.getDataUtf8());
                return Mono.empty();
            }
        };
        RSocketServer.create(SocketAcceptor.with(handler))
                .bindNow(TcpServerTransport.create("localhost", 7000))
                .onClose()
                .doOnSubscribe(subscription -> System.out.println("RSocket Server listen on tcp://localhost:7000"))
                .block();
    }
}

Answer by tmarwen:

You can leverage the metadata defined by the RSocket protocol, specifically the Routing metadata extension.

Your server implementation would then read the routing metadata value of each request and dispatch to the matching responder:

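import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.metadata.RoutingMetadata;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;

public class ServerExample {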

    public static void main(String[] args) {
        Hooks.onErrorDropped(e -> {
        });
        final String userRoute = "/user"; // this defines one of the route values
        final String organizationRoute = "/organization"; // this defines another route value
        RSocket handler = new RSocket() {
            @Override
            public Mono<Payload> requestResponse(Payload payload) {
                if (!payload.hasMetadata()) { // routing metadata is compulsory here
                    // signal the failure through the reactive pipeline instead of throwing
                    return Mono.error(new IllegalStateException("Missing routing metadata"));
                }
                // read the first routing tag from the metadata
                final String route = new RoutingMetadata(payload.metadata().slice()).iterator().next();
                switch (route) { // based on the route value, you can respond accordingly
                    case userRoute: {
                        System.out.println("RequestResponse for route " + route + ": " + payload.getDataUtf8());
                        return Mono.just(DefaultPayload.create("Echo for: User"));
                    }
                    case organizationRoute: {
                        System.out.println("RequestResponse for route " + route + ": " + payload.getDataUtf8());
                        return Mono.just(DefaultPayload.create("Echo for: Organization"));
                    }
                    default: return Mono.just(DefaultPayload.create("Unsupported route"));
                }
            }

            @Override
            public Flux<Payload> requestStream(Payload payload) {
                System.out.println("RequestStream: " + payload.getDataUtf8());
                return Flux.just("First", "Second").map(DefaultPayload::create);
            }

            @Override
            public Mono<Void> fireAndForget(Payload payload) {
                System.out.println("FireAndForget: " + payload.getDataUtf8());
                return Mono.empty();
            }
        };
        RSocketServer.create(SocketAcceptor.with(handler))
                .bindNow(TcpServerTransport.create("localhost", 7000))
                .onClose()
                .doOnSubscribe(subscription -> System.out.println("RSocket Server listen on tcp://localhost:7000"))
                .block();
    }
}
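As more routes are added, the switch statement above tends to grow. One way to keep it manageable is a small route table that maps each route string to a handler. The sketch below uses only the JDK and plain strings so that it stays independent of RSocket types; `RouteTableSketch`, `register` and `dispatch` are hypothetical names chosen for illustration and are not part of the RSocket SDK. In a real responder the handlers would presumably take a Payload and return a Mono<Payload>.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper (not part of RSocket): maps route strings to handlers.
final class RouteTableSketch {

    private final Map<String, Function<String, String>> handlers = new LinkedHashMap<>();

    // Registers a handler for a route and returns this table for chaining.
    RouteTableSketch register(String route, Function<String, String> handler) {
        handlers.put(route, handler);
        return this;
    }

    // Looks up the handler for the route; falls back to a fixed reply,
    // mirroring the default branch of the switch in the server example.
    String dispatch(String route, String data) {
        Function<String, String> handler = handlers.get(route);
        if (handler == null) {
            return "Unsupported route";
        }
        return handler.apply(data);
    }

    public static void main(String[] args) {
        RouteTableSketch table = new RouteTableSketch()
                .register("/user", data -> "Echo for: User")
                .register("/organization", data -> "Echo for: Organization");

        System.out.println(table.dispatch("/user", "Requesting user resource")); // prints "Echo for: User"
        System.out.println(table.dispatch("/unknown", "anything"));              // prints "Unsupported route"
    }
}
```

Inside requestResponse, the route read from the metadata would be passed to dispatch in place of the switch.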

Within the client implementation, you need to provide the encoded routing metadata as follows:

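import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketClient;
import io.rsocket.core.RSocketConnector;
import io.rsocket.metadata.TaggingMetadataCodec;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class ClientExample {

    // Assumption: SLF4J is on the classpath; the original snippet used `logger` without declaring it
    private static final Logger logger = LoggerFactory.getLogger(ClientExample.class);

    public static void main(String[] args) {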
        Mono<RSocket> source =
                RSocketConnector.create()
                        .reconnect(Retry.backoff(50, Duration.ofMillis(500)))
                        .connect(TcpClientTransport.create("localhost", 7000));
        // User route request
        ByteBuf userRouteMetadata = TaggingMetadataCodec.createRoutingMetadata(
                        UnpooledByteBufAllocator.DEFAULT,
                        Collections.singletonList("/user")
                )
                .getContent();

        RSocketClient.from(source)
                .requestResponse(Mono.just(DefaultPayload.create(Unpooled.buffer().writeBytes("Requesting user resource".getBytes(StandardCharsets.UTF_8)), userRouteMetadata)))
                .doOnSubscribe(s -> logger.info("Executing Request"))
                .doOnNext(
                        d -> {
                            logger.info("Received response data {}", d.getDataUtf8());
                            d.release();
                        })
                .repeat(5)
                .blockLast();

        // Organization route request
        ByteBuf organizationRouteMetadata = TaggingMetadataCodec.createRoutingMetadata(
                        UnpooledByteBufAllocator.DEFAULT,
                        Collections.singletonList("/organization")
                )
                .getContent();
        RSocketClient.from(source)
                .requestResponse(Mono.just(DefaultPayload.create(Unpooled.buffer().writeBytes("Requesting organization resource".getBytes(StandardCharsets.UTF_8)), organizationRouteMetadata)))
                .doOnSubscribe(s -> logger.info("Executing Request"))
                .doOnNext(
                        d -> {
                            logger.info("Received response data {}", d.getDataUtf8());
                            d.release();
                        })
                .repeat(5)
                .blockLast();
    }
}
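To make the "encoded routing metadata" less opaque: as far as I understand the Routing metadata extension, each route tag is written as one unsigned length byte followed by the tag's UTF-8 bytes, and tags are simply concatenated. The sketch below reproduces that layout with plain JDK classes, purely as an illustration. `RoutingTagsSketch`, `encode` and `decode` are hypothetical names, and in real code you would keep using TaggingMetadataCodec and RoutingMetadata rather than hand-rolling this.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustration only: the assumed wire layout of routing tags
// (1 length byte + UTF-8 bytes per tag), without Netty or RSocket types.
final class RoutingTagsSketch {

    // Encodes each tag as [length][utf8 bytes] and concatenates the results.
    static byte[] encode(List<String> tags) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String tag : tags) {
            byte[] bytes = tag.getBytes(StandardCharsets.UTF_8);
            if (bytes.length > 255) {
                throw new IllegalArgumentException("Tag longer than 255 bytes: " + tag);
            }
            out.write(bytes.length);           // single unsigned length byte
            out.write(bytes, 0, bytes.length); // tag content
        }
        return out.toByteArray();
    }

    // Reads tags back by walking length-prefixed chunks.
    static List<String> decode(byte[] metadata) {
        List<String> tags = new ArrayList<>();
        int pos = 0;
        while (pos < metadata.length) {
            int length = metadata[pos] & 0xFF; // treat the byte as unsigned
            pos++;
            if (pos + length > metadata.length) {
                throw new IllegalArgumentException("Truncated routing tag");
            }
            tags.add(new String(metadata, pos, length, StandardCharsets.UTF_8));
            pos += length;
        }
        return tags;
    }

    public static void main(String[] args) {
        byte[] encoded = encode(List.of("/user"));
        System.out.println(encoded.length);  // prints 6 (1 length byte + 5 tag bytes)
        System.out.println(decode(encoded)); // prints [/user]
    }
}
```

This also shows why the server can read the first tag with iterator().next(): the metadata is just a sequence of such length-prefixed entries.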

As this sample shows, the implementation is fairly complex and will grow more so as your requirements accumulate. Pay special attention to ByteBuf handling: these buffers are reference-counted, so retaining a reference without releasing it leaks memory and can badly hurt your application. This is one reason to rely on a solid implementation such as the one provided by Spring.