Reactive Programming with Reactor and RabbitMQ

Recently I wrote a demo program to try out reactive programming with a combination of Reactor and RabbitMQ. This is my demo code:

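import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Delivery;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.OutboundMessageResult;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.ReactorRabbitMq;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.SenderOptions;

public class FluxWithRabbitMQDemo {

private static final String QUEUE = "demo_thong";

// The sender is created in run() with custom options, so only the receiver is kept as a field.
private final Receiver receiver;

public FluxWithRabbitMQDemo() {
    this.receiver = ReactorRabbitMq.createReceiver();
}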

public void run(int count) {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.useNio();
    SenderOptions senderOptions =  new SenderOptions()
            .connectionFactory(connectionFactory)
            .resourceCreationScheduler(Schedulers.elastic());

    reactor.rabbitmq.Sender sender = ReactorRabbitMq.createSender(senderOptions);

    Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE));
    Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE);
    queueDeclaration.thenMany(messages).subscribe(m->System.out.println("Get message "+ new String(m.getBody())));



    Flux<OutboundMessageResult> dataStream = sender.sendWithPublishConfirms(Flux.range(1, count)
            .filter(m -> !m.equals(10))
            .parallel()
            .runOn(Schedulers.parallel())
            .doOnNext(i->System.out.println("Message  " + i + " run on thread "+Thread.currentThread().getId()))
            .map(i -> new OutboundMessage("", QUEUE, ("Message " + i).getBytes())));

    sender.declareQueue(QueueSpecification.queue(QUEUE))
            .thenMany(dataStream)
            .doOnError(e -> System.out.println("Send failed"+ e))
            .subscribe(m->{
                if (m!= null){
                    System.out.println("Sent successfully message "+new String(m.getOutboundMessage().getBody()));
                }
            });

    try {
        Thread.sleep(20000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

public static void main(String[] args) throws Exception {
    int count = 20;
    FluxWithRabbitMQDemo sender = new FluxWithRabbitMQDemo();
    sender.run(count);
}

}

I expected that as soon as the Flux emits an item, the Sender would send it to RabbitMQ, and that the Receiver would receive it as soon as RabbitMQ had it. Instead, the stages ran one after another: all items were emitted, then all were sent, then all were received. This is the output I got:

Message  3 run on thread 25
Message  4 run on thread 26
Message  8 run on thread 26
Message  13 run on thread 26
Message  17 run on thread 26
Message  2 run on thread 24
Message  1 run on thread 23
Message  6 run on thread 24
Message  5 run on thread 23
Message  9 run on thread 23
Message  14 run on thread 23
Message  18 run on thread 23
Message  11 run on thread 24
Message  15 run on thread 24
Message  19 run on thread 24
Message  7 run on thread 25
Message  12 run on thread 25
Message  16 run on thread 25
Message  20 run on thread 25
Sent successfully message Message 3
Sent successfully message Message 1
Sent successfully message Message 2
Sent successfully message Message 4
Sent successfully message Message 5
Sent successfully message Message 6
Sent successfully message Message 8
Sent successfully message Message 9
Sent successfully message Message 11
Sent successfully message Message 13
Sent successfully message Message 14
Sent successfully message Message 15
Sent successfully message Message 17
Sent successfully message Message 18
Sent successfully message Message 19
Sent successfully message Message 7
Sent successfully message Message 12
Sent successfully message Message 16
Sent successfully message Message 20
Get message Message 3
Get message Message 1
Get message Message 2
Get message Message 4
Get message Message 5
Get message Message 6
Get message Message 8
Get message Message 9
Get message Message 11
Get message Message 13
Get message Message 14
Get message Message 15
Get message Message 17
Get message Message 18
Get message Message 19
Get message Message 7
Get message Message 12
Get message Message 16
Get message Message 20

I do not know how to change my code to get the result I expected. Can someone help me? Thanks in advance!

There is 1 answer

Alexei Kaigorodov (best answer)

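The messages are generated too fast: all of them are emitted before the first publish confirm comes back. To see the interleaving, slow the producer down by adding the following step to the pipeline inside dataStream. Thread.sleep throws the checked InterruptedException, so it has to be caught inside the lambda:

.doOnNext(i -> { try { Thread.sleep(10); }
    catch (InterruptedException e) { Thread.currentThread().interrupt(); } })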
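
The effect of the delay can be reproduced without a broker. What follows is only a minimal sketch and not the Reactor pipeline from the question: it swaps the Sender and Receiver for a plain producer loop and a consumer thread that share a BlockingQueue from java.util.concurrent. The names InterleavingDemo and run, and the log strings, are hypothetical and chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the sender/receiver pair: plain threads and a queue,
// no Reactor and no RabbitMQ involved.
final class InterleavingDemo {

    // Sends `count` messages, pausing `delayMillis` after each one, while a consumer
    // thread drains the queue. Returns the events in the order they were logged.
    static List<String> run(int count, long delayMillis) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int received = 0; received < count; received++) {
                    int value = queue.take();      // blocks until a message is available
                    log.add("Get message " + value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 1; i <= count; i++) {
                log.add("Sent message " + i);      // logged before the message is visible
                queue.put(i);
                Thread.sleep(delayMillis);         // slow the producer so the consumer keeps up
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the demo", e);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run(5, 20).forEach(System.out::println);
    }
}
```

With a delay such as run(5, 20), the log usually alternates between sends and receives. With run(5, 0), the sends tend to bunch together before the receives, which resembles the output in the question. The exact ordering depends on thread scheduling, so it can differ between runs.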