I am using alanxz's RabbitMQ C master. The link is as follows: https://github.com/alanxz/rabbitmq-c
In the amqp_sendstring.c and amqp_listen.c examples, when I shut down the amqp_listen.c and still continue sending strings using amqp_sendstring.c, the messages are getting lost. So when i restart amqp_listen.c, it is not receiving the messages. I want the amqp_listen.c to receive the messages sent by amqp_sendstring.c while the former was inactive. What do I do?
amqp_sendstring.c:
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include "utils.h"
int main(int argc, char const *const *argv)
{
char const *hostname;
int port, status;
char const *exchange;
char const *routingkey;
char const *messagebody;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 6) {
fprintf(stderr, "Usage: amqp_sendstring host port exchange routingkey messagebody\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
exchange = argv[3];
routingkey = argv[4];
messagebody = argv[5];
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; /* persistent delivery mode */
die_on_error(amqp_basic_publish(conn,
1,
amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey),
0,
0,
&props,
amqp_cstring_bytes(messagebody)),
"Publishing");
}
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}
amqp_listen.c:
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include <assert.h>
#include "utils.h"
int main(int argc, char const *const *argv)
{
char const *hostname;
int port, status;
char const *exchange;
char const *bindingkey;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
amqp_bytes_t queuename;
if (argc < 5) {
fprintf(stderr, "Usage: amqp_listen host port exchange bindingkey\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
exchange = argv[3];
bindingkey = argv[4];
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
fprintf(stderr, "Out of memory while copying queue name");
return 1;
}
}
amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
for (;;) {
amqp_rpc_reply_t res;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
res = amqp_consume_message(conn, &envelope, NULL, 0);
if (AMQP_RESPONSE_NORMAL != res.reply_type) {
break;
}
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
(unsigned) envelope.delivery_tag,
(int) envelope.exchange.len, (char *) envelope.exchange.bytes,
(int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);
if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n",
(int) envelope.message.properties.content_type.len,
(char *) envelope.message.properties.content_type.bytes);
}
printf("----\n");
amqp_dump(envelope.message.body.bytes, envelope.message.body.len);
amqp_destroy_envelope(&envelope);
}
}
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}
Please suggest how do I retrieve lost messages in the correct order.
The queue that is declared by the
amqp_listen.c
is declared with theauto_delete
flag set. That means that the queue will be deleted when the consumer on the queue is terminated (usually by the underlying connection being closed). If you would like the queue to persist set theauto_delete
parameter in theamqp_queue_declare
to 0.Additionally when the queue name isn't specified (e.g., using
amqp_empty_bytes
for thequeue
parameter), the broker creates a new unique queue name each time you call that. Change theamqp_queue_declare
function to use the same queue name each time you call it.See also: https://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.declare