IllegalStateException - Sink.asPublisher only supports one subscribe - when using WebSockets

113 views Asked by At

In production, our application generated the following stacktrace. There's none of our own code here - not really much to go on.

Can someone who understands the akka-stream library explain:

  1. What's happening here?
  2. Is this just an underlying bug in Play. We've not reproduced it in our fairly similar test environment so it's not practical to generate a test case as required by the PlayFramework GitHub.

Play version: 2.8.18 Akka version: 2.6.20

2023-11-06 09:20:05 GMT [ERROR] p.c.s.c.WebSocketFlowHandler [WebSocketFlowHandler.scala:249] - WebSocket flow threw exception
java.lang.IllegalStateException: Sink.asPublisher(fanout = false) only supports one subscriber (which is allowed, see reactive-streams specification, rule 1.11)
        at akka.stream.impl.ReactiveStreamsCompliance$.rejectAdditionalSubscriber(ReactiveStreamsCompliance.scala:62)
        at akka.stream.impl.VirtualPublisher.rec$6(StreamLayout.scala:481)
        at akka.stream.impl.VirtualPublisher.subscribe(StreamLayout.scala:486)
        at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.preStart(ActorGraphInterpreter.scala:148)
        at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:306)
        at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:619)
        at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:727)
        at akka.stream.impl.fusing.ActorGraphInterpreter.finishShellRegistration(ActorGraphInterpreter.scala:770)
        at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:788)
        at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:819)
1

There are 1 answers

2
James Roper On

Without seeing your code (ie, what you're trying to achieve), it's hard to know what's going wrong. But what it indicates is that somewhere, you've taken a Publisher produced by Sink.asPublisher, and tried to subscribe to it twice. This is not possible, since the publisher returned by Sink.asPublisher represents one running instance of a stream, and there's no way to restart that stream for a second subscriber if you subscribe a second time.

Conceptually thinking, when you receive a WebSocket connection, and you want to consume the incoming stream of messages, you can supply a sink to do that. Sink.asPublisher turns that stream into a Reactive Streams publisher that you can consume messages from with other reactive streams libraries. When you subscribe to it, the supplied subscriber will be connected to the existing WebSocket connection. But if you try to subscribe from that publisher a second time, what can it do? The WebSocket server can't go back to the WebSocket client and say "hey, can you connect to me again so I can have another stream of messages to send to this new subscriber?" No, it can only fail, saying that you can't subscribe twice.