I've implemented a queuer in Spring WebFlux as I asked about here: Queuing up requests in Spring WebFlux
TL;DR is that it queues up incoming requests to an external API, and then batches them after waiting for X requests or Y seconds. After the implementation was working I started adding unit tests. So what I would like to test is that if my API receives one request it will queue it up for up to Y seconds, and if it then receives X - 1 more requests, it will then dispatch the requests.
If I curl my API using the same approach it works as expected and I'm seeing the following in the logs:
curl "localhost:8080/batch?orderIds=1"
curl "localhost:8080/batch?orderIds=2,3,4,5"
123456789101112132023-08-13T12:56:42.875+02:00 com.testreactor.BatcherService : Received request for orders with ids: [1]
2023-08-13T12:56:42.886 com.testreactor.BatcherService : Queuing up request for order id 1
2023-08-13T12:56:44.894 com.testreactor.BatcherService : Received request for orders with ids: [2, 3, 4, 5]
2023-08-13T12:56:44.894 com.testreactor.BatcherService : Queuing up request for order id 2
2023-08-13T12:56:44.894 com.testreactor.BatcherService : Queuing up request for order id 3
2023-08-13T12:56:44.895 com.testreactor.BatcherService : Queuing up request for order id 4
2023-08-13T12:56:44.895 com.testreactor.BatcherService : Queuing up request for order id 5
2023-08-13T12:56:44.895 com.testreactor.BatcherService : Dispatching queue
2023-08-13T12:56:44.897 com.testreactor.BatcherService : Order with id 1 received
2023-08-13T12:56:44.934 com.testreactor.BatcherService : Order with id 2 received
2023-08-13T12:56:44.934 com.testreactor.BatcherService : Order with id 3 received
2023-08-13T12:56:44.935 com.testreactor.BatcherService : Order with id 4 received
2023-08-13T12:56:44.935 com.testreactor.BatcherService : Order with id 5 received
The unit test I've implemented looks like this:
@Test
public void testOnePlusFour() {
var order1 = new Order(1L, UUID.randomUUID());
var order2 = new Order(2L, UUID.randomUUID());
var order3 = new Order(3L, UUID.randomUUID());
var order4 = new Order(4L, UUID.randomUUID());
var order5 = new Order(5L, UUID.randomUUID());
var oneFirstOrder = List.of(order1);
var fourAdditionalOrders = List.of(order2, order3, order4, order5);
var allOrders = Stream.concat(oneFirstOrder.stream(), fourAdditionalOrders.stream()).toList();
when(orderApi.getOrders(eq(allOrders.stream().map(Order::id).toList())))
.thenReturn(Flux.fromStream(allOrders.stream()));
var batcherService = new AtomicReference<BatcherService>();
StepVerifier.withVirtualTime(() -> {
var bs = new BatcherService(orderApi);
batcherService.set(bs);
return bs.getOrders(oneFirstOrder.stream().map(Order::id).toList());
})
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(4))
.then(() -> batcherService.get().getOrders(fourAdditionalOrders.stream().map(Order::id).toList()))
.expectNext(allOrders)
.expectComplete()
.verify();
}
What happens is that it's just queued and never dispatched (no more logs after this, the test is still running):
13:04:16.443 [main] INFO com.testreactor.BatcherService -- Received request for orders with ids: [1]
13:04:16.473 [main] INFO com.testreactor.BatcherService -- Queuing up request for order id 1
13:04:16.481 [main] INFO com.testreactor.BatcherService -- Received request for orders with ids: [2, 3, 4, 5]
How can I implement the unit test utilizing StepVerifier to verify that the first order is indeed queued for up to five seconds before the queue is filled?
Lastly, for reference, is the implementation of BatcherService and associated data classes:
public record Order(Long id, UUID storeId) {
}
public record QueueItem<T, R>(T id, Sinks.One<R> sink) {
}
@Service
public class BatcherService {
private static final Logger LOG = LoggerFactory.getLogger(BatcherService.class);
private final Sinks.Many<QueueItem<Long, Order>> queue = Sinks.many().multicast().onBackpressureBuffer();
private final ConcurrentMap<Long, Mono<Order>> queuedItems = new ConcurrentHashMap<>();
public BatcherService(OrderApi orderApi) {
queue.asFlux()
.bufferTimeout(5, Duration.ofSeconds(5))
.flatMap(queueItems -> {
LOG.info("Dispatching queue");
var orderIds = queueItems.stream().map(QueueItem::id).toList();
return orderApi.getOrders(orderIds)
.doOnNext(order -> {
LOG.info("Order with id {} received", order.id());
queueItems.stream()
.filter(queueItem -> queueItem.id().equals(order.id()))
.findFirst()
.ifPresent(queueItem -> queueItem.sink().tryEmitValue(order));
})
.then();
})
.subscribe();
}
public Mono<List<Order>> getOrders(List<Long> orderIds) {
LOG.info("Received request for orders with ids: {}", orderIds);
return Flux.fromIterable(orderIds)
.flatMapSequential(this::getOrder)
.collectList();
}
private Mono<Order> getOrder(Long orderId) {
var existingQueuedRequest = queuedItems.get(orderId);
if (existingQueuedRequest == null) {
LOG.info("Queuing up request for order id {}", orderId);
existingQueuedRequest = enqueueRequest(orderId);
queuedItems.put(orderId, existingQueuedRequest);
}
else {
LOG.info("Order {} already found in the queue", orderId);
}
return existingQueuedRequest;
}
private Mono<Order> enqueueRequest(Long orderId) {
Sinks.One<Order> sink = Sinks.one();
queue.tryEmitNext(new QueueItem<>(orderId, sink));
return sink.asMono();
}
}