Bunny and RabbitMQ - Adapting the WorkQueue tutorial to cancel subscribing when the Queue is completely worked

329 views Asked by At

I fill up my queue, check it has the right number of tasks to work and then have workers in parallell set to prefetch(1) to ensure each just takes one task at a time.

I want each worker to work its task, send a manual acknowledgement, and keep working taking from the queue if there is more work.

If there is not more work, i.e. the queue is empty, I want the worker script to finish up and return(0).

So, this is what I have now:

require 'bunny'

connection = Bunny.new("amqp://my_conn")
connection.start

channel = connection.create_channel
queue = channel.queue('my_queue_name')

channel.prefetch(1)
puts ' [*] Waiting for messages.'

begin
  payload = 'init'
  until queue.message_count == 0
    puts "worker working queue length is #{queue.message_count}"
    _delivery_info, _properties, payload = queue.pop
    unless payload.nil?
      puts " [x] Received #{payload}"
      raise "payload invalid" unless payload[/cucumber/]

      begin
        do_stuff(payload)
      rescue => e
        puts "Error running #{payload}: #{e.backtrace.join('\n')}"
        #failing stuff
      end
    end
    puts " [x] Done with #{payload}"
  end

  puts "done with queue"
  connection.close
  exit(0)
ensure
  connection.close
end 

I want to still make sure I am done when the queue is empty. This is the example from the RabbitMQ site... https://www.rabbitmq.com/tutorials/tutorial-two-ruby.html . It has a number of things we want for our work queue, most importantly manual acknowledgements. But it does not stop running and I need that to happen programmatically when the queue is done:

#!/usr/bin/env ruby
require 'bunny'

connection = Bunny.new(automatically_recover: false)
connection.start

channel = connection.create_channel
queue = channel.queue('task_queue', durable: true)

channel.prefetch(1)
puts ' [*] Waiting for messages. To exit press CTRL+C'

begin
  queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, body|
    puts " [x] Received '#{body}'"
    # imitate some work
    sleep body.count('.').to_i
    puts ' [x] Done'
    channel.ack(delivery_info.delivery_tag)
  end
rescue Interrupt => _
  connection.close
end

How can this script be adapted to exit out when the queue has been completely worked (0 total and 0 unacked)?

1

There are 1 answers

1
Magd Kudama On

From what I understand, you want your subscriber to end if there are no pending messages in the RabbitMQ queue.

Given your second script, you could avoid passing block: true, and that will return nothing when there's no more data to process. In that case, you could exit the program.

You can see that in the documentation: http://rubybunny.info/articles/queues.html#blocking_or_nonblocking_behavior

By default it's non-blocking.