Observable.Generate in RxJava?

1.3k views Asked by At

.NET Reactive Extensions has a neat method to generate sequences using corecursion which is called Observable.Generate.

Is there analogues method in RxJava that allows data generation via corecursion? If not, could it be implemented based on existing methods?

1

There are 1 answers

0
akarnokd On BEST ANSWER

It's not an exact match but we have SyncOnSubscribe (and AsyncOnSubscriber) that can generate values, for example:

@Test
public void testRange() {
    final int start = 1;
    final int count = 4000;
    OnSubscribe<Integer> os = SyncOnSubscribe.createStateful(new Func0<Integer>(){
        @Override
        public Integer call() {
            return start;
        }}, 
        new Func2<Integer, Observer<? super Integer>, Integer>() {
            @Override
            public Integer call(Integer state, Observer<? super Integer> subscriber) {
                subscriber.onNext(state);
                if (state == count) {
                    subscriber.onCompleted();
                }
                return state + 1;
            }
        });

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);
    InOrder inOrder = inOrder(o);

    Observable.create(os).subscribe(o);

    verify(o, never()).onError(any(TestException.class));
    inOrder.verify(o, times(count)).onNext(any(Integer.class));
    inOrder.verify(o).onCompleted();
    inOrder.verifyNoMoreInteractions();
}