RxJava Observable

1.3k views Asked by At

I am trying to work on my first RxJava example

I have a main activity with a textbox and three buttons. The first button initializes an integer in a separate class. The second button subscribes to an observable that is supposed to be observing the integer. The third button decreases the value of the integer by one.

Here is my code

package com.someurl.www.myobservable;

import android.support.v7.app.ActionBarActivity;
import android.os.Bundle;
import android.view.Menu;
import android.view.MenuItem;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;

import rx.Observable;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.schedulers.Schedulers;


/**
 * Demo screen with one text view and three buttons:
 * one stores an initial integer in {@link myObserve}, one subscribes to an
 * observable exposed by {@link myObserve}, and one decrements the integer and
 * prints the new value.
 */
public class MainActivity extends ActionBarActivity {

    /** Emits the greeting string once and then signals completion. */
    Observable<String> mObservable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> target) {
            target.onNext("Hello, World!");
            target.onCompleted();
        }
    });

    TextView tvDisplay;

    Button btnInitialze;
    Button btnSubscribeClass;
    Button btnChangeInt;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        bindViews();
        bindClickHandlers();
    }

    /** Looks up the text view and the three buttons from the inflated layout. */
    private void bindViews() {
        tvDisplay = (TextView) findViewById(R.id.textView);

        btnInitialze = (Button) findViewById(R.id.btnInitialize);
        btnSubscribeClass = (Button) findViewById(R.id.btnSubscribeClass);
        btnChangeInt = (Button) findViewById(R.id.btnChangeInt);
    }

    /** Attaches one click listener to each of the three buttons. */
    private void bindClickHandlers() {
        // Button 1: store the starting value (6) in the shared holder class.
        btnInitialze.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View clicked) {
                myObserve.InitializeBigInt(6);
            }
        });

        // Button 2: subscribe via the just()-based observable.
        // subClassNow() is the create()-based alternative and is currently not wired up.
        btnSubscribeClass.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View clicked) {
                subJust();
            }
        });

        // Button 3: decrement the shared value and print the result directly.
        btnChangeInt.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View clicked) {
                int decreased = myObserve.DecreaseBigInt();
                tvDisplay.append("\nFrom Button " + decreased);
            }
        });
    }

    @Override
    public boolean onCreateOptionsMenu(Menu menu) {
        // Adds the menu items to the action bar, if one is present.
        getMenuInflater().inflate(R.menu.menu_main, menu);
        return true;
    }

    @Override
    public boolean onOptionsItemSelected(MenuItem item) {
        // The settings item is consumed here; everything else (including
        // Home/Up, given a parent activity in AndroidManifest.xml) goes to super.
        return item.getItemId() == R.id.action_settings
                || super.onOptionsItemSelected(item);
    }

    /** Subscribes to {@link #mObservable} and appends each event to the text view. */
    public void subNow() {
        Subscriber<String> greetingSubscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                tvDisplay.append(s);
            }

            @Override
            public void onError(Throwable e) {
                tvDisplay.append("Error! \n" + e.getMessage());
            }

            @Override
            public void onCompleted() {
                tvDisplay.append("\nDone!");
            }
        };
        mObservable.subscribe(greetingSubscriber);
    }

    /** Subscribes to the create()-based observable in {@link myObserve}. */
    private void subClassNow() {
        myObserve.mObservableClass.subscribe(newClassSubscriber());
    }

    /** Subscribes to the just()-based observable in {@link myObserve}. */
    private void subJust() {
        myObserve.newObservableClass.subscribe(newClassSubscriber());
    }

    /**
     * Builds a fresh subscriber that appends every integer event, error and
     * completion signal to the text view.
     *
     * @return a new, not yet subscribed, subscriber instance
     */
    private Subscriber<Integer> newClassSubscriber() {
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer myInt) {
                tvDisplay.append("\nClass " + myInt);
            }

            @Override
            public void onError(Throwable e) {
                tvDisplay.append("Class Error! \n" + e.getMessage());
            }

            @Override
            public void onCompleted() {
                tvDisplay.append("\nClass Done!");
            }
        };
    }
}

And the class

package com.someurl.www.myobservable;

import rx.Observable;
import rx.Subscriber;

/**
 * Created by Admin on 6/21/15.
 *
 * Static holder for one shared integer, with two observables built around it
 * and helpers to set and decrement the value.
 */
public class myObserve {
    /** The shared value; has no initializer, so it is null until {@link #InitializeBigInt} runs. */
    static Integer myBigInt;

    /**
     * On subscription, pushes whatever {@link #myBigInt} holds at that moment.
     * onCompleted() is not called here, and later changes to the field are not
     * pushed to existing subscribers.
     */
    public static Observable<Integer> mObservableClass = Observable.create(
            new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> observer) {
                    observer.onNext(myBigInt);
                }
            });

    /**
     * Built from the value of {@link #myBigInt} as read by this static
     * initializer, i.e. during class initialization, before any call to
     * {@link #InitializeBigInt} can have assigned it.
     */
    public static Observable<Integer> newObservableClass = Observable.just(myBigInt);

    /**
     * Stores the starting value.
     *
     * @param myInt the value to keep in {@link #myBigInt}
     */
    public static void InitializeBigInt(Integer myInt) {
        myBigInt = myInt;
    }

    /**
     * Lowers the stored value by one.
     *
     * @return the value after the decrement
     */
    public static Integer DecreaseBigInt() {
        myBigInt = myBigInt - 1;
        return myBigInt;
    }

}

When I tried to subscribe using mObservableClass it just gives me the value of myBigInt (which was 6) and then it gives me onComplete Done!

Then I tried using newObservableClass, thinking that I need to use .just instead of .create, but then I get a return of null for myBigInt and then again the onComplete Done!

Can someone help get me started in the right direction on how to observe changes to the value of myBigInt? Ideally, I would like to watch the value of myBigInt until it gets decreased to zero (0), and then at zero call onComplete Done!

Thanks, John

1

There is 1 answer

0
akarnokd On BEST ANSWER

Generally, you'd want to use BehaviorSubject to store your value and convey changes to it. However, you seem to need atomic decrement capability as well. Try this:

/**
 * Static integer holder that publishes its changes through an RxJava subject.
 *
 * <p>The current number lives in an {@link AtomicInteger}; every successful
 * change is forwarded to a serialized {@code BehaviorSubject}. When a
 * decrement brings the value to zero the subject is completed instead of
 * emitting the zero.
 */
public class AtomicBehaviorSubject {
    /** Current value; starts at 0 (the AtomicInteger default). */
    private static final AtomicInteger value = new AtomicInteger();

    /** Write side of the stream; kept private so only this class can emit or terminate. */
    private static final Subject<Integer, Integer> setter =
            BehaviorSubject.<Integer>create().toSerialized();

    /**
     * Overwrites the stored value and emits it to observers.
     *
     * @param newValue the value to store and publish
     */
    public static void setValue(int newValue) {
        value.set(newValue);
        setter.onNext(newValue);
    }

    /**
     * Decrements the stored value by one using a compare-and-set retry loop.
     *
     * <p>Does nothing when the value is already 0. Emits the new value when it
     * is non-zero, and signals completion when the new value is 0. Negative
     * values are not special-cased: only an exact 0 stops the decrement.
     */
    public static void decrementValue() {
        for (;;) {
            int curr = value.get();
            if (curr == 0) {
                return;
            }
            int u = curr - 1;
            // Retry from the top if another thread changed the value in between.
            if (value.compareAndSet(curr, u)) {
                if (u == 0) {
                    setter.onCompleted();
                } else {
                    setter.onNext(u);
                }
                return;
            }
        }
    }

    /**
     * Read-only view of the value stream.
     *
     * <p>Returns the subject wrapped via {@code asObservable()} rather than the
     * subject itself, so callers cannot cast the result back to a
     * {@code Subject} and push or terminate events behind the counter's back.
     *
     * @return an observable of value changes
     */
    public static Observable<Integer> valueChanged() {
        return setter.asObservable();
    }
}