JavaFX and RxJava: TableView infinitely calling setCellValueFactory()


I am running into an issue with TableView when using reactive bindings from ReactFX in setCellValueFactory(). The EventStream driving the bindings originates from an RxJava Observable, and the code below includes a method that converts it to an EventStream.

However, the TableView never shows anything but the initial binding values for the columns. When I added a System.out.println to the cell value factory passed to setCellValueFactory(), I found that the factory was being called in an infinite loop and the emitted value never made it to the binding.

I am really puzzled by this. How do I stop this behavior and get the Observable to successfully emit the single value to the EventStream and then the Binding?

Here is my SSCCE.

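// Assumes RxJava 1.x (rx.*) and ReactFX (org.reactfx.*) on the classpath.
import javafx.application.Application;
import javafx.application.Platform;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import javafx.scene.Group;
import javafx.scene.Scene;
import javafx.scene.control.TableColumn;
import javafx.scene.control.TableView;
import javafx.stage.Stage;
import org.reactfx.EventSource;
import org.reactfx.EventStream;
import rx.Observable;

public class ReactiveTableViewTest extends Application {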

    @Override
    public void start(Stage stage) throws Exception {

        Group root = new Group();
        Scene scene = new Scene(root);

        root.getChildren().add(new ReactiveTable(buildSampleData()));

        stage.setScene(scene);
        stage.show();
    }

    private ObservableList<ReactivePoint> buildSampleData() { 
        ObservableList<ReactivePoint> points = FXCollections.observableArrayList();
        points.add(new ReactivePoint(Observable.just(1), Observable.just(2)));
        return points;
    }

    private static final class ReactivePoint {
        private final Observable<Integer> x;
        private final Observable<Integer> y;

        ReactivePoint(Observable<Integer> x, Observable<Integer> y) { 
            this.x = x;
            this.y = y;
        }
        public Observable<Integer> getX() {
            return x;
        }
        public Observable<Integer> getY() { 
            return y;
        }
    }

    private static final class ReactiveTable extends TableView<ReactivePoint> {

        @SuppressWarnings("unchecked")
        private ReactiveTable(ObservableList<ReactivePoint> reactivePoints) { 
            this.setItems(reactivePoints);

            TableColumn<ReactivePoint,Number> xCol = new TableColumn<>("X");
            xCol.setCellValueFactory(cb -> {
                System.out.println("Calling cell value factory for x col");
                return toReactFX(cb.getValue().getX().map(x -> (Number) x)).toBinding(-1); //causes infinite call loop
                //return new SimpleObjectProperty<Number>(1); //works fine
            });

            TableColumn<ReactivePoint,Number> yCol = new TableColumn<>("Y");
            yCol.setCellValueFactory(cb -> {
                System.out.println("Calling cell value factory for y col");
                return toReactFX(cb.getValue().getY().map(y -> (Number) y)).toBinding(-1); //causes infinite call loop
                //return new SimpleObjectProperty<Number>(1); //works fine
            });

            this.getColumns().addAll(xCol, yCol);
        }
    }
    private static <T> EventStream<T> toReactFX(Observable<T> obs) {
        EventSource<T> es = new EventSource<>();
        obs.subscribe(foo -> Platform.runLater(() -> es.push(foo)), e -> e.printStackTrace());
        return es;
    }
    public static void main(String[] args) { 
        launch(args);
    }
}

UPDATE

I think I found an issue with the solution I proposed below. If an Observable emits on any thread other than the JavaFX Application Thread, no value gets populated to the Property.

I tried to remedy this by checking inside rxToProperty() whether the emission was happening on the FX thread, and handing the value to Platform.runLater() only when it was not. This did not work and caused an infinite loop again. I don't know whether the thread safety of Property is derailing things.

How can I get an Observable that emits on multiple threads to safely populate a Property? Here is my updated SSCCE displaying this behavior. The "X" column never populates because its Observable emits on a computation thread, but the "Y" column does since it stays on the FX thread.

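// Assumes RxJava 1.x (rx.*) on the classpath.
import javafx.application.Application;
import javafx.application.Platform;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.SimpleObjectProperty;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import javafx.scene.Group;
import javafx.scene.Scene;
import javafx.scene.control.TableColumn;
import javafx.scene.control.TableView;
import javafx.stage.Stage;
import rx.Observable;
import rx.schedulers.Schedulers;

public class ReactiveTableViewTest extends Application {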

    @Override
    public void start(Stage stage) throws Exception {

        Group root = new Group();
        Scene scene = new Scene(root);

        root.getChildren().add(new ReactiveTable(buildSampleData()));

        stage.setScene(scene);
        stage.show();
    }

    private ObservableList<ReactivePoint> buildSampleData() { 
        ObservableList<ReactivePoint> points = FXCollections.observableArrayList();
        points.add(new ReactivePoint(
                Observable.just(1, 5, 6, 8,2,3,5,2).observeOn(Schedulers.computation()), 
                Observable.just(2,6,8,2,14)
                )
            );
        return points;
    }

    private static final class ReactivePoint {
        private final Observable<Integer> x;
        private final Observable<Integer> y;

        ReactivePoint(Observable<Integer> x, Observable<Integer> y) { 
            this.x = x;
            this.y = y;
        }
        public Observable<Integer> getX() {
            return x;
        }
        public Observable<Integer> getY() { 
            return y;
        }
    }

    private static final class ReactiveTable extends TableView<ReactivePoint> {

        @SuppressWarnings("unchecked")
        private ReactiveTable(ObservableList<ReactivePoint> reactivePoints) { 
            this.setItems(reactivePoints);


            System.out.println("Constructor is happening on FX THREAD: " + Platform.isFxApplicationThread());

            TableColumn<ReactivePoint,Number> xCol = new TableColumn<>("X");
            xCol.setCellValueFactory(cb -> {
                System.out.println("CellValueFactory for X called on FX THREAD: " + Platform.isFxApplicationThread());
                return rxToProperty(cb.getValue().getX().map(x -> (Number) x));
            });

            TableColumn<ReactivePoint,Number> yCol = new TableColumn<>("Y");
            yCol.setCellValueFactory(cb -> {
                System.out.println("CellValueFactory for Y called on FX THREAD: " + Platform.isFxApplicationThread());
                return rxToProperty(cb.getValue().getY().map(y -> (Number) y));
            });

            this.getColumns().addAll(xCol, yCol);
        }
    }
    private static <T> ObjectProperty<T> rxToProperty(Observable<T> obs) { 
        ObjectProperty<T> property = new SimpleObjectProperty<>();

        obs.subscribe(v -> { 
            if (Platform.isFxApplicationThread()) {
                System.out.println("Emitting " + v + " on FX Thread");
                property.set(v);
            }
            else { 
                System.out.println("Emitting " + v + " on Non-FX Thread");
                Platform.runLater(() -> property.set(v));
            }
        });

        return property;
    }
    public static void main(String[] args) { 
        launch(args);
    }
}

1 Answer

tmn (best answer)

I found a solution, although I have not quite figured out the exact root cause of the problem in the OP code (if anybody can explain why, I will mark it as the answer). I initially thought the Platform.runLater() call in the original setup (shown below) might be causing the infinite loop.

private static <T> EventStream<T> toReactFX(Observable<T> obs) {
    EventSource<T> es = new EventSource<>();
    obs.subscribe(foo -> Platform.runLater(() -> es.push(foo)), e -> e.printStackTrace());
    return es;
}

This theory turned out to be correct. Removing the Platform.runLater() made the infinite loop go away. Perhaps the emitted value kept being re-queued at the back of the FX thread's event queue, and so never made it to the Binding? But still nothing was being emitted, and the table values stayed at -1, the initial Binding value.

private static <T> EventStream<T> toReactFX(Observable<T> obs) {
    EventSource<T> es = new EventSource<>();
    obs.subscribe(foo -> es.push(foo), e -> e.printStackTrace());
    return es;
}
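One likely reason the table stays at -1 here: Observable.just(...) with no scheduler emits synchronously inside subscribe(), so es.push(foo) runs before toReactFX() has returned, which is before toBinding(-1) has had a chance to observe the stream. A hot source does not replay earlier events to late observers. The sketch below reproduces only that ordering with plain JDK classes; HotSource and LatestValue are hypothetical stand-ins written for this example, not the ReactFX EventSource or Binding.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class HotSourceDemo {

    /** Hypothetical hot source: an event reaches only observers registered when push() runs. */
    static final class HotSource<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        void push(T value) {
            for (Consumer<T> observer : observers) {
                observer.accept(value);
            }
        }
    }

    /** Hypothetical binding-like holder: starts at an initial value, then tracks the source. */
    static final class LatestValue<T> {
        private T value;

        LatestValue(HotSource<T> source, T initial) {
            this.value = initial;
            source.subscribe(v -> this.value = v);
        }

        T get() {
            return value;
        }
    }

    /** Push first, observe second: the order used when the push happens during subscribe(). */
    static int pushThenObserve() {
        HotSource<Integer> source = new HotSource<>();
        source.push(1); // nobody is listening yet, so the event is dropped
        return new LatestValue<>(source, -1).get();
    }

    /** Observe first, push second: the holder sees the event. */
    static int observeThenPush() {
        HotSource<Integer> source = new HotSource<>();
        LatestValue<Integer> latest = new LatestValue<>(source, -1);
        source.push(1);
        return latest.get();
    }

    public static void main(String[] args) {
        System.out.println("push then observe: " + pushThenObserve());
        System.out.println("observe then push: " + observeThenPush());
    }
}
```

In the first method the holder keeps its initial value, which matches the symptom of the columns staying at -1.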

I did find something that worked. I created a new conversion method called rxToProperty() and replaced the calls to toReactFX() with it. After that, everything seemed to work great.

private static <T> ObjectProperty<T> rxToProperty(Observable<T> obs) { 
    ObjectProperty<T> property = new SimpleObjectProperty<>();
    obs.subscribe(v -> property.set(v));
    return property;
}

And here are the new TableColumn setups.

TableColumn<ReactivePoint,Number> xCol = new TableColumn<>("X");
xCol.setCellValueFactory(cb -> rxToProperty(cb.getValue().getX().map(x -> (Number) x)));

TableColumn<ReactivePoint,Number> yCol = new TableColumn<>("Y");
yCol.setCellValueFactory(cb -> rxToProperty(cb.getValue().getY().map(y -> (Number) y)));

If anybody has a better solution, or can explain why the combination of Platform.runLater() and EventStream was not working, I will mark that as the accepted answer.

UPDATE

I had some issues with Observables emitting on non-FX threads, with values never populating to the Property. I found this could be resolved by using cache() to hold the last value, which would then re-emit on the thread that subscribes, which would be the FX thread. I also added some synchronization and wrapped the returned Property in a read-only wrapper.

private static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) { 
    ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();

    obs.cache(1).subscribe(v -> { 
        synchronized(property) { 
            if (Platform.isFxApplicationThread()) {
                System.out.println("Emitting val " + v + " on FX Thread");
            }
            else { 
                System.out.println("Emitting val " + v + " on Non-FX Thread");
            }
            property.set(v);
        }
    });

    return property.getReadOnlyProperty();
}
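A caveat on the threading side: synchronized keeps two writers from calling set() at the same moment, but it does not move the call onto the FX thread, and JavaFX expects state shown in a live scene to be changed only there. As a toolkit-free illustration of the hand-off idea, here is a sketch in which a single-thread executor plays the role of the FX Application Thread and execute() plays the role of Platform.runLater(). Both are stand-ins chosen for this example, and the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UiThreadHandoffDemo {

    /**
     * Emits values on a worker thread, applies each one on a single "UI" thread,
     * and returns the value the UI thread holds after all queued updates have run.
     */
    static int publishFromWorker(int... values) {
        // Hypothetical stand-in for the FX Application Thread.
        ExecutorService ui = Executors.newSingleThreadExecutor();
        // Stand-in for a background scheduler thread.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        int[] property = { -1 }; // read and written only on the ui thread
        try {
            // The worker never writes the state itself; it queues each write on the ui thread.
            worker.submit(() -> {
                for (int v : values) {
                    ui.execute(() -> property[0] = v);
                }
            }).get();
            // The read is queued on the ui thread too, behind the pending writes.
            return ui.submit(() -> property[0]).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(publishFromWorker(1, 5, 6, 8, 2, 3, 5, 2));
    }
}
```

Because every write and the final read go through the same single thread, no lock is needed around the state. This shows the hand-off pattern only; it does not by itself address the repeated cell value factory calls described above.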

FINAL UPDATE

Tomas Mikula gave some very helpful insight into this behavior, as well as the solution, on the ReactFX GitHub project.

https://github.com/TomasMikula/ReactFX/issues/22