Return a value from an Observable via RxPY


What's an elegant way to convert an rx.Observable object to a 'normal' object inside a function?

e.g.:

def foo():
    return rx.Observable.just('value').subscribe(<some magic here>)

>>> print(foo())

# expected:
# value
# however get:
# <rx.disposables.anonymousdisposable.AnonymousDisposable at SOME ADDRESS>

I understand that subscribe returns a disposable object, and an 'ugly' way to implement this is:

class Foo:
    def __init__(self):
        self.buffer = None

    def call_kernel(self):
        # a lambda cannot contain an assignment statement, so use setattr
        rx.Observable.just('value').subscribe(lambda v: setattr(self, 'buffer', v))

    def __call__(self):
        self.call_kernel()
        return self.buffer
>>> print(Foo()())

# get:
# value

Is there any better way to do this?

Thanks.


There is 1 answer

Answer by concat (best answer)

Take a look at Observable.to_blocking(): it creates a BlockingObservable, which can be converted to a list containing all the emitted items. For your example:

def foo():
    return list(rx.Observable.just('value').to_blocking())[0]

I'd also like to point out that your second solution is dangerous: there is no guarantee of exactly when values will be emitted, and your __call__ relies on 'value' being emitted immediately, before self.buffer is read.
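
To make the timing problem concrete, here is a minimal sketch that uses only the standard library. DelayedJust is a hypothetical stand-in for an observable that emits on another thread after a short delay; it is not part of RxPY, and read_buffer_immediately and wait_for_first are names made up for this illustration. The first function mimics the buffer pattern from the question, the second blocks until a value arrives, which is roughly the idea behind a blocking observable.

```python
import queue
import threading


class DelayedJust:
    """Hypothetical stand-in for an observable that emits one value
    later, on another thread. Not part of RxPY."""

    def __init__(self, value, delay=0.1):
        self.value = value
        self.delay = delay

    def subscribe(self, on_next):
        # Emit asynchronously, as a scheduler-driven observable might.
        timer = threading.Timer(self.delay, on_next, args=(self.value,))
        timer.start()
        return timer  # plays the role of the disposable


def read_buffer_immediately(source):
    """Buffer pattern from the question: reads before the value arrives."""
    buffer = []
    source.subscribe(buffer.append)
    return buffer[0] if buffer else None


def wait_for_first(source, timeout=1.0):
    """Block the caller until the first value is emitted."""
    results = queue.Queue()
    source.subscribe(results.put)
    return results.get(timeout=timeout)  # raises queue.Empty on timeout


print(read_buffer_immediately(DelayedJust('value')))
print(wait_for_first(DelayedJust('value')))
```

With a delayed source, the buffer version returns before anything has been emitted, while the blocking version waits and returns 'value'. The timeout is there so that a source which never emits raises an error instead of hanging the caller forever.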