Linked Questions

Popular Questions

I have a multiplexed-ish application protocol I have implemented pipeline stages for processing into Kotlin data classes representing each type of message in the protocol, in both directions. There is no formal request-response pattern; the client and server may send each other messages at any point in the socket's lifespan.

In the protocol, there is a constraint that for any connected client, messages are handled serially. That is, when the server receives a message from a client, it cannot handle the next message from the client until the prior one is fully handled. The same rule applies on the client handling messages from the server. This is more or less how channel pipelines are handled within Netty; a given pipeline stage can only handle one thing at a time. This is easy enough to encode by having a blocking pipeline stage on a new executor group.

However, I want to write the handlers for each type of message the client may send in the form of suspending functions, as the range of things my server may do in response to a client message is very wide and may involve contacting other servers or databases, and a group of message types may also lock portions of server state internally during processing to exclude other concurrent connected clients. So I need a way of bridging into a coroutine context from the tail end of the Channel pipeline, such that the pipeline stage is "blocked" from continuing to process messages, but does not prevent the processing of other channels' pipelines at this stage, and without creating a single thread event executor per socket.

I have thought to use a separate dispatcher and event loop from Netty to represent the processing of each client session, using an actor to represent each, such that received messages are sent to the actor's mailbox channel and a coroutine processes them in order received, and I can cancel the actor and on its cancellation close the SocketChannel and vice versa. But this is rather ugly and doesn't feel in the spirit of Netty.

tl;dr I am wondering, what is the best way to implement a pipeline stage that integrates a suspending function cleanly with its executor group?

Related Questions