Declare a queue with x-max-length programmatically using Rabbitmq-c

321 views Asked by At

I am implementing a RPC function for my C application , and try to programmatically declare a queue which limits maximum number of pending messages, after reading the declaration of amqp_table_entry_t and amqp_field_value_t in amqp.h , here's my minimal code sample :

int default_channel_id = 1;
int passive = 0;
int durable = 1;
int exclusive = 0;
int auto_delete = 0;

amqp_table_entry_t  *q_arg_n_elms = malloc(sizeof(amqp_table_entry_t));
*q_arg_n_elms = (amqp_table_entry_t) {.key = amqp_cstring_bytes("x-max-length"),
             .value = {.kind = AMQP_FIELD_KIND_U32, .value = {.u32 = 234 }}};

amqp_table_t  q_arg_table = {.num_entries=1, .entries=q_arg_n_elms};

amqp_queue_declare( conn, default_channel_id, amqp_cstring_bytes("my_queue_123"),
                passive, durable, exclusive, auto_delete, q_arg_table );
amqp_rpc_reply_t   _reply = amqp_get_rpc_reply(conn);

The code above always returns AMQP_RESPONSE_LIBRARY_EXCEPTION in the object of amqp_rpc_reply_t, with error message a socket error occurred , I don't see any active connection triggered by this code in web management UI of the RabbitMQ. so I think rabbitmq-c library doesn't establish a connection and just reply with error.

However everything works perfectly when I replace the argument q_arg_table with default amqp_empty_table (which means no argument).

Here are my questions :

  • Where can I find the code which filter the invalid key of the queue argument ? according to this article , x-max-length should be correct argument key for limiting number of messages in a queue , but I cannot figure out why the library still reports error.
  • Is there any example that demonstrates how to properly set up amqp_table_t passing in amqp_queue_declare(...) ?

Development environment :

  • RabbitMQ v3.2.4
  • rabbitmq-c v0.11.0

Appreciate any feedback , thanks for reading.

[Edit]

According to the server log [email protected], RabbitMQ broker accepted a new connection, found decode error on receiving frame, then close connection immediately. I haven't figured out the Erlang implementation but the root cause is likely the decoding error on the table argument when declaring the queue.

131 =CRASH REPORT==== 18-May-2022::16:05:46 ===
132   crasher:
133     initial call: rabbit_reader:init/2
134     pid: <0.23706.1>
135     registered_name: []                                                                                                                      
136     exception error: no function clause matching 
137                      rabbit_binary_parser:parse_field_value(<<105,0,0,1,44>>) (src/rabbit_binary_parser.erl, line 53)
138       in function  rabbit_binary_parser:parse_table/1 (src/rabbit_binary_parser.erl, line 44)
139       in call from rabbit_framing_amqp_0_9_1:decode_method_fields/2 (src/rabbit_framing_amqp_0_9_1.erl, line 791)
140       in call from rabbit_command_assembler:process/2 (src/rabbit_command_assembler.erl, line 85)
141       in call from rabbit_reader:process_frame/3 (src/rabbit_reader.erl, line 688)
142       in call from rabbit_reader:handle_input/3 (src/rabbit_reader.erl, line 738)
143       in call from rabbit_reader:recvloop/2 (src/rabbit_reader.erl, line 292)
144       in call from rabbit_reader:run/1 (src/rabbit_reader.erl, line 273)
145     ancestors: [<0.23704.1>,rabbit_tcp_client_sup,rabbit_sup,<0.145.0>]
146     messages: [{'EXIT',#Port<0.31561>,normal}]
147     links: [<0.23704.1>]
148     dictionary: [{{channel,1},
149                    {<0.23720.1>,{method,rabbit_framing_amqp_0_9_1}}},
150                   {{ch_pid,<0.23720.1>},{1,#Ref<0.0.20.156836>}}]
151     trap_exit: true
152     status: running
153     heap_size: 2586
154     stack_size: 27
155     reductions: 2849
156   neighbours:

1

There are 1 answers

1
alanxz On BEST ANSWER

RabbitMQ may not support unsigned integers as table values.

Instead try using a signed 32 or 64-bit number (e.g., .value = {.kind = AMQP_FIELD_KIND_I32, .value = {.i32 = 234 }}).

Also the RabbitMQ server logs may contain additional debugging information that can help understand errors like this as well as the amqp_error_string2 function can be used to translate error-code into an error-string.