I am losing messages using spring-reactor, what is wrong with my setup?


I thought I'd look into Pivotal's newly released Reactor framework for a simple program I am writing, which needs some multi-threading to complete in a timely manner.

I wrote the following sample project to get to know the framework and play with it to understand how it is used:

Main.java:

package reactortest;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        try(AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MainConfiguration.class)) {
            MyProducer producer = context.getBean(MyProducer.class);
            producer.run();
        }
    }
}

MyProducer.java:

package reactortest;

import java.util.concurrent.CountDownLatch;

import reactor.core.Reactor;
import reactor.event.Event;

public class MyProducer {
    private final Reactor reactor;
    private final Integer messagesToPrint;
    private final CountDownLatch countDownLatch;

    public MyProducer(final Reactor reactor, final Integer messagesToPrint, CountDownLatch countDownLatch) {
        this.reactor = reactor;
        this.messagesToPrint = messagesToPrint;
        this.countDownLatch = countDownLatch;
    }

    public void run() throws InterruptedException {
        for(int i = 0; i < messagesToPrint; ++i) {
            reactor.notify(Event.wrap("String event: " + i));
        }

        countDownLatch.await();
        System.out.println("Finished. Remaining count is: " + countDownLatch.getCount());
    }
}

MyConsumer.java:

package reactortest;

import java.util.concurrent.CountDownLatch;

import reactor.event.Event;
import reactor.function.Consumer;

public class MyConsumer implements Consumer<Event<String>> {
    private final CountDownLatch countDownLatch;

    public MyConsumer(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void accept(Event<String> message) {
        System.out.println(message);
        countDownLatch.countDown();
    }
}

and finally, MainConfiguration.java:

package reactortest;

import java.util.concurrent.CountDownLatch;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.spec.Reactors;
import reactor.spring.context.config.EnableReactor;

@Configuration
@EnableReactor
public class MainConfiguration {
    private final Integer MESSAGESTOPRINT = 10;

    @Autowired private Environment environment;

    @Bean
    public CountDownLatch countDownLatch() {
        CountDownLatch countDownLatch = new CountDownLatch(MESSAGESTOPRINT);
        return countDownLatch;
    }

    @Bean
    public Reactor reactor() {
        Reactor reactor = Reactors.reactor().env(environment).dispatcher(Environment.THREAD_POOL).randomEventRouting().get();
        reactor.on(consumer());
        return reactor;
    }

    @Bean
    public MyProducer producer() {
        MyProducer producer = new MyProducer(reactor(), MESSAGESTOPRINT, countDownLatch());
        return producer;
    }

    @Bean
    public MyConsumer consumer() {
        MyConsumer consumer = new MyConsumer(countDownLatch());
        return consumer;
    }
}

My problem is that the program never finishes, and the consumer prints a different subset of the ten events on each run. Three consecutive runs printed:

1st run:
Event{id=null, headers=null, replyTo=null, data=String event: 0}
Event{id=null, headers=null, replyTo=null, data=String event: 1}
Event{id=null, headers=null, replyTo=null, data=String event: 7}
Event{id=null, headers=null, replyTo=null, data=String event: 8}

2nd run:
Event{id=null, headers=null, replyTo=null, data=String event: 0}
Event{id=null, headers=null, replyTo=null, data=String event: 1}
Event{id=null, headers=null, replyTo=null, data=String event: 5}
Event{id=null, headers=null, replyTo=null, data=String event: 6}
Event{id=null, headers=null, replyTo=null, data=String event: 9}

3rd run:
Event{id=null, headers=null, replyTo=null, data=String event: 2}
Event{id=null, headers=null, replyTo=null, data=String event: 4}
Event{id=null, headers=null, replyTo=null, data=String event: 6}

It seems like I must have missed something really obvious. Apart from this being configured with JavaConfig instead of annotations, and not having any interaction with the outside world, I can't see how this differs from the example here.

Answer by Jamey (best answer)

While asking this question, I was refining the code and it eventually worked (some great rubber ducking there). I thought rather than deleting my question I would post it in case anyone else runs into the same problem.

The problem with the above code is the randomEventRouting() call made while setting up the reactor. With this flag set, the reactor randomly selects a single consumer to route each event to. Because I am not registering my consumer under a specific selector/key, and since all consumers match when no key is provided, I assume a default consumer is being set up behind the scenes and is being passed some of my events. Every event it receives is one that never reaches MyConsumer, so the latch never counts down to zero and run() blocks forever in await().
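To make that hypothesis concrete, here is a minimal, self-contained sketch of random routing. It is a toy model written with plain JDK classes, not Reactor's actual implementation: ToyRandomRouter, notifyOne and the "hidden" second consumer are all hypothetical names standing in for what I assume is happening inside the framework.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class RandomRoutingSketch {

    /** Toy stand-in (NOT Reactor's code): each event goes to ONE randomly chosen consumer. */
    static final class ToyRandomRouter {
        private final List<Consumer<String>> consumers = new ArrayList<>();
        private final Random random;

        ToyRandomRouter(long seed) {
            this.random = new Random(seed);
        }

        /** Registers a consumer without any key, so it matches every event. */
        void on(Consumer<String> consumer) {
            consumers.add(consumer);
        }

        /** Picks one registered consumer at random and hands it the event. */
        void notifyOne(String event) {
            consumers.get(random.nextInt(consumers.size())).accept(event);
        }
    }

    /** Sends `events` messages and returns how many reached the application's own consumer. */
    static int deliveredToMine(int events, long seed) {
        AtomicInteger mine = new AtomicInteger();
        ToyRandomRouter router = new ToyRandomRouter(seed);
        router.on(e -> mine.incrementAndGet()); // the consumer we actually care about
        router.on(e -> { });                    // hypothetical consumer registered behind the scenes
        for (int i = 0; i < events; i++) {
            router.notifyOne("String event: " + i);
        }
        return mine.get();
    }

    public static void main(String[] args) {
        int delivered = deliveredToMine(10, 42L);
        System.out.println("Delivered to my consumer: " + delivered + " of 10");
    }
}
```

In this model, as soon as a second consumer matches the same events, the first one only sees a random subset. A latch sized for all ten events would then never be released, which is consistent with the behaviour in the question.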

Changing the reactor.on() to accept a selector:

reactor.on(Selectors.$(selector()), consumer());

where the selector is simply:

@Bean
public String selector() {
    String selector = "My very special event";
    return selector;
}

and injecting this key into the producer, and using it when calling reactor.notify():

reactor.notify(selector, Event.wrap("String event: " + i));

worked as expected.
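The effect of the key can be sketched the same way with plain JDK classes. This is again a toy model under stated assumptions, not Reactor's API: ToyKeyedBus, send and runOnce are hypothetical names. The timed await is also my own addition rather than part of the original code, so that a lost message shows up as a false return value instead of a hang.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class KeyedDispatchSketch {

    /** Toy bus (NOT Reactor's code): consumers register under a key and only see that key's events. */
    static final class ToyKeyedBus {
        private final Map<Object, List<Consumer<String>>> registry = new ConcurrentHashMap<>();
        private final ExecutorService pool = Executors.newFixedThreadPool(4);

        void on(Object key, Consumer<String> consumer) {
            registry.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(consumer);
        }

        /** Dispatches the event on the thread pool to every consumer registered under the key. */
        void send(Object key, String event) {
            for (Consumer<String> consumer : registry.getOrDefault(key, Collections.emptyList())) {
                pool.execute(() -> consumer.accept(event));
            }
        }

        void shutdown() {
            pool.shutdown();
        }
    }

    /** Returns true if all `messages` events reached the keyed consumer within five seconds. */
    static boolean runOnce(int messages) {
        CountDownLatch latch = new CountDownLatch(messages);
        ToyKeyedBus bus = new ToyKeyedBus();
        String key = "My very special event";
        bus.on(key, e -> latch.countDown());  // our consumer, registered under the key
        bus.on("some other key", e -> { });   // unrelated consumer, never matched by our events
        try {
            for (int i = 0; i < messages; i++) {
                bus.send(key, "String event: " + i);
            }
            // Timed wait: a missing message yields false instead of blocking forever.
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            bus.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("All messages delivered: " + runOnce(10));
    }
}
```

Because the producer and the consumer agree on the key, no other consumer can take an event away, and the latch can reach zero. Using await with a timeout in the real producer would be a cheap way to turn a silent hang like the one in the question into a visible failure.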

I imagine this is quite an edge case, since most users will (and should) be defining keys, but you never know. :)