Grails rabbitmq native filter messages to consumer

712 views Asked by At

I'm using grails 3.2.3 version and rabbitmq native plugin 3.3.2 (http://budjb.github.io/grails-rabbitmq-native/doc/manual/). I am trying to achieve the following scenario. enter image description here
Description: I'm sending multiple messages to a single queue with headers and On the consumer section I tried to apply binding to consume message by specific filtering. But the consumer consumes all messages regardless of filtering - means the binding is not working. Also I'm starter on rabbitmq. So any help/direction is much appreciated. Below is my code.

Queue config in application.groovy:

rabbitmq {
    queues = [
        [
                name      : "mail.queue",
                connection: "defaultConnection",
                durable   : true
        ]
]

}

Sending to queue function:

protected void sendToQueue(QueueType queueType, Map message, Map<String, String> binding = null) {
    rabbitMessagePublisher.send {
        routingKey = queueType.queueName
        body = message
        autoConvert = true
        if (headers != null) {
            headers = binding
        }
    }
}

Here on sendToQueue I made the third parameter optional as I won't need multiple types of consumers in some cases;

Calling send to queue:

sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET.name()])
sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()])

Consumer 1:

static rabbitConfig = [
        queue   : QueueType.EMAIL_QUEUE.queueName,
        binding : ["emailType": EmailType.PASSWORD_RESET.name()],
        match   : "all",
        consumer: 10
]

def handleMessage(Map message, MessageContext context) {
    print("From PasswordResetEmailConsumer consumer")
    println(message)
    passwordResetEmailService.sendPasswordResetMail(message)
}

Consumer 2:

static rabbitConfig = [
        queue   : QueueType.EMAIL_QUEUE.queueName,
        binding : ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()],
        match   : "all",
        consumer: 10
]

def handleMessage(Map message, MessageContext context) {
    print("From PasswordResetSuccessEmailConsumer consumer")
    println(message)
    passwordResetSuccessEmailService.sendPasswordResetSuccessMail(message)
}
1

There are 1 answers

0
Mamun Sardar On BEST ANSWER

After reading the rabbitmq documentation I realized that it is not possible to selectively pull messages from a single queue.

Consumer receives all messages from the queue

Although there is another option "Exchange" where publisher will publish message to exchange with a routing key and those messages will be delivered to the bound queues. More: RabbitMQ Publish/Subscribe Model
The basic idea is also described here: Stackoverflow: RabbitMQ selectively retrieving messages from a queue
Anyway, in my solution I didn't want multiple queues. So I created a single consumer and pass actual handler class bean reference with message to dispatch the message. Sharing the implementation, hope this helps someone:

Queue config in application.groovy:

rabbitmq {
    queues = [
        [
                name      : "mail.queue",
                connection: "defaultConnection",
                durable   : true
        ]
    ]
}

Sending to queue function:

protected void sendToQueue(Map message, QueueType queueType, Class<BaseQueueHandler> queueHandlerServiceClass) {
    message.queueHandlerServiceClass = queueHandlerServiceClass.name
    rabbitMessagePublisher.send {
        routingKey = queueType.queueName // queue name from enum: "mail.queue"
        body = message
        autoConvert = true
    }
}

Handler Interface:

interface BaseQueueHandler {
    void handleMessage(Map message, MessageContext context)
}

Sending to queue:

sendToQueue([user: user], QueueType.EMAIL_QUEUE, PasswordResetEmailService.class)

Queue Consumer:

class EchoEmailQueueConsumer {

    static rabbitConfig = [
            queue   : QueueType.ECHO_EMAIL_QUEUE.queueName,
            consumer: 10
    ]

    GrailsApplication grailsApplication

    def handleMessage(Map message, MessageContext context) {
        String handlerClass = message.remove("queueHandlerServiceClass")
        Class<BaseQueueHandler> handlerClassType = Class.forName(handlerClass);
        BaseQueueHandler queueService = grailsApplication.mainContext.getBean(handlerClassType)
        queueService.handleMessage(message, context)
    }

}

Finally Handler service which implements Handler interface:

class PasswordResetEmailService implements BaseQueueHandler {

    @Override
    void handleMessage(Map message, MessageContext context) {
        println("message received in PasswordResetEmailService")
    }
}