Recently I wrote a demo program to launch reactive programming with a combination of Reactor and RabbitMQ. This is my demo code :
public class FluxWithRabbitMQDemo {
private static final String QUEUE = "demo_thong";
private final reactor.rabbitmq.Sender sender;
private final Receiver receiver;
public FluxWithRabbitMQDemo() {
this.sender = ReactorRabbitMq.createSender();
this.receiver = ReactorRabbitMq.createReceiver();
}
public void run(int count) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
SenderOptions senderOptions = new SenderOptions()
.connectionFactory(connectionFactory)
.resourceCreationScheduler(Schedulers.elastic());
reactor.rabbitmq.Sender sender = ReactorRabbitMq.createSender(senderOptions);
Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE));
Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE);
queueDeclaration.thenMany(messages).subscribe(m->System.out.println("Get message "+ new String(m.getBody())));
Flux<OutboundMessageResult> dataStream = sender.sendWithPublishConfirms(Flux.range(1, count)
.filter(m -> !m.equals(10))
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(i->System.out.println("Message " + i + " run on thread "+Thread.currentThread().getId()))
.map(i -> new OutboundMessage("", QUEUE, ("Message " + i).getBytes())));
sender.declareQueue(QueueSpecification.queue(QUEUE))
.thenMany(dataStream)
.doOnError(e -> System.out.println("Send failed"+ e))
.subscribe(m->{
if (m!= null){
System.out.println("Sent successfully message "+new String(m.getOutboundMessage().getBody()));
}
});
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
int count = 20;
FluxWithRabbitMQDemo sender = new FluxWithRabbitMQDemo();
sender.run(count);
}
} I expected that after Flux emit an item, Sender must send it to RabbitMQ and after receiving RabbitMQ the Receiver must receive it. But everything happened sequentially and this is the result I got
Message 3 run on thread 25
Message 4 run on thread 26
Message 8 run on thread 26
Message 13 run on thread 26
Message 17 run on thread 26
Message 2 run on thread 24
Message 1 run on thread 23
Message 6 run on thread 24
Message 5 run on thread 23
Message 9 run on thread 23
Message 14 run on thread 23
Message 18 run on thread 23
Message 11 run on thread 24
Message 15 run on thread 24
Message 19 run on thread 24
Message 7 run on thread 25
Message 12 run on thread 25
Message 16 run on thread 25
Message 20 run on thread 25
Sent successfully message Message 3
Sent successfully message Message 1
Sent successfully message Message 2
Sent successfully message Message 4
Sent successfully message Message 5
Sent successfully message Message 6
Sent successfully message Message 8
Sent successfully message Message 9
Sent successfully message Message 11
Sent successfully message Message 13
Sent successfully message Message 14
Sent successfully message Message 15
Sent successfully message Message 17
Sent successfully message Message 18
Sent successfully message Message 19
Sent successfully message Message 7
Sent successfully message Message 12
Sent successfully message Message 16
Sent successfully message Message 20
Get message Message 3
Get message Message 1
Get message Message 2
Get message Message 4
Get message Message 5
Get message Message 6
Get message Message 8
Get message Message 9
Get message Message 11
Get message Message 13
Get message Message 14
Get message Message 15
Get message Message 17
Get message Message 18
Get message Message 19
Get message Message 7
Get message Message 12
Get message Message 16
Get message Message 20
I do not know what to do with my code to achieve the results as expected. Can someone help me? Thanks for advance!!!
Messages are generated too fast. To see the interleaving, in
dataStreamadd