RxJava - Is an operator a task or the whole chain a task?


I'm writing some code to insert a record into a SQLite database (if the table is empty). Before it inserts any data, it makes a web service call, LoveToDo.basecampClient().fetchMe(), to return some data.

I'm using SqlBrite for database access and Retrofit for web access. Here is my code:

    Observable.just(LoveToDo.briteDatabase())
        .map(new Func1<BriteDatabase, Integer>() {
            @Override
            public Integer call(BriteDatabase briteDatabase) {
                Cursor cursor = briteDatabase.query("SELECT * FROM Accounts");

                try {
                    return cursor.getCount();

                } finally {
                    cursor.close();
                }
            }
        })
        .flatMap(new Func1<Integer, Observable<Person>>() {
            @Override
            public Observable<Person> call(Integer count) {
                if ( count == 0 ) {
                    return LoveToDo.basecampClient().fetchMe();
                }

                return null;
            }
        })
        .map(new Func1<Person, Boolean>() {
            @Override
            public Boolean call(Person person) {
                if ( person == null ) return false;

                BriteDatabase database = LoveToDo.briteDatabase();

                long count = database.insert(Account.TABLE, new Account.Builder()
                    .accountId(Settings.accountId)
                    .userName(Settings.userName)
                    .password(Settings.password)
                    .agent(Settings.agent)
                    .personId(person.id)
                    .build()
                );

                return count > 0;
            }
        })

        .subscribeOn(Schedulers.io())
        .observeOn( Schedulers.io() )

        .subscribe();

Needless to say, I don't think that this is fantastic code. What I would like to find out is how to transform this code into something good. So let's use it and pick at its horribleness.

First, should I combine the database and web service call operations in one operator? For example:

    Observable.just(LoveToDo.briteDatabase())
        .flatMap(new Func1<BriteDatabase, Observable<Person>>() {
            @Override
            public Observable<Person> call(BriteDatabase briteDatabase) {
                Cursor cursor = briteDatabase.query("SELECT * FROM Accounts");

                int count;
                try {
                    count = cursor.getCount();

                } finally {
                    cursor.close();
                }

                if ( count == 0 ) {
                    return LoveToDo.basecampClient().fetchMe();
                }

                return null;
            }
        })
        .map(new Func1<Person, Boolean>() {
            @Override
            public Boolean call(Person person) {
                if ( person == null ) return false;

                BriteDatabase database = LoveToDo.briteDatabase();

                long count = database.insert(Account.TABLE, new Account.Builder()
                        .accountId(Settings.accountId)
                        .userName(Settings.userName)
                        .password(Settings.password)
                        .agent(Settings.agent)
                        .personId(person.id)
                        .build()
                );

                return count > 0;
            }
        })

        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())

        .subscribe();

Or is there a good reason to keep such operations isolated in the chain?

The second thing that bugs me is this is a background operation - no user interface will be updated directly as a result of this code. That's why there's a parameterless subscribe() function call. But what happens when there's an exception? Would that mean I'd have to do something like the following?

        .subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean aBoolean) {
                // Do nothing
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                // Do something with the exception
            }
        });

By the way, do I need the subscribeOn when observeOn is set to a background thread?

Thirdly, the chain is started with a SqlBrite observer. Later in the chain I need SqlBrite again, so I access it using a singleton LoveToDo.briteDatabase(). Is this a bad idea? Is there a better way to do this?

Finally, is there any way to break; the chain? It'd be nice if I could drop what I'm doing rather than checking for missing data at each step.

Answer by Diolor (best answer)

I see a lot of questions.

  1. Each method/operator is a "task": it runs on the items emitted by the previous operator and emits items to the next one.
  2. To reduce code verbosity we usually use Retrolambda (or the Gradle Retrolambda plugin) with RxJava. If you don't want to use Retrolambda, you can create a class, e.g. NameModel, that contains all the logic from the creation of the Observable up to just before the subscribe(), so that the logic is isolated in one place.
  3. It's a good idea to always pass an onError callback to subscribe() if you have a network call, unless you handle all the possible errors earlier in the chain, e.g. with onErrorReturn. The onError callback is there to help you when something goes wrong, e.g. to notify the user. It's also good practice to update things in the subscriber and not from inside the chain, so that the operators' contents stay isolated.
  4. It is subscribeOn that moves the work to the background, not observeOn. So yes, you need the subscribeOn, and the observeOn is not needed if you don't want to switch to another thread afterwards.
  5. The best way to "break" the chain is to throw an error. A more complex alternative is to unsubscribe the chain from the inside, using a custom .lift() operator with a custom subscriber.
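As a side note to point 5, and as an alternative to the `return null` in the question, here is a minimal sketch of a third way to stop early: returning `Observable.empty()` from `flatMap`, so that the downstream operators are simply never called. It assumes RxJava 1.x on the classpath and Java 8 lambdas (or Retrolambda). `EarlyExitSketch`, `createAccountIfMissing`, `collect` and the `String`-based `fetchMe()` are hypothetical stand-ins, not the real Retrofit or SqlBrite calls.

```java
import java.util.List;
import rx.Observable;

class EarlyExitSketch {
    // Hypothetical stand-in for LoveToDo.basecampClient().fetchMe().
    static Observable<String> fetchMe() {
        return Observable.just("person-42");
    }

    // Emits one Boolean when the table was empty and the (pretend) insert ran.
    // Completes without emitting anything when rows already exist.
    static Observable<Boolean> createAccountIfMissing(int existingRows) {
        return Observable.just(existingRows)
                .flatMap(count -> count == 0
                        ? fetchMe()
                        : Observable.<String>empty()) // nothing to do: end the chain here
                .map(person -> !person.isEmpty());    // stand-in for the database insert
    }

    // Blocks until the chain completes and returns everything it emitted.
    static List<Boolean> collect(int existingRows) {
        return createAccountIfMissing(existingRows).toList().toBlocking().single();
    }
}
```

With this shape the later `map` needs no null check: when rows already exist it is never invoked, and the subscriber only receives the completion signal.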

Update based on the comment:

Of the two versions above I would go with the second, but I would prefer something like this:

    Observable.just(LoveToDo.briteDatabase())
            .flatMap(briteDatabase -> {
                Cursor cursor = briteDatabase.query("SELECT * FROM Accounts");

                int count;
                try {
                    count = cursor.getCount();

                } finally {
                    cursor.close();
                }

                if (count == 0) {
                    // The table is empty: fetch the person and insert the account,
                    // reusing the briteDatabase instance captured by the lambda
                    return LoveToDo.basecampClient().fetchMe()
                            .map(person -> insertPerson(person, briteDatabase));
                }

                // if you want to track the account creation
                return Observable.just(false);
            })
            .subscribeOn(Schedulers.io())
            .subscribe(personInserted -> {
                // do something if the person was created or not
            }, e -> {
                // handle or log the error; without this callback an error
                // is rethrown as an OnErrorNotImplementedException
            });

    private Boolean insertPerson(Person person, BriteDatabase briteDatabase) {
        long count = briteDatabase.insert(Account.TABLE, new Account.Builder()
                .accountId(Settings.accountId)
                .userName(Settings.userName)
                .password(Settings.password)
                .agent(Settings.agent)
                .personId(person.id)
                .build());

        return count > 0;
    }
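
The chain above keeps only `subscribeOn(Schedulers.io())`. To check that this is enough to move both the work and the subscriber callback off the calling thread, here is a small sketch, assuming RxJava 1.x (1.0.15 or later for `fromCallable`) and Java 8 lambdas. `ThreadingSketch` and `run` are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.schedulers.Schedulers;

class ThreadingSketch {
    // Returns { name of the thread that did the work,
    //           name of the thread that received the result }.
    static String[] run() {
        String[] threads = new String[2];
        CountDownLatch done = new CountDownLatch(1);

        Observable.fromCallable(() -> {
                    // Stand-in for the database query / network call.
                    threads[0] = Thread.currentThread().getName();
                    return 1;
                })
                .subscribeOn(Schedulers.io()) // no observeOn anywhere in this chain
                .subscribe(
                        value -> threads[1] = Thread.currentThread().getName(),
                        error -> done.countDown(),
                        done::countDown);

        try {
            // Wait for the background work so both names have been recorded.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("work ran on:      " + threads[0]);
        System.out.println("result seen on:   " + threads[1]);
        System.out.println("caller thread is: " + Thread.currentThread().getName());
    }
}
```

Both recorded names should be the same scheduler thread and differ from the caller's thread. An `observeOn` would only be needed to deliver the result somewhere else, for example the Android main thread.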