Hi! I'm trying to get my first RxPY project done, but I'm having some problems
undestanding the behavior of flat_map in Python.
In this project there is an Observable created from a generator (a Kafka consumer). It emits values when a message is received, then performs a query based on the message, and emits a value for each result.
I made some changes to the code in order to make it easier to reproduce. Kafka consumer was replaced by a generator that takes a lot of time between emissions, and query results were replaced by an Observable that emits 3 values. Behavior is still the same.
from rx import Observable
generator = (i for i in range(100000000) if i == 0 or i == 50000000)
Observable.from_(generator) \
.flat_map(lambda i: Observable.from_(['a', 'b', 'c'])) \
.subscribe(on_next=lambda i: print(i))
Output:
a
(...waits a long time...)
b
a
(...waits a long time...)
c
b
c
I was expecting something like this:
a
b
c
(...waits a long time...)
a
b
c
What is the reason for this behavior? What should I do to get the expected result?
Thank you! :)
Recentlty came across same issue with flat_map operator and ImmediateScheduler helped here.
Initial code updated a little bit for RxPy 3:
Output differs a little bit but issue is the same:
Applied ImmediateScheduler for the observable inside flat_map:
And got the expected result: