I am implementing a RPC function for my C application , and try to programmatically declare a queue which limits maximum number of pending messages, after reading the declaration of amqp_table_entry_t
and amqp_field_value_t
in amqp.h
, here's my minimal code sample :
int default_channel_id = 1;
int passive = 0;
int durable = 1;
int exclusive = 0;
int auto_delete = 0;
amqp_table_entry_t *q_arg_n_elms = malloc(sizeof(amqp_table_entry_t));
*q_arg_n_elms = (amqp_table_entry_t) {.key = amqp_cstring_bytes("x-max-length"),
.value = {.kind = AMQP_FIELD_KIND_U32, .value = {.u32 = 234 }}};
amqp_table_t q_arg_table = {.num_entries=1, .entries=q_arg_n_elms};
amqp_queue_declare( conn, default_channel_id, amqp_cstring_bytes("my_queue_123"),
passive, durable, exclusive, auto_delete, q_arg_table );
amqp_rpc_reply_t _reply = amqp_get_rpc_reply(conn);
The code above always returns AMQP_RESPONSE_LIBRARY_EXCEPTION
in the object of amqp_rpc_reply_t
, with error message a socket error occurred
, I don't see any active connection triggered by this code in web management UI of the RabbitMQ. so I think rabbitmq-c
library doesn't establish a connection and just reply with error.
However everything works perfectly when I replace the argument q_arg_table
with default amqp_empty_table
(which means no argument).
Here are my questions :
- Where can I find the code which filter the invalid key of the queue argument ? according to this article ,
x-max-length
should be correct argument key for limiting number of messages in a queue , but I cannot figure out why the library still reports error. - Is there any example that demonstrates how to properly set up
amqp_table_t
passing inamqp_queue_declare(...)
?
Development environment :
- RabbitMQ v3.2.4
- rabbitmq-c v0.11.0
Appreciate any feedback , thanks for reading.
[Edit]
According to the server log [email protected]
, RabbitMQ broker accepted a new connection, found decode error on receiving frame, then close connection immediately. I haven't figured out the Erlang implementation but the root cause is likely the decoding error on the table argument when declaring the queue.
131 =CRASH REPORT==== 18-May-2022::16:05:46 ===
132 crasher:
133 initial call: rabbit_reader:init/2
134 pid: <0.23706.1>
135 registered_name: []
136 exception error: no function clause matching
137 rabbit_binary_parser:parse_field_value(<<105,0,0,1,44>>) (src/rabbit_binary_parser.erl, line 53)
138 in function rabbit_binary_parser:parse_table/1 (src/rabbit_binary_parser.erl, line 44)
139 in call from rabbit_framing_amqp_0_9_1:decode_method_fields/2 (src/rabbit_framing_amqp_0_9_1.erl, line 791)
140 in call from rabbit_command_assembler:process/2 (src/rabbit_command_assembler.erl, line 85)
141 in call from rabbit_reader:process_frame/3 (src/rabbit_reader.erl, line 688)
142 in call from rabbit_reader:handle_input/3 (src/rabbit_reader.erl, line 738)
143 in call from rabbit_reader:recvloop/2 (src/rabbit_reader.erl, line 292)
144 in call from rabbit_reader:run/1 (src/rabbit_reader.erl, line 273)
145 ancestors: [<0.23704.1>,rabbit_tcp_client_sup,rabbit_sup,<0.145.0>]
146 messages: [{'EXIT',#Port<0.31561>,normal}]
147 links: [<0.23704.1>]
148 dictionary: [{{channel,1},
149 {<0.23720.1>,{method,rabbit_framing_amqp_0_9_1}}},
150 {{ch_pid,<0.23720.1>},{1,#Ref<0.0.20.156836>}}]
151 trap_exit: true
152 status: running
153 heap_size: 2586
154 stack_size: 27
155 reductions: 2849
156 neighbours:
RabbitMQ may not support unsigned integers as table values.
Instead try using a signed 32 or 64-bit number (e.g.,
.value = {.kind = AMQP_FIELD_KIND_I32, .value = {.i32 = 234 }}
).Also the RabbitMQ server logs may contain additional debugging information that can help understand errors like this as well as the
amqp_error_string2
function can be used to translate error-code into an error-string.