I want to implement an EventBus with RxJava and I need sticky events. I know I can use a BehaviorSubject but it caches only the last emitted item while I want to cache all events that are distinct by their type(class name). There is another option - ReplaySubject however it has an overhead - it holds all the emitted elements. Is there a way to create some kind of a ReplaySubject which would hold only unique by type elements?
ReplaySubject with distinct elements
1.2k views Asked by Buckstabue At
2
There are 2 answers
0
On
I do no succeed to solve this issue with only one stream.
So I have one stream by event type, and only expose a merged stream.
open class Event(val name: String)
class Event1(name: String) : Event(name)
class Event2(name: String) : Event(name)
val event1Subject = BehaviorSubject.create<Event1>()
val event2Subject = BehaviorSubject.create<Event2>()
event1Subject.onNext(Event1("event1 a"))
event1Subject.onNext(Event1("event1 b"))
event2Subject.onNext(Event2("event2 a"))
event2Subject.onNext(Event2("event2 b"))
Observable.merge(event1Subject, event2Subject)
.subscribe { println(it.name) }
Console ouput :
event1 b
event2 b
I don't believe that there are any clean solutions. However, you might be able to make this work.
BehaviorSubject<>
for each event type. Use aConcurrentMap
to dispatch each incoming event to the correctSubject
.This is some code that might clarify the above. Untested.
Please note that this code takes advantage of the fact that
merge()
will subscribe to each of the given observables in order. There is no guarantee of such in the RxJava documentation.If all the event types are not known in advance, then this won't work.