Rx Python - subscription to subject in main thread but on_next in background thread


I am trying to use ReactiveX for Python (reactivex 4.2) to move data generated in a background thread back into the main thread. So far I have the following code:

from reactivex import operators as ops
from reactivex.scheduler import CurrentThreadScheduler
from reactivex import Subject
import threading
from threading import Thread

class MyThread(Thread):
    def __init__(self, callback):
        Thread.__init__(self)
        self.callback = callback

    def run(self):
        self.callback("hello")

my_subject = Subject()

def callback(data):
    # this is called in a separate thread
    print(f"In callback: {threading.current_thread().name}")
    my_subject.on_next(data)

if __name__ == "__main__":

    thread_to_execute_on = CurrentThreadScheduler()

    print(f"Before stream: {threading.current_thread().name}")

    background_thread = MyThread(callback=callback)
    my_subject.pipe(
        ops.observe_on(thread_to_execute_on)
    ).subscribe(
        lambda x: print(f"In subscription: {threading.current_thread().name}")
    )
    
    background_thread.start()
    input()

Here is the current output:

Before stream: MainThread
In callback: Dummy-7
In subscription: Dummy-7

I want to achieve the following output:

Before stream: MainThread
In callback: Dummy-7
In subscription: MainThread

Just wondering where I am going wrong here?

UPDATE:

It seems like CurrentThreadScheduler is not doing what I expect. I modified the line to:

thread_to_execute_on = ThreadPoolScheduler(max_workers=1)

And the output is:

Before stream: MainThread
In callback: Thread-7, data: hello
In subscription: ThreadPoolExecutor-0_0

How do I schedule work on the main thread?

1 Answer

Answer by lopisan:

To observe something on the main thread, that thread has to give up control to something that can run scheduled work on it. One way is to run an asyncio event loop there, like this:

import asyncio
import threading
from threading import Thread

from reactivex import Subject
from reactivex import operators as ops
from reactivex.scheduler.eventloop import AsyncIOThreadSafeScheduler


class MyThread(Thread):
    def __init__(self, callback):
        Thread.__init__(self)
        self.callback = callback

    def run(self):
        self.callback("hello")


my_subject = Subject()


def callback(data):
    # this is called in a separate thread
    print(f"In callback: {threading.current_thread().name}")
    my_subject.on_next(data)


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    # on_next is called from the background thread, so the work is handed to
    # the loop from outside it; the thread-safe scheduler variant is meant for that
    thread_to_execute_on = AsyncIOThreadSafeScheduler(loop=loop)

    print(f"Before stream: {threading.current_thread().name}")

    background_thread = MyThread(callback=callback)

    my_subject.pipe(
        ops.observe_on(thread_to_execute_on)
    ).subscribe(
        lambda x: print(f"In subscription: {threading.current_thread().name}")
    )

    background_thread.start()
    # the main thread gives up control here and runs whatever gets scheduled
    loop.run_forever()
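
The hand-off itself can be seen without ReactiveX. Below is a minimal sketch using only the standard library, assuming the thread that calls the function is the one you want the value delivered to. The names run_handoff, on_main and worker are made up for this illustration. The worker thread queues a callback with loop.call_soon_threadsafe, and the callback runs on the thread that is sitting in loop.run_forever().

```python
import asyncio
import threading


def run_handoff():
    """Send one value from a worker thread to the thread that runs the loop."""
    loop = asyncio.new_event_loop()
    seen = []  # (value, thread name) pairs recorded by the loop's thread

    def on_main(value):
        # Runs inside loop.run_forever(), i.e. on the thread that owns the loop.
        seen.append((value, threading.current_thread().name))
        loop.stop()  # stop after the first value so the sketch terminates

    def worker():
        # call_soon_threadsafe queues a callback from another thread
        # and wakes the loop if it is idle.
        loop.call_soon_threadsafe(on_main, "hello")

    t = threading.Thread(target=worker, name="worker")
    t.start()
    loop.run_forever()  # the calling thread gives up control here
    t.join()
    loop.close()
    return seen


if __name__ == "__main__":
    print(run_handoff())
```

The recorded thread name is that of the caller of run_handoff, not "worker". This also shows why a scheduler cannot push work onto a thread that never yields: nothing runs on the main thread until it enters the loop.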