I am using stream bridge to send messages over pubsub
I have some 100k messages that I need to push to pubsub. I am using executor pool of 5 threads to do the job
private void fetchAndPublish(List<MyObject> list) {
int threads = 5 < list.size() ? 5 : list.size();
ExecutorService es = Executors.newFixedThreadPool(threads);
CompletableFuture<?>[] futures = list.stream()
.map(s -> new MyRunnable(s))
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
}
public class MyRunnable implements Runnable {
private final MyObject myObject;
public MyRunnable(MyObject myObject) {
this.myObject = myObject;
}
@Override
public void run() {
try {
Message<MyObject> message = MessageBuilder.withPayload(myObject)
.build();
messagePublisherService.publish(message);
} catch (Exception exception) {
log.error("Failed ", exception);
}
}
}
My service to publish message is here
@Component
@Slf4j
public class MessagePublisherService {
@Value("${publish.destination:myDes-out-0}")
private String destination;
@Autowired
private StreamBridge streamBridge;
public void publish(Message<MyObject> message) {
log.info("publishing {}", message.getPayload().getId());
streamBridge.send(destination, message);
}
}
It works good, but after some time I am getting Out of memory exception. What I am noticing is the memory is not release as soon as the data is pushed to streamBridge
I have 16GB of heap memory, and it gets filled very quickly. My data is not that huge.
Dynatrace Stats I can see the initially the publish call to pubsubs took 200+ms. But progressively this time was increased and occuapaid resources
This is my Dynatrace slowly we can see that in the beginning the requests to PUBLISH to pubsub took small time
After some time it started to take so much time and occupy resources