RxPY: composing observables efficiently


Hello. I am exploring the Python RxPY library for a use case where I am building an execution pipeline using reactive programming concepts. This way I expect I won't have to manage much state. My solution seems to work, but I am having trouble composing a new Observable from other Observables.

The problem is that the way I compose my observables causes some expensive calculations to run twice. For performance reasons, I really want to avoid triggering the expensive calculations more than once.

I am very new to reactive programming. I have been scratching my head and have looked through internet resources and the reference documentation, but it seems a little too terse for me to grasp. Please advise.

Following is a toy example which illustrates what I am doing:

import rx
from rx import operators as op
from rx.subject import Subject

root = Subject()

# foo: the "expensive" step
foo = root.pipe(
    op.map(lambda x: x + 1),
    op.do_action(lambda r: print("foo(x) = %s (expensive)" % r)),
)

# bar_foo: built on top of foo
bar_foo = foo.pipe(
    op.map(lambda x: x * 2),
    op.do_action(lambda r: print("bar(foo(x)) = %s" % r)),
)

# combine bar_foo and foo element-wise and add them
bar_foo.pipe(
    op.zip(foo),
    op.map(lambda i: i[0] + i[1]),
    op.do_action(lambda r: print("foo(x) + bar(foo(x)) = %s" % r)),
).subscribe()


print("-------------")
root.on_next(10)
print("-------------")

Output:

-------------
foo(x) = 11 (expensive)
bar(foo(x)) = 22
foo(x) = 11 (expensive)
foo(x) + bar(foo(x)) = 33
-------------

Think of foo() and bar() as expensive, complex operations. I first build an observable foo, then compose a new observable bar_foo that incorporates foo. Finally, both are zipped together to calculate the final result foo(x) + bar(foo(x)).

Questions:

  1. What can I do to prevent foo() from being triggered more than once for a single input? I have strong reasons to keep foo() and bar() separate, and I do not want to explicitly memoize foo().

  2. Could anyone with experience using RxPY in production share their experience? Will using RxPY lead to better performance or to slowdowns compared with equivalent hand-crafted (but unmaintainable) code?

Answer by bheijden

Adding op.share() right after the expensive calculation in the foo pipeline should help here. Changing the foo pipeline to:

foo = root.pipe(
    op.map(lambda x: x + 1),
    op.do_action(lambda r: print("foo(x) = %s (expensive)" % r)),
    op.share(),  # added to pipeline
)

will result in:

-------------
foo(x) = 11 (expensive)
bar(foo(x)) = 22
foo(x) + bar(foo(x)) = 33
-------------

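I believe .share() causes the values emitted by the expensive operation to be shared among all downstream subscribers: the part of the pipeline above it is subscribed to only once, so the result of a single expensive calculation can be reused multiple times. (In RxPY, share() is shorthand for publish() followed by ref_count().)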

Regarding your second question: I am new to RxPY as well, so I am interested in answers from more experienced users. So far I've noticed that as a beginner you can easily create (bad) pipelines where messages and calculations are repeated in the background. .share() seems to reduce this to some extent, but I am not sure what is happening in the background.