We have a java web application that sends (JobsController.java) and receives messages (JMSMessageListener.java) via JMS. After running the application under constant load for 24 hours and taking heap dumps, I observe a constant increase in memory usage that the application does not let go when in idle state. I know that this is going to cause a java heap out of memory issue.
JobsController is an ejb stateless bean and its resources are destroyed correctly after each call. JMSMessageListener gets handled by ejb global bean pool and its instance is reused.
The suspects i can see from the java heap dump are
- EJB bean injection is causing a memory leak https://blog.akquinet.de/2017/01/04/dont-get-trapped-into-a-memory-leak-using-cdi-instance-injection/
- ActiveMQConnection.finalize(). If it is the culprit than it must happen to all those wildfly activemq deployments. Any hint is appreciated.
ActiveMQConnection.java
@Override
protected final void finalize() throws Throwable {
if (!closed) {
if (this.factoryReference.isFinalizeChecks()) {
ActiveMQJMSClientLogger.LOGGER.connectionLeftOpen(creationStack);
}
close();
}
JobsController
@Stateless
public class JobsController {
@Inject
private JMSContext jmsContext;
private Connection connection;
private Session session;
private MessageProducer jmsProducer;
@Resource(lookup = "java:/ConnectionFactory")
private ConnectionFactory connectionFactory;
@Resource(lookup = JAVA_JMS_JOB_QUEUE)
private Queue jobQueue;
@Resource(lookup = JAVA_JMS_QUEUE)
private Queue progressQueue;
@PreDestroy
void release() {
try {
if (jmsProducer != null) {
jmsProducer.close();
}
if (session != null) {
session.close();
}
if (jmsContext != null) {
jmsContext.close();
}
if (connection !=null) {
connection.close();
}
} catch (JMSException e) {
LOG.warn("failed to close JMS resources: {}", e.getMessage());
}
}
public synchronized MessageProducer getJmsProducer() {
if (jmsProducer == null) {
try {
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
jmsProducer = session.createProducer(jobQueue);
connection.start();
} catch (JMSException e) {
LOG.error("failed to setup JMS message producer: {}", e.getMessage());
}
}
return jmsProducer;
}
public void addMessageToProgressQueue(ProgressMessage progressMessage) {
ObjectMessage objectMessage = jmsContext.createObjectMessage(progressMessage);
try {
getJmsProducer().send(progressQueue, objectMessage);
} catch (JMSException e) {
LOG.error("failed to send progress message {}: {}", objectMessage, e.getMessage());
}
}
}
JMSMessageListener.java
@MessageDriven(name = "JMSMessageListener", mappedName = JAVA_JMS_QUEUE, activationConfig = {
@ActivationConfigProperty(
propertyName = "acknowledgeMode",
propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(
propertyName = "destinationType",
propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(
propertyName = "destination",
propertyValue = JAVA_JMS_QUEUE)
})
public class JMSMessageListener implements MessageListener {
private static Logger LOG = LoggerFactory.getLogger(JMSMessageListener.class);
@EJB
private JobsController jobsController;
private final ObjectMapper progressMessageMapper;
public JMSMessageListener() {
progressMessageMapper = new ObjectMapper();
progressMessageMapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
}
@Override
public void onMessage(Message message) {
ProgressMessage progressMessage = null;
try {
if (message instanceof BytesMessage) {
BytesMessage bytesMessage = (BytesMessage) message;
int TEXT_LENGTH = new Long(bytesMessage.getBodyLength()).intValue();
byte[] textBytes = new byte[TEXT_LENGTH];
bytesMessage.readBytes(textBytes, TEXT_LENGTH);
String progressText = new String(textBytes, "UTF-8");
progressText = progressText.replaceAll("'totalSteps': None", "'totalSteps': 0");
progressMessage = progressMessageMapper.readValue(progressText, ProgressMessage.class);
} else if (message instanceof ObjectMessage) {
progressMessage = message.getBody(ProgressMessage.class);
}
if (progressMessage != null) {
jobsController.sendProgressMessage(progressMessage);
} else {
LOG.error("An empty progress message was received");
}
} catch (JMSException | IOException e) {
LOG.error("failed to process progress message: {}", e.getMessage(), e);
}
}
}
Couple of things: