Bunny keeps raising an error, which I believe is sent back from RabbitMQ, saying that the following values are invalid formats for the x-stream-offset argument:
invalid:
- ISO8601 timestamp string
- milliseconds Integer
- YYYY-MM-DD string
- Unix Time or Epoch Time integer, e.g. Time.now.to_i
valid:
- Integer (gets back the requested message and everything after)
- first (string) (gets back all messages starting from first)
- last (string)
- next (string)
iso8601 = '2021-08-28T13:40:31-07:00'
opts = {
  exclusive: false,
  manual_ack: true,
  block: true,
  arguments: {
    'x-stream-offset': iso8601
  }
}
queue.subscribe(opts) do |delivery_info, _properties, payload|
  msg = JSON.parse(payload)
  puts msg
  ch.ack(delivery_info.delivery_tag, false)
end
I get an error back saying it's an invalid stream offset argument.
Below is the error message from the Rake task that I am running it in:
Bunny::PreconditionFailed: PRECONDITION_FAILED - invalid arg 'x-stream-offset' for queue 'stream_test' in vhost '/': {invalid_stream_offset_arg,{longstr,<<"2021-08-28T13:40:31-07:00">>}}
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/channel.rb:2014:in `raise_if_channel_close!'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/channel.rb:944:in `basic_consume_with'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/bunny-2.18.0/lib/bunny/queue.rb:191:in `subscribe'
/Users/aronlilland/Documents/dev/bin/random/rabbit_mq_stream/lib/stream/lib/read.rb:39:in `read'
/Users/aronlilland/Documents/dev/bin/random/rabbit_mq_stream/tasks/read.rake:9:in `block in <top (required)>'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/gems/rake-12.3.3/exe/rake:27:in `<top (required)>'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/bin/ruby_executable_hooks:22:in `eval'
/Users/aronlilland/.rvm/gems/ruby-2.4.1/bin/ruby_executable_hooks:22:in `<main>'
--
Below is a working example that connects to RabbitMQ and reads from a stream (without the binary protocol). The streaming plugin is required.
# frozen_string_literal: true
require 'bunny'
require 'json'
require 'pp'

rabbit_user = 'guest'
rabbit_pass = 'guest'
rabbit_host = 'localhost:5672'

conn = Bunny.new(
  "amqp://#{rabbit_user}:#{rabbit_pass}@#{rabbit_host}",
  client_properties: { connection_name: :stream }
)
conn.start

ch = conn.create_channel(nil, 16)
queue = ch.queue(
  'stream_test',
  durable: true,
  auto_delete: false,
  exclusive: false,
  arguments: {
    'x-queue-type': 'stream',
    'x-max-length-bytes': 500_000_000
  }
)

50000.times do |i|
  queue.publish(JSON.dump({ hello: "world #{i + 1}" }), routing_key: 'stream_test')
  puts "published #{i + 1}"
end

ch.basic_qos(25)
opts = {
  exclusive: false,
  manual_ack: true,
  ## block will make it consume the main IO instead of being a separate thread
  ## it is not recommended in production
  block: true,
  arguments: {
    'x-stream-offset': 'first'
  }
}
queue.subscribe(opts) do |delivery_info, _properties, payload|
  msg = JSON.parse(payload)
  puts msg
  ch.ack(delivery_info.delivery_tag, false)
end

puts 'done'
sleep 1
conn.close
The Java client examples show that it should be able to accept a timestamp, but I don't appear to be able to send one. Is there an accepted date format?
The Java client docs for the streaming feature say:
Timestamp - a timestamp value specifying the point in time to attach to the log at. It will clamp to the closest offset, if the timestamp is out of range for the stream it will clamp either the start or end of the log respectively. With AMQP 0.9.1, the timestamp used is POSIX time with an accuracy of one second, that is the number of seconds since 00:00:00 UTC, 1970-01-01.
I can find no examples of what a POSIX timestamp looks like
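For reference, a POSIX timestamp is simply an integer count of seconds since 1970-01-01 00:00:00 UTC. The sketch below converts the ISO 8601 string from the failing example into that form using only Ruby's standard library. The options hash at the end passes a Time object rather than the bare integer. This rests on the assumption that Bunny's serializer encodes a Ruby Time as an AMQP 0.9.1 timestamp field (a bare integer is read as a message offset, per the list above), and it has not been verified against a broker here.

```ruby
require 'time'

iso8601 = '2021-08-28T13:40:31-07:00'

# Parse the ISO 8601 string; Time.iso8601 honours the UTC offset.
start_at = Time.iso8601(iso8601)

# POSIX time: whole seconds since 1970-01-01 00:00:00 UTC.
posix_seconds = start_at.to_i
puts posix_seconds

# Assumption: a Time value is sent as an AMQP timestamp field,
# not as a string or a plain integer.
opts = {
  manual_ack: true,
  arguments: { 'x-stream-offset': start_at }
}
```

If the assumption holds, opts can be handed to queue.subscribe in the same way as in the working example.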
After looking at the Erlang RabbitMQ source code here, which is not shown in any of the developer docs, it appears that you can get time series data by passing in {number_of_seconds}s to get the number of seconds or {number_of_minutes}m to get the number of minutes.
So, time series data can be formatted as a string. It looks like it does indeed accept some format of time, but again it isn't clear how to get it to work. For the sake of my needs, a string value is acceptable.
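The interval format described above can be sketched as a small helper. The name relative_offset and the UNIT_SUFFIXES table are made up for illustration; only the s and m suffixes come from the reading of the source above, and treating the value as an age relative to now is an assumption.

```ruby
# Hypothetical helper: builds an interval string such as "90s" or "15m"
# for use as the 'x-stream-offset' consumer argument.
UNIT_SUFFIXES = { seconds: 's', minutes: 'm' }.freeze

def relative_offset(amount, unit)
  suffix = UNIT_SUFFIXES.fetch(unit) { raise ArgumentError, "unsupported unit: #{unit}" }
  unless amount.is_a?(Integer) && amount.positive?
    raise ArgumentError, 'amount must be a positive integer'
  end

  "#{amount}#{suffix}"
end

# Same subscribe options as in the working example, with an interval string.
opts = {
  exclusive: false,
  manual_ack: true,
  arguments: { 'x-stream-offset': relative_offset(15, :minutes) }
}

puts opts[:arguments][:'x-stream-offset']
```

These options would then be passed to queue.subscribe exactly as 'first' is in the working example.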
The answer in Ruby would be to do the following: