Python reactive programming conditional executions


I started learning about reactive programming in Python using the RxPy module. Now I have a case where I would like to run different functions depending on the element received. I've achieved it with a very straightforward approach:

from rx.subject import Subject
from rx import operators as ops

def do_one():
    print('exec one')

def do_two():
    print('exec two')

subject = Subject()
subject.pipe(
    ops.filter(lambda action: action == 'one')
).subscribe(lambda x: do_one())

subject.pipe(
    ops.filter(lambda action: action == 'two')
).subscribe(lambda x: do_two())


subject.on_next('one')
subject.on_next('two')

To me this seems a bit ugly. However, I couldn't find any case, switch, or similar method among the operators/Observables that would trigger a different execution depending on the element received.

Is there a better way of doing this?


There is 1 answer

JBB On

This solution might be hacky, but you can use it in a few different ways:

from rx.subject import Subject
from rx import operators as ops

def do_one():
    print('exec one')

def do_two():
    print('exec two')

subject = Subject()
subject.subscribe(lambda x: globals()[x]())

subject.on_next('do_one')
subject.on_next('do_two')

produces

exec one
exec two

I've also used something similar where the class was unknown at the time of the call, like this:

import Workers  # a module in my app with different workers

Wrkr = getattr(Workers, wrkr_type)  # in your case, x == wrkr_type
w = Wrkr(...)
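
A self-contained sketch of the same getattr lookup, in case it helps to see it run. The worker classes, the `make_worker` helper, and the `SimpleNamespace` standing in for the `Workers` module are all hypothetical; a real module would be imported instead.

```python
from types import SimpleNamespace


class EmailWorker:
    def run(self):
        return 'email sent'


class SmsWorker:
    def run(self):
        return 'sms sent'


# Stand-in for the `Workers` module (hypothetical contents).
Workers = SimpleNamespace(EmailWorker=EmailWorker, SmsWorker=SmsWorker)


def make_worker(wrkr_type):
    """Look up a worker class by name on `Workers` and instantiate it."""
    # Passing a default avoids an AttributeError for unknown names.
    Wrkr = getattr(Workers, wrkr_type, None)
    if not callable(Wrkr):
        raise ValueError(f'unknown worker type: {wrkr_type!r}')
    return Wrkr()


print(make_worker('EmailWorker').run())
```

Checking the result of getattr before calling it means a mistyped name fails with a clear error rather than somewhere deeper in the subscriber.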

When it comes time to move some of those functions into different files or even modules, you can still achieve the same thing with getattr on the module. Generally speaking, of course, you don't want to use globals(), since any string that reaches the subject could then call any module-level name ;)
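
If you'd rather avoid globals() altogether, one common alternative is an explicit dict that maps action names to handlers. This is only a sketch: `HANDLERS` and `dispatch` are names made up for illustration, and the RxPy part is left out so the snippet runs on its own.

```python
def do_one():
    print('exec one')


def do_two():
    print('exec two')


# Explicit table of allowed actions -> handlers (hypothetical name).
HANDLERS = {
    'one': do_one,
    'two': do_two,
}


def dispatch(action):
    """Run the handler registered for `action`; return True if one was found."""
    handler = HANDLERS.get(action)
    if handler is None:
        print(f'no handler for {action!r}')
        return False
    handler()
    return True


dispatch('one')
dispatch('two')
```

Since `dispatch` takes a single argument, it should be usable directly as the on_next callback, i.e. `subject.subscribe(dispatch)`, which replaces the pair of filter/subscribe pipelines from the question with a single subscription.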