Using Otto for communication between threads: Can it cause any problems?


I've tried out Otto in my latest Android project, and it really does simplify communication between Objects a ton. However, I'm not certain if there can be any hidden problems from communicating between Threads with it.

Here is what I did. I created a SingletonBus as an enum so that the Bus is accessible anywhere:

public enum SingletonBus {
    INSTANCE;

    private static final String TAG = SingletonBus.class.getSimpleName();

    private Bus bus;

    // volatile: the flag is written by setPaused() and read by posting threads
    private volatile boolean paused;

    private final Vector<Object> eventQueueBuffer = new Vector<>();

    private Handler handler = new Handler(Looper.getMainLooper());

    private SingletonBus() {
        this.bus = new Bus(ThreadEnforcer.ANY);
    }

    public <T> void postToSameThread(final T event) {
        bus.post(event);
    }

    public <T> void postToMainThread(final T event) {
        try {
            if(paused) {
                eventQueueBuffer.add(event);
            } else {
                handler.post(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            bus.post(event);
                        } catch(Exception e) {
                            Log.e(TAG, "POST TO MAIN THREAD: BUS LEVEL", e);
                            throw e;
                        }
                    }
                });
            }
        } catch(Exception e) {
            Log.e(TAG, "POST TO MAIN THREAD: HANDLER LEVEL", e);
            throw e;
        }
    }

    public <T> void register(T subscriber) {
        bus.register(subscriber);
    }

    public <T> void unregister(T subscriber) {
        bus.unregister(subscriber);
    }

    public boolean isPaused() {
        return paused;
    }

    public void setPaused(boolean paused) {
        this.paused = paused;
        if(!paused) {
            // Iterating a Vector is not thread-safe by itself, so hold the
            // Vector's own lock while draining to block concurrent add() calls.
            synchronized(eventQueueBuffer) {
                for(Object event : eventQueueBuffer) {
                    postToMainThread(event);
                }
                eventQueueBuffer.clear();
            }
        }
    }
}

Then I created an Event that can hold the result of an operation. (I'm not subclassing anything yet, just creating one Event per operation; I'll refactor if it gets messy.)

public class KeyPairCreatedEvent {
    private KeyPair keyPair;

    public KeyPairCreatedEvent(KeyPair keyPair) {
        this.keyPair = keyPair;
    }

    public KeyPair getKeyPair() {
        return keyPair;
    }
}

And then I created and posted this event:

@Subscribe
public void keyPairCreate(KeyPairCreateEvent keyPairCreateEvent) {
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                KeyPair keyPair = keyPairCreator.createKeyPair();
                // SingletonBus exposes no getBus(); postToSameThread() posts on the calling thread
                SingletonBus.INSTANCE.postToSameThread(new KeyPairCreatedEvent(keyPair));
            } catch(...){...}
        }
    });
    thread.start();
}

And I subscribed to that event to get the result once the keypair was created:

@Subscribe
public void keyPairCreated(KeyPairCreatedEvent keyPairCreatedEvent) {
    Log.d(MainActivity.class.getName(), "Acquired keypair: " + keyPairCreatedEvent.getKeyPair());
    //do stuff
}

It seems to be working, but can there be any hidden errors from using Otto with ThreadEnforcer.ANY to communicate between threads? Is there anything wrong with this approach?


There are 2 answers

sergej shafarenka (best answer)

Otto dispatches events synchronously, on the same thread that posted them. ThreadEnforcer is there just to verify that you call post() from the expected thread. ThreadEnforcer.MAIN asserts that you post() from the main thread only; if you use it and post() from a background thread, the bus throws a runtime exception to stop you from doing the wrong thing. With ThreadEnforcer.ANY, essentially no checks are done. You are allowed to post() from any thread but, as I already said, you have to expect subscribers to be called on any thread too.
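
To make the synchronous-dispatch point concrete, here is a minimal plain-Java sketch. SyncBusSketch is a hypothetical stand-in written for this illustration, not Otto's implementation; it only mimics the behavior described above, where post() calls subscribers directly on the calling thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a synchronous bus; NOT Otto's actual code.
class SyncBusSketch {
    private final List<Consumer<Object>> subscribers = new CopyOnWriteArrayList<>();

    void register(Consumer<Object> subscriber) {
        subscribers.add(subscriber);
    }

    // Synchronous dispatch: subscribers run on whichever thread calls post().
    void post(Object event) {
        for (Consumer<Object> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    // Posts one event from a thread with the given name and returns the
    // name of the thread on which the subscriber was invoked.
    static String subscriberThreadFor(String posterThreadName) {
        SyncBusSketch bus = new SyncBusSketch();
        String[] seen = new String[1];
        bus.register(event -> seen[0] = Thread.currentThread().getName());

        Thread poster = new Thread(() -> bus.post("event"), posterThreadName);
        poster.start();
        try {
            poster.join(); // join() also makes the write to seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

Calling SyncBusSketch.subscriberThreadFor("worker-1") returns the poster's own thread name, which is why a subscriber that touches UI state cannot assume it runs on the main thread.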

Applied to your code, this means KeyPairCreatedEvent will be posted from a background thread, and the keyPairCreated(KeyPairCreatedEvent) subscriber will also be called on that background thread. If two threads (background and main) work on the same data, you have to synchronize; otherwise you risk inconsistencies. If you want the result delivered on the main thread (to avoid synchronization), you need to use Handler.post() and call bus.post() from there.
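
The hand-off idea can be sketched outside Android as well. In the example below, a single-threaded executor named "main-stand-in" plays the role of the main looper; that substitution is an assumption made so the sketch runs on a plain JVM, and MainThreadHandOffSketch is a hypothetical name. On Android, Handler.post() on a main-looper Handler would take the executor's place.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class MainThreadHandOffSketch {
    // Returns the name of the thread on which the "subscriber" work ran.
    static String deliveryThreadName() {
        // Assumption: one dedicated thread stands in for Android's main thread.
        ExecutorService mainStandIn =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-stand-in"));
        CompletableFuture<String> deliveredOn = new CompletableFuture<>();

        // The worker does not deliver the result itself; it only enqueues the
        // delivery onto the stand-in main thread (the Handler.post() step).
        Thread worker = new Thread(
                () -> mainStandIn.execute(
                        () -> deliveredOn.complete(Thread.currentThread().getName())),
                "worker");
        worker.start();

        try {
            return deliveredOn.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            mainStandIn.shutdown();
        }
    }
}
```

Because every delivery is funnelled through one thread, subscribers no longer race with each other or with code already running on that thread.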

Alternatively, you can try out TinyBus, which uses the same interfaces as Otto but calls subscribers on the main thread even when events are posted from a background thread.

Hope this helps.

Wil

The dispatch queue in Otto is wrapped in a ThreadLocal, so there should be no problem as long as you account for concurrency when handling received events in your subscriber methods.
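
As a rough illustration of what a ThreadLocal-wrapped queue implies, the sketch below gives each thread its own pending-event queue. ThreadLocalQueueSketch and its PENDING field are hypothetical names for this example and do not reproduce Otto's internals.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class ThreadLocalQueueSketch {
    // Each thread that calls PENDING.get() lazily receives its own queue.
    private static final ThreadLocal<Queue<String>> PENDING =
            ThreadLocal.withInitial(() -> new ArrayDeque<String>());

    // Enqueues an event on the calling thread, then reports how many pending
    // events a different thread sees in its own queue.
    static int pendingSeenByOtherThread() {
        PENDING.get().add("event-from-caller");

        int[] sizeOnOtherThread = {-1};
        Thread other = new Thread(() -> sizeOnOtherThread[0] = PENDING.get().size());
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        PENDING.remove(); // drop the caller's queue so repeated calls start clean
        return sizeOnOtherThread[0];
    }
}
```

The other thread sees an empty queue, so the queue itself needs no locking. Any state the subscribers share across threads is still the subscriber author's responsibility.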