ReplaySubject with distinct elements


I want to implement an EventBus with RxJava, and I need sticky events. I know I can use a BehaviorSubject, but it caches only the last emitted item, whereas I want to cache one event per distinct type (class name). Another option is ReplaySubject, but it has overhead: it holds every emitted element. Is there a way to create some kind of ReplaySubject that holds only one element per type?
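
To make the overhead concrete, here is a small plain-Java sketch comparing how many events a replay-everything cache and a one-per-type cache would retain. It does not use RxJava at all; `CacheSizeSketch` and `cacheSizes` are made-up names for illustration, and the two collections only stand in for the caching behaviour described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; neither cache here is an RxJava class.
final class CacheSizeSketch {

    // Returns { size of a replay-everything cache, size of a one-per-type cache }.
    static int[] cacheSizes(List<?> events) {
        List<Object> replayAll = new ArrayList<>();              // keeps every event
        Map<Class<?>, Object> latestPerType = new HashMap<>();   // keeps one event per class
        for (Object event : events) {
            replayAll.add(event);
            latestPerType.put(event.getClass(), event);          // overwrites the older event of that class
        }
        return new int[] { replayAll.size(), latestPerType.size() };
    }

    public static void main(String[] args) {
        // Five events of two types (String and Integer).
        int[] sizes = cacheSizes(Arrays.asList("a", "b", 1, 2, 3));
        System.out.println("replay-all: " + sizes[0] + ", per-type: " + sizes[1]);
    }
}
```

The replay-everything cache grows with the number of events, while the per-type cache grows only with the number of distinct event classes.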


There are 2 answers

2
Bob Dalgleish

I don't believe that there are any clean solutions. However, you might be able to make this work.

  1. Create a BehaviorSubject<> for each event type. Use a ConcurrentMap to dispatch each incoming event to the correct Subject.
  2. Maintain a list of those subjects in the order of arrival of events.
  3. When a new subscription arrives, create an observable that merges all the subjects: first the subjects that have already received events, in arrival order, then those that have not, in any order.

This is some code that might clarify the above. Untested.

// The subscription operation will perform a merge of the two lists
Map<EventType, BehaviorSubject<Event>> map = new ConcurrentHashMap<>();
List<BehaviorSubject<Event>> listOfUnseenEvents = new ArrayList<>();
List<BehaviorSubject<Event>> listOfSeenEvents = new ArrayList<>();
// ...
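// Until events arrive, every known subject counts as unseen.
// Collection has no asList() method, so copy the values into a new list.
listOfUnseenEvents = new ArrayList<>( map.values() );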

public Observable<Event> busSub() {
  List<BehaviorSubject<Event>> allEvents = new ArrayList<>();
  synchronized ( map ) {
    allEvents.addAll( listOfSeenEvents );
    allEvents.addAll( listOfUnseenEvents );
  }
  return Observable.merge( allEvents );
}

// receive an event and dispatch it
eventSource
  .subscribe( event -> processEvent( event ) );

public void processEvent( Event event ) {
    BehaviorSubject<Event> eSubject = map.get( event.getEventType() );
    if ( eSubject == null ) {
      return; // unknown event type: no subject was registered for it
    }
    synchronized ( map ) {
      // Move the subject to the end of the seen list so that list stays in arrival order.
      // remove() is a no-op on whichever list does not contain the subject.
      listOfSeenEvents.remove( eSubject );
      listOfUnseenEvents.remove( eSubject );
      listOfSeenEvents.add( eSubject );
    }
    eSubject.onNext( event );
}

Please note that this code relies on merge() subscribing to the given observables in order. The RxJava documentation does not guarantee that.

If not all event types are known in advance, this won't work.
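
For comparison, here is a rough plain-Java sketch of the same sticky behaviour that avoids both caveats: it does not depend on merge() ordering, and it does not need the event types up front. It is not RxJava code and not a drop-in replacement for a Subject. `StickyBusSketch`, `post`, and `subscribe` are hypothetical names, events are keyed by their runtime class, and subscribers are plain `Consumer` callbacks invoked while the lock is held, which is a simplification.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical plain-Java stand-in for a "sticky" bus; not part of RxJava.
final class StickyBusSketch {
    // Latest event per concrete class, ordered by most recent arrival.
    private final Map<Class<?>, Object> latestByType = new LinkedHashMap<>();
    private final List<Consumer<Object>> subscribers = new ArrayList<>();

    // Store the event as the newest of its type, then deliver it to current subscribers.
    public synchronized void post(Object event) {
        latestByType.remove(event.getClass());      // drop the older event of this type, if any
        latestByType.put(event.getClass(), event);  // re-insert so the type moves to the end
        for (Consumer<Object> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    // Replay one cached event per type (least recently updated type first),
    // then register the subscriber for live events.
    public synchronized void subscribe(Consumer<Object> subscriber) {
        for (Object cached : latestByType.values()) {
            subscriber.accept(cached);
        }
        subscribers.add(subscriber);
    }

    public static void main(String[] args) {
        StickyBusSketch bus = new StickyBusSketch();
        bus.post("a");
        bus.post(1);
        bus.post("b");               // replaces "a" and moves String behind Integer
        List<Object> seen = new ArrayList<>();
        bus.subscribe(seen::add);    // replays 1, then "b"
        bus.post(2);                 // delivered live
        System.out.println(seen);    // prints [1, b, 2]
    }
}
```

Because replay and registration happen under the same lock as post, a new subscriber cannot miss an event that arrives between the two steps. The cost is that a slow subscriber blocks every poster, so a real bus would probably hand delivery off to a scheduler.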

0
Neige

I did not manage to solve this with a single stream.
So I keep one stream per event type and expose only a merged stream.

open class Event(val name: String)
class Event1(name: String) : Event(name)
class Event2(name: String) : Event(name)

val event1Subject = BehaviorSubject.create<Event1>()
val event2Subject = BehaviorSubject.create<Event2>()

event1Subject.onNext(Event1("event1 a"))
event1Subject.onNext(Event1("event1 b"))
event2Subject.onNext(Event2("event2 a"))
event2Subject.onNext(Event2("event2 b"))

Observable.merge(event1Subject, event2Subject)
        .subscribe { println(it.name) }

Console output:

event1 b
event2 b