I'm using the Spring support for RSocket, specifically the request-stream model. I.e.:
@MessageMapping("stream")
Flux<SubscriptionMessage> stream(final SubscriptionMessage request, @AuthenticationPrincipal UserDetails user) {
//....
}
If I understand RSocket correctly, the Flux response is delivered back to the client as a series of Payload messages under a request(n) convention, i.e. n payload messages at a time. After each batch, the client credits the server to send more by issuing a REQUEST_N message, and this provides the basis for back-pressure.
In the API of the java library (org.springframework.messaging.rsocket, which is based on io.rsocket.RSocket), is there any way to handle/visit the REQUEST_N messages as they arrive, or to set the value of N explicitly via the Strategy (and see the value that a requestor has passed to a server)?
Reason/Why: I'm implementing an RSocket facade over Kafka, and am attempting to provide a request-stream mechanism for subscriptions that will support automatic offset commit as the messages are consumed. For the request-stream convention, I think the occasional REQUEST_N sent by the requestor is the ideal point at which to advance the commit offset of the topic, since its transmission by the requestor means that the preceding Payload messages sent by the responder have been received.
The only other option I've seen is to use the request-channel model, so that the requestor can send an initial subscription request and begin receiving data, but also send periodic messages over the same channel to control the commit offset explicitly. I'm considering providing that anyway, but wanted to know if there was a way to inject logic into the periodic request(n) cycle of a stream.
Reactor's default has traditionally been to request an unbounded amount (effectively infinite) or to cancel. You can use the built-in operators like take or limitRate to customise the request(n) behaviour.
https://github.com/making/rsc/blob/50d0c3dc43c60d3b3fe42e5586df49adb016a9cb/src/main/java/am/ik/rsocket/InteractionModel.java#L41-L52
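As a rough sketch of what the built-in operators do (this is not taken from the linked code): limitRate caps the demand that is sent upstream, and doOnRequest lets you observe each request(n) as it reaches the source. The class and method names here are made up for illustration. Whether a client's REQUEST_N frames surface one-to-one as request(n) calls on the Flux returned from a @MessageMapping handler is an assumption worth verifying in your own setup.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;

public class RequestNDemo {

    // Returns every request(n) value that reached the source Flux.
    // batchSize and total are illustrative parameters, not RSocket settings.
    static List<Long> observedDemand(int batchSize, int total) {
        List<Long> demand = new CopyOnWriteArrayList<>();
        Flux.range(1, total)
            .doOnRequest(demand::add)  // observe demand arriving from downstream
            .limitRate(batchSize)      // turn unbounded downstream demand into bounded batches
            .blockLast();              // blockLast() itself requests an unbounded amount
        return demand;
    }

    public static void main(String[] args) {
        System.out.println(observedDemand(10, 100));
    }
}
```

With limitRate(10) the first request seen by the source is 10, and the following replenishing requests are never larger than that. On the responder side, a doOnRequest callback like this one is the kind of place where logic such as an offset commit could be hooked in, assuming the demand really does map to what the requestor has consumed.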
Or implement your own operator, or use custom subscribe/subscription logic to control this precisely. This is a lot more involved, so try the built-in operators first and show an example of what you are trying to do.
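For the custom subscriber route, a minimal sketch using Reactor's BaseSubscriber might look like the following. It requests a fixed batch, runs a callback once the whole batch has arrived, and only then requests the next batch. BatchingSubscriber, onBatchComplete and run are hypothetical names, and the callback is just a stand-in for something like "commit the offset".

```java
import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BatchingSubscriberDemo {

    // Requests items in fixed-size batches and invokes a callback after each full batch.
    static class BatchingSubscriber<T> extends BaseSubscriber<T> {
        private final int batchSize;
        private final Runnable onBatchComplete; // hypothetical hook, e.g. an offset commit
        private int received;

        BatchingSubscriber(int batchSize, Runnable onBatchComplete) {
            this.batchSize = batchSize;
            this.onBatchComplete = onBatchComplete;
        }

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(batchSize); // initial demand instead of the unbounded default
        }

        @Override
        protected void hookOnNext(T value) {
            received++;
            if (received % batchSize == 0) {
                onBatchComplete.run(); // the whole batch has been received
                request(batchSize);    // credit the publisher for the next batch
            }
        }
    }

    // Consumes `total` items in batches and returns how many full batches completed.
    static int run(int batchSize, int total) {
        AtomicInteger batches = new AtomicInteger();
        Flux.range(1, total)
            .subscribe(new BatchingSubscriber<Integer>(batchSize, batches::incrementAndGet));
        return batches.get();
    }

    public static void main(String[] args) {
        System.out.println(run(5, 20));
    }
}
```

Note that this controls demand from the subscriber (requestor) side. A trailing partial batch never triggers the callback in this sketch, so a real implementation would also want to handle completion and cancellation.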
See docs like https://projectreactor.io/docs/core/release/reference/#_on_backpressure_and_ways_to_reshape_requests