Creating a warm Observable and dealing with unexpected behavior - Monix, Scala, Cask


This is a Cask web server for a project that uses Monix Observables. Each Observable must meet the following requirements:

  1. It doesn't produce any data until it has at least one subscription.
  2. It shares its result with all of its derived Observables instead of rerunning its code for each one.
  3. It stops producing data when all of its subscriptions are closed.
  4. It restarts once a new subscription is created.

I believe the following solution handles requirements (1) through (3):

// Class that fetches data
val someObservable = Observable.interval(5.second).map { _ =>
    scribe.info("SS Warm Observable")
    // Fetches data and yields a result
}.share.publish.refCount

def getSomeObservable(name: String): Observable[Long] = {
    someObservable.map { _(name) }
}

val nodeToObservable: PartialFunction[Node, Observable[StatusUpdate]] = {
    case Node(name, SomeThing, _) => someCase(name).distinctUntilChanged
    case Node(name, someObs(Some(_)), _) => 
        getSomeObservable(name).map(longToStatus).distinctUntilChanged.map(status => StatusUpdate(name, status, GraphType.Node(Edges.nodeNamesToOwnedEdgeNames(name))))
}

The following code lives in the web server class:

  // Webserver 
  val observables = MyProject.nodes.collect(nodeToObservable)

  @websocket("/connect")
  def wsHandler(): WebsocketResult = {
    WsHandler { channel =>
      scribe.info("Beginning Subscriptions")
      val cancellables = observables.map(_.foreach(nodeStatus => 
        channel.send(Ws.Text(nodeStatus.toJson))
      ))
      WsActor {
        case Ws.ChannelClosed() =>
          scribe.info("Unsubscribing observers")
          cancellables.foreach(_.cancel())
      }
    }
  }

Everything operates as expected, and I can access the web page locally once the server is running. However, the following is logged only once in the terminal (on the initial page load):

SS Warm Observable

The problem appears when I refresh the web page, or when I close it and re-open it. In both cases I expect the message above to be logged again, because the first code snippet chains the operators share.publish.refCount, but it is not.

If cancellables.foreach(_.cancel()) unsubscribes all observers from the Observable, why is the Observable not subscribed to again (which SS Warm Observable in the logs would indicate) whenever the page is refreshed or re-opened?

Any insight into why this may be the case is appreciated. Also, is this the correct way of implementing a "warm" Observable?
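
The behavior can probably be reproduced without Cask. The following is a minimal sketch, not the project's code: it assumes Monix 3.x with the global Scheduler, uses println instead of scribe, shortens the interval to one second, and the names WarmObservableRepro, source, first and second are hypothetical. It builds the same share.publish.refCount chain, subscribes, cancels, and then subscribes a second time to imitate a page refresh.

// Minimal reproduction sketch (assumes Monix 3.x on the classpath)
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._

object WarmObservableRepro {
  // Same operator chain as in the question, with a shorter interval.
  val source: Observable[Long] = Observable.interval(1.second).map { tick =>
    println(s"SS Warm Observable (tick $tick)")
    tick
  }.share.publish.refCount

  def main(args: Array[String]): Unit = {
    // First "page load": subscribe, let it run briefly, then cancel.
    val first = source.foreach(tick => println(s"first subscriber got $tick"))
    Thread.sleep(3500)
    first.cancel()

    // "Refresh": subscribe again after every earlier subscription was cancelled.
    val second = source.foreach(tick => println(s"second subscriber got $tick"))
    Thread.sleep(3500)
    second.cancel()
  }
}

If the second subscriber never prints anything, the issue lies in the operator chain rather than in the websocket handling.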
