RxPY - flat_map emissions wait for next generator value


Hi! I'm trying to get my first RxPY project done, but I'm having some problems understanding the behavior of flat_map in Python.

In this project there is an Observable created from a generator (a Kafka consumer). It emits values when a message is received, then performs a query based on the message, and emits a value for each result.

I made some changes to the code to make it easier to reproduce. The Kafka consumer was replaced by a generator that takes a long time between emissions, and the query results were replaced by an Observable that emits 3 values. The behavior is still the same.

from rx import Observable

generator = (i for i in range(100000000) if i == 0 or i == 50000000)
Observable.from_(generator) \
    .flat_map(lambda i: Observable.from_(['a', 'b', 'c'])) \
    .subscribe(on_next=lambda i: print(i))

Output:

a
(...waits a long time...)
b
a
(...waits a long time...)
c
b
c

I was expecting something like this:

a
b
c
(...waits a long time...)
a
b
c

What is the reason for this behavior? What should I do to get the expected result?

Thank you! :)


1 answer

Dmytro Severnyuk

I recently came across the same issue with the flat_map operator, and ImmediateScheduler helped here.

Here is the original code, updated slightly for RxPY 3:

import rx
from rx.operators import flat_map


generator = (i for i in range(100000000) if i == 0 or i == 50000000)
rx.from_(generator).pipe(
    flat_map(
        lambda i: rx.from_(['a', 'b', 'c'])
    )
).subscribe(on_next=lambda i: print(i))

The output differs slightly, but the issue is the same:

(... waits a long time ...)
a
b
c
a
b
c

I applied ImmediateScheduler to the observable inside flat_map:

import rx
from rx.operators import flat_map
from rx.scheduler import ImmediateScheduler


generator = (i for i in range(100000000) if i == 0 or i == 50000000)
rx.from_(generator).pipe(
    flat_map(
        lambda i: rx.from_(['a', 'b', 'c'], scheduler=ImmediateScheduler())
    )
).subscribe(on_next=lambda i: print(i))

This gave the expected result:

a
b
c
(...waits a long time...)
a
b
c
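
A possible reason for the difference, as far as I understand RxPY 3's defaults: without an explicit scheduler, both the outer and the inner observable seem to run on the same current-thread (trampoline) scheduler. The outer action is already running when flat_map subscribes to the inner observable, so the inner work is queued and only runs once the generator is exhausted. ImmediateScheduler runs the inner work right away instead. The snippet below is only a toy model of that queueing, not RxPY's actual implementation; `simulate`, `pending` and `immediate_inner` are hypothetical names made up for illustration.

```python
from collections import deque


def simulate(outer_items, inner_items, immediate_inner):
    """Toy model of a trampoline scheduler (NOT RxPY's real code)."""
    log = []
    pending = deque()  # actions waiting for the running action to finish

    def emit_inner():
        # Stands in for the inner observable emitting 'a', 'b', 'c'.
        log.extend(inner_items)

    def outer_action():
        # Assumption: one scheduled action drains the whole outer iterable.
        for item in outer_items:
            log.append(f"outer {item}")
            if immediate_inner:
                emit_inner()                # run inner work right now
            else:
                pending.append(emit_inner)  # queue it behind the outer action

    pending.append(outer_action)
    while pending:
        pending.popleft()()  # run queued actions one at a time, in order
    return log


print(simulate([0, 1], ["a", "b", "c"], immediate_inner=False))
print(simulate([0, 1], ["a", "b", "c"], immediate_inner=True))
```

In the queued case, both "outer" entries are logged before any letter, which mirrors the long wait followed by a b c a b c. In the immediate case, the letters follow each outer item directly.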