RabbitMQ/Sneakers - Limit specific queue to only one worker at a time?

1.8k views Asked by At

I have a use case with RabbitMQ and the Sneakers gem where I have multiple Workers running to respond to several dozen queues in my project. So it's very likely that workers may be processing messages from the same queue at the same time.

With one queue in particular, though - let's call it :one_at_a_time - I only ever want one worker to be able to process a message from the queue at any given time.

The reason I want to do this is because the worker is designed to perform the following actions:

  1. Look up an AR object by the passed-in ID
  2. Check that an attribute - let's say :worked - is set.
    1. If true, then ack! the message.
    2. If false, email a user, then set :worked to true.

This is designed so that I don't accidentally email the user twice in case two messages are created in rapid succession with the same object ID. And this design would work fine if only one worker ever listened to this queue at any given time, because the first run would go through steps 1 -> 2 -> 2, then the next run would go through steps 1 -> 2 -> 1 and would not email the user. But in testing, I found that there is the potential for a race condition where two workers will pull a message off the :one_at_a_time queue at the same time, get past the check that :worked is set, and both send an email.

So with all this in mind, is there a way I can limit the number of workers that listen to a queue? Thank you.

1

There are 1 answers

0
Mario On

For further references, Argus9's request can be archived following the below steps:

1) You can control your worker's opts to be:

class YourWorker
include Sneakers::Worker
from_queue "your_queue",
           :env => nil,
           :ack => true,
           :workers => 1, #Number of per-cpu processes to run
           :prefetch => 1, #This param will define that single message will be fetched per time
           :threads => 1, #This will define that you have single thread running
           :heartbeat => 2,
           :share_threads => true,
           :timeout_job_after => 3600,
           :exchange => 'your_exchange'

def work(args={})
 #... your steps here
end 
end

2) You need to pay attention to what you have specified in your sneakers.rb as initial parameters (as its consumed by the Sneakers::Runner at worker initialization), so be sure to have the correct params in there like:

Sneakers.configure  :amqp => url,
                :daemonize => true,
                :ack => true,
                :prefetch => 1,
                :threads => 1,
                :start_worker_delay => 0.1,  
                :workers => 1,               
                :exchange => "your_exchange",
                :exchange_type => :direct,
                :log => "log/sneakers.log"
Sneakers.logger.level = Logger::DEBUG

You can also build some extra controls using the RabbitMQ API, which will enables you to check things like if is there already some message been processing?... etc what is not that easy to be archived using bunny and so. Using a very simple code like:

    def queue_info
    queues_infos = {}    
    rabbitmqctl_url = "http://127.0.0.1:15672"
    rabbitmqctl_user = "your_user"
    rabbitmqctl_password = "your_password"
    uri = URI.parse("#{rabbitmqctl_url}/api/queues")
    request = Net::HTTP::Get.new(uri)
    request.basic_auth(rabbitmqctl_user, rabbitmqctl_password)
    req_options = { use_ssl: uri.scheme == 'https' }
    response = Net::HTTP.start(uri.hostname, uri.port, req_options)  do |http|
      http.request(request)
    end
    queue_details = JSON.parse(response.body)
    queue_details.each do |queue|
      queues_infos[queue['name'].to_s] = {  name: queue['name'],
                                            msg_total: queue['messages'],
                                            msg_ready: queue['messages_ready'],
                                            msg_unacknowlged: queue['messages_unacknowledged'],
                                            state: queue['state'],
                                            consumers: queue['consumers'] }
    end
    return queues_infos
end