I fill up my queue, check it has the right number of tasks to work and then have workers in parallell set to prefetch(1)
to ensure each just takes one task at a time.
I want each worker to work its task, send a manual acknowledgement, and keep working taking from the queue if there is more work.
If there is not more work, i.e. the queue is empty, I want the worker script to finish up and return(0)
.
So, this is what I have now:
require 'bunny'
connection = Bunny.new("amqp://my_conn")
connection.start
channel = connection.create_channel
queue = channel.queue('my_queue_name')
channel.prefetch(1)
puts ' [*] Waiting for messages.'
begin
payload = 'init'
until queue.message_count == 0
puts "worker working queue length is #{queue.message_count}"
_delivery_info, _properties, payload = queue.pop
unless payload.nil?
puts " [x] Received #{payload}"
raise "payload invalid" unless payload[/cucumber/]
begin
do_stuff(payload)
rescue => e
puts "Error running #{payload}: #{e.backtrace.join('\n')}"
#failing stuff
end
end
puts " [x] Done with #{payload}"
end
puts "done with queue"
connection.close
exit(0)
ensure
connection.close
end
I want to still make sure I am done when the queue is empty. This is the example from the RabbitMQ site... https://www.rabbitmq.com/tutorials/tutorial-two-ruby.html . It has a number of things we want for our work queue, most importantly manual acknowledgements. But it does not stop running and I need that to happen programmatically when the queue is done:
#!/usr/bin/env ruby
require 'bunny'
connection = Bunny.new(automatically_recover: false)
connection.start
channel = connection.create_channel
queue = channel.queue('task_queue', durable: true)
channel.prefetch(1)
puts ' [*] Waiting for messages. To exit press CTRL+C'
begin
queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, body|
puts " [x] Received '#{body}'"
# imitate some work
sleep body.count('.').to_i
puts ' [x] Done'
channel.ack(delivery_info.delivery_tag)
end
rescue Interrupt => _
connection.close
end
How can this script be adapted to exit out when the queue has been completely worked (0 total and 0 unacked)?
From what I understand, you want your subscriber to end if there are no pending messages in the RabbitMQ queue.
Given your second script, you could avoid passing
block: true
, and that will return nothing when there's no more data to process. In that case, you could exit the program.You can see that in the documentation: http://rubybunny.info/articles/queues.html#blocking_or_nonblocking_behavior
By default it's non-blocking.