Dynamically creating asynchronous message queues in Java


I need to create asynchronous message queues dynamically in Java. My use case is sending email via multiple SMTP servers: I need to enforce that emails to the same SMTP server are processed sequentially, but emails to different SMTP servers may be processed concurrently. I've used JMS in the past, but as far as I can see it only allows for compile-time queue creation, whereas I need to create queues at runtime (one queue for each SMTP server).

Am I missing something regarding JMS or is there some other tool/proposal which I should have a look at?

There are 5 answers

0
mhaller On BEST ANSWER

I agree with Adam: for this use case, JMS sounds like overkill. Java's built-in functionality is sufficient:

package de.mhaller;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

import org.junit.Assert;
import org.junit.Test;

public class Mailer {

    @Test
    public void testMailer() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        ArrayList<Mail> log = new ArrayList<Mail>();
        LinkedBlockingDeque<Mail> incoming = new LinkedBlockingDeque<Mail>();

        // TODO: Put mails to be sent into the incoming queue
        incoming.offer(new Mail("foo1@localhost", "localhost"));
        incoming.offer(new Mail("foo2@otherhost", "otherhost"));
        incoming.offer(new Mail("foo3@otherhost", "otherhost"));
        incoming.offer(new Mail("foo4@localhost", "localhost"));

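        // Sort the mails into one queue per server first. SendMail stops as soon as
        // its queue is empty, so every queue must be filled before its worker starts.
        Map<Mailserver, Deque<Mail>> queues = new HashMap<Mailserver, Deque<Mail>>();
        while (!incoming.isEmpty()) {
            Mail mail = incoming.pollFirst();
            Mailserver mailserver = findMailserver(mail);
            Deque<Mail> slot = queues.get(mailserver);
            if (slot == null) {
                slot = new ArrayDeque<Mail>();
                queues.put(mailserver, slot);
            }
            slot.offer(mail);
            log.add(mail);
        }

        // One worker per server: sequential per server, concurrent across servers.
        for (Map.Entry<Mailserver, Deque<Mail>> entry : queues.entrySet()) {
            executor.execute(new SendMail(entry.getKey(), entry.getValue()));
        }

        // Wait for all workers to finish before inspecting the results.
        executor.shutdown();
        Assert.assertTrue(executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS));

        assertMailSentWithCorrectServer(log);
    }

    private void assertMailSentWithCorrectServer(ArrayList<Mail> log) {
        for (Mail mail : log) {
            // Compare hostnames; a String never equals a Mailserver object.
            if (mail.sentBy == null || !mail.server.equals(mail.sentBy.mailserver.hostname)) {
                Assert.fail("Mail not sent by the expected server: " + mail);
            }
        }
    }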

    private Mailserver findMailserver(Mail mail) {
        // TODO: Your lookup logic which server to use
        return new Mailserver(mail.server);
    }

    private static class Mail {
        String recipient;
        String server;
        SendMail sentBy;

        public Mail(String recipient, String server) {
            this.recipient = recipient;
            this.server = server;
        }

        @Override
        public String toString() {
            return "mail for " + recipient;
        }
    }

    public static class SendMail implements Runnable {

        private final Deque<Mail> queue;
        private final Mailserver mailserver;

        public SendMail(Mailserver mailserver, Deque<Mail> queue) {
            this.mailserver = mailserver;
            this.queue = queue;
        }

        @Override
        public void run() {
            while (!queue.isEmpty()) {
                Mail mail = queue.pollFirst();
                // TODO: Use SMTP to send the mail via mailserver
                System.out.println(this + " sent " + mail + " via " + mailserver);
                mail.sentBy = this;
            }
        }

    }

    public static class Mailserver {
        String hostname;

        public Mailserver(String hostname) {
            this.hostname = hostname;
        }

        @Override
        public String toString() {
            return hostname;
        }

        @Override
        public int hashCode() {
            return hostname.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            // instanceof also rejects null and foreign types instead of throwing
            return obj instanceof Mailserver && hostname.equals(((Mailserver) obj).hostname);
        }

    }

}
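
For mails that keep arriving at runtime, a variation on the same idea may be easier to maintain. The following is only a sketch, assuming Java 8 or later: each server gets its own single-threaded executor, created lazily the first time a mail for that server shows up. The names PerServerDispatcher, submit and shutdownAndWait are made up for this illustration, and the lambda in main stands in for the real SMTP call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: one lazily created single-threaded executor per SMTP server. */
public class PerServerDispatcher {

    // Each executor owns an internal FIFO queue and exactly one worker thread.
    private final Map<String, ExecutorService> lanes = new ConcurrentHashMap<>();

    /** Queues a task for the given server; the server's lane is created on first use. */
    public void submit(String server, Runnable task) {
        lanes.computeIfAbsent(server, s -> Executors.newSingleThreadExecutor()).execute(task);
    }

    /** Stops accepting work and waits for the already queued tasks to finish. */
    public boolean shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException {
        for (ExecutorService lane : lanes.values()) {
            lane.shutdown();
        }
        boolean finished = true;
        for (ExecutorService lane : lanes.values()) {
            finished &= lane.awaitTermination(timeout, unit);
        }
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        PerServerDispatcher dispatcher = new PerServerDispatcher();
        List<String> sent = Collections.synchronizedList(new ArrayList<>());
        String[][] mails = {
            {"foo1@localhost", "localhost"},
            {"foo2@otherhost", "otherhost"},
            {"foo3@otherhost", "otherhost"},
            {"foo4@localhost", "localhost"},
        };
        for (String[] mail : mails) {
            // Placeholder for the actual SMTP send via the server in mail[1].
            dispatcher.submit(mail[1], () -> sent.add(mail[0] + " via " + mail[1]));
        }
        dispatcher.shutdownAndWait(5, TimeUnit.SECONDS);
        System.out.println(sent);
    }
}
```

Tasks for one server run in submission order because a single-threaded executor processes its queue sequentially, while different servers proceed in parallel on their own threads. The relative order between servers is not deterministic.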
1
Steve B. On

I've done this with ActiveMQ - I actually posted a question about it at the time, as I had similar concerns (the JMS documentation then stated that this was not supported), and was assured that it is supported.

0
AudioBubble On

Create a queue for each of your SMTP servers and limit each queue's consumers (an MDB or a message listener) to one.

0
Will Hartung On

JMS itself, as a spec, is rather silent on the issue. Most implementations allow you to do this, just not through JMS itself but through their own API. However, you won't be able to hook up something formal like an MDB to a dynamic queue; rather, you'll need to manage your own connections and listeners.

0
djna On

The last time we looked at this in a WebSphere environment, it was surprisingly difficult, if not impossible, to create queues dynamically (temporary queues are too transient for you, I think). Although APIs for creating queues existed, they required a server restart afterwards before the queues became active. Then there's the MDB issue already alluded to.

How about a dirty workaround based on the adage that all problems can be solved by an extra level of indirection? It assumes that the set of available printers is comparatively small.

Create queues Printer01 to Printer99 (or some smaller number). Have a "database" which maps queues to real printers. As requests for new printers come along, you add them to the mapping table. You might have some overhead from MDBs watching queues that will never be used, but unless your potential number of printers is vast, maybe you can afford it?
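
The mapping table could look roughly like the sketch below. It is only an illustration: the class QueuePool and its method queueFor are invented names, the "database" is replaced by an in-memory map, and the targets are plain strings. A real setup would persist the mapping so that it survives restarts.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical mapping table: hands out pre-created queue names to targets on demand. */
public class QueuePool {

    private final Deque<String> freeQueues = new ArrayDeque<>();
    private final Map<String, String> queueByTarget = new HashMap<>();

    /** Registers the names Printer01 .. PrinterNN of queues assumed to exist already. */
    public QueuePool(int size) {
        for (int i = 1; i <= size; i++) {
            freeQueues.add(String.format("Printer%02d", i));
        }
    }

    /** Returns the queue assigned to the target, claiming the next free one on first request. */
    public synchronized String queueFor(String target) {
        String queue = queueByTarget.get(target);
        if (queue == null) {
            queue = freeQueues.poll();
            if (queue == null) {
                throw new IllegalStateException("No free queue left for " + target);
            }
            queueByTarget.put(target, queue);
        }
        return queue;
    }

    public static void main(String[] args) {
        QueuePool pool = new QueuePool(99);
        System.out.println(pool.queueFor("first-target"));   // Printer01
        System.out.println(pool.queueFor("second-target"));  // Printer02
        System.out.println(pool.queueFor("first-target"));   // Printer01 again
    }
}
```

Senders look up the queue name through queueFor and then publish to that statically defined queue, so nothing has to be created on the broker at runtime.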