I have a Cask webserver for a project that uses Monix Observables. Each Observable must meet the following requirements:
1. It does not produce any data until it has at least one subscription.
2. It shares its result with all of its derivatives instead of rerunning its code for each one.
3. It stops producing data when all of the subscriptions are closed.
4. It restarts once a new subscription is created.
I believe the following solution satisfies requirements (1) to (3):
    // Class that fetches data
    val someObservable = Observable.interval(5.second).map { _ =>
      scribe.info("SS Warm Observable")
      // Fetches data and yields a result
    }.share.publish.refCount

    def getSomeObservable(name: String): Observable[Long] = {
      someObservable.map { _(name) }
    }

    val nodeToObservable: PartialFunction[Node, Observable[StatusUpdate]] = {
      case Node(name, SomeThing, _) => someCase(name).distinctUntilChanged
      case Node(name, someObs(Some(_)), _) =>
        getSomeObservable(name)
          .map(longToStatus)
          .distinctUntilChanged
          .map(status => StatusUpdate(name, status, GraphType.Node(Edges.nodeNamesToOwnedEdgeNames(name))))
    }
The webserver class contains the following code:
    // Webserver
    val observables = MyProject.nodes.collect(nodeToObservable)

    @websocket("/connect")
    def wsHandler(): WebsocketResult = {
      WsHandler { channel =>
        scribe.info("Beginning Subscriptions")
        // Subscribe to every node observable and forward each update to the client
        val cancellables = observables.map(_.foreach(nodeStatus =>
          channel.send(Ws.Text(nodeStatus.toJson))
        ))
        WsActor {
          case Ws.ChannelClosed() =>
            scribe.info("Unsubscribing observers")
            cancellables.foreach(_.cancel())
        }
      }
    }
Everything works as expected, and I can access the webpage locally once the server is running. However, the following is logged only once in the terminal (on the initial page load):
SS Warm Observable
The problem appears when I refresh the webpage, or when I close and then re-open it. In both cases I expect the line above to be logged again, because the operators .share.publish.refCount are chained as shown in the first code snippet, but it is not.
If cancellables.foreach(_.cancel()) unsubscribes all observers from the Observable, why is the Observable not restarted (which would show up as SS Warm Observable in the logs) when new observers subscribe after the page is refreshed or re-opened?
Any insight into why this may be the case is appreciated. Also, is this the correct way of implementing a "warm" Observable?
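To separate the Observable behaviour from the websocket handling, the subscribe, cancel, subscribe-again sequence can be tried without Cask. The following is only a minimal sketch, assuming Monix 3.x and its global scheduler. The names WarmObservableRepro, upstreamRuns and subscribeFor are hypothetical, the counter stands in for the SS Warm Observable log line, and the interval is shortened to 100 ms so the check runs quickly.

```scala
import java.util.concurrent.atomic.AtomicInteger

import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.duration._

object WarmObservableRepro {
  // Hypothetical counter: incremented each time the upstream body runs,
  // standing in for the "SS Warm Observable" log line.
  val upstreamRuns = new AtomicInteger(0)

  // Hypothetical stand-in for someObservable: a shared, ref-counted interval.
  val shared: Observable[Long] =
    Observable.interval(100.millis).map { n =>
      upstreamRuns.incrementAndGet()
      n
    }.share // share is defined as publish.refCount

  // Subscribes, waits, cancels, and returns how many upstream runs
  // were counted while this subscription was active.
  def subscribeFor(millis: Long): Int = {
    val before = upstreamRuns.get()
    val subscription = shared.foreach(_ => ())
    Thread.sleep(millis)
    subscription.cancel()
    upstreamRuns.get() - before
  }

  def main(args: Array[String]): Unit = {
    println(s"first subscription:  ${subscribeFor(350)} upstream runs")
    println(s"second subscription: ${subscribeFor(350)} upstream runs")
  }
}
```

If the second subscription reports zero upstream runs, the source was not restarted after the first cancel, independent of anything Cask does.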