I am trying to send an image through RabbitMQ using the C client library (rabbitmq-c, master branch). The idea is to read the image's binary data, publish it as the message body, and have the receiver write the received bytes into another image file. My code is as follows:
Client/sender:
#include "amqp_wrapper_library.h"
/*
 * Sender: loads a JPEG file into memory and publishes it through the
 * messagebus wrapper.
 *
 * Command line: host port exchange routingkey username password
 * Returns 0 on success, 1 on a usage, file or allocation error.
 *
 * NOTE(review): send_message_on_messagebus() receives the body as a
 * NUL-terminated char const * with no length, so a binary payload is cut at
 * its first zero byte. Transferring the complete image needs a wrapper call
 * that carries the length (for example an amqp_bytes_t).
 */
int main(int argc, char const *const *argv)
{
    static char const image_path[] = "/home/sneha/Desktop/Sender/image1.jpg";
    char const *hostname;
    int port;
    char const *exchange;
    char const *routingkey;
    char const *username;
    char const *password;
    amqp_connection_state_t conn;

    if (argc < 7) {
        /* Only six arguments are consumed; the body comes from the image file. */
        fprintf(stderr, "Usage: amqp_sendstring host port exchange routingkey username password\n");
        return 1;
    }
    hostname = argv[1];
    port = atoi(argv[2]);
    exchange = argv[3];
    routingkey = argv[4];
    username = argv[5];
    password = argv[6];

    FILE *image1 = fopen(image_path, "rb");
    if (image1 == NULL) {
        perror(image_path);
        return 1;
    }

    /* Determine the file size by seeking to the end. */
    long length = -1;
    if (fseek(image1, 0, SEEK_END) == 0) {
        length = ftell(image1);
    }
    if (length < 0) {
        fprintf(stderr, "Cannot determine size of %s\n", image_path);
        fclose(image1);
        return 1;
    }
    rewind(image1);

    /* One extra byte so the buffer can be NUL-terminated for the C-string API. */
    char *messagebody = malloc((size_t)length + 1);
    if (messagebody == NULL) {
        fprintf(stderr, "Out of memory while reading %s\n", image_path);
        fclose(image1);
        return 1;
    }

    size_t bytes_read = fread(messagebody, 1, (size_t)length, image1);
    fclose(image1);
    if (bytes_read != (size_t)length) {
        fprintf(stderr, "Short read on %s\n", image_path);
        free(messagebody);
        return 1;
    }
    messagebody[length] = '\0';

    /* Debug dump; like every %s use it stops at the first zero byte. */
    printf("%s", messagebody);

    conn = create_messagebus_context(hostname, port, username, password);
    send_message_on_messagebus(conn, exchange, routingkey, messagebody);
    close_messagebus_context(conn);

    free(messagebody);
    return 0;
}
Server/receiver:
#include "amqp_wrapper_library.h"
/*
 * Receiver: waits for one message on the messagebus and writes its body to
 * an image file.
 *
 * Command line: host port exchange bindingkey username password
 * Returns 0 on success, 1 on a usage, file or receive error.
 *
 * NOTE(review): receive_message_on_messagebus() returns a char * without a
 * length, so only the bytes up to the first zero byte can be written here.
 * Reconstructing a complete binary image needs a wrapper call that also
 * reports the body length.
 */
int main(int argc, char const *const *argv)
{
    static char const image_path[] = "/home/sneha/Desktop/Receiver/image2.jpg";
    char const *hostname;
    int port;
    char const *exchange;
    char const *bindingkey;
    char const *username;
    char const *password;
    amqp_connection_state_t conn;
    FILE *image2;
    int status = 0;

    if (argc < 7) {
        fprintf(stderr, "Usage: amqp_listen host port exchange bindingkey username password\n");
        return 1;
    }
    hostname = argv[1];
    port = atoi(argv[2]);
    exchange = argv[3];
    bindingkey = argv[4];
    username = argv[5];
    password = argv[6];

    conn = create_messagebus_context(hostname, port, username, password);

    image2 = fopen(image_path, "wb");
    if (image2 == NULL) {
        perror(image_path);
        close_messagebus_context(conn);
        return 1;
    }

    char *messagebody = receive_message_on_messagebus(conn, exchange, bindingkey);
    if (messagebody == NULL) {
        fprintf(stderr, "No message received\n");
        fclose(image2);
        close_messagebus_context(conn);
        return 1;
    }

    /*
     * sizeof(messagebody) is the size of the pointer, not of the payload.
     * fputs writes every byte up to the terminating NUL, which is all the
     * length information the char * interface provides.
     */
    if (fputs(messagebody, image2) == EOF) {
        fprintf(stderr, "Failed to write %s\n", image_path);
        status = 1;
    }

    /* Debug dump of the received body. */
    printf("%s", messagebody);

    /* fclose flushes buffered data, so a failure here means a lost write. */
    if (fclose(image2) != 0) {
        fprintf(stderr, "Failed to close %s\n", image_path);
        status = 1;
    }

    free(messagebody);
    close_messagebus_context(conn);
    return status;
}
amqp_wrapper_library.c
#include "amqp_wrapper_library.h"

#include <string.h>
amqp_connection_state_t create_messagebus_context(char const *hostname,int port,char const *username,char const *password)
{
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
int status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, username, password), "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
return conn;
}
/*
 * Tear down a messagebus context: close channel 1, close the connection,
 * then destroy the connection state. Each step's result is passed to the
 * die_on_* helpers.
 */
void close_messagebus_context(amqp_connection_state_t conn)
{
    amqp_rpc_reply_t channel_reply = amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    die_on_amqp_error(channel_reply, "Closing channel");

    amqp_rpc_reply_t connection_reply = amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    die_on_amqp_error(connection_reply, "Closing connection");

    int destroy_status = amqp_destroy_connection(conn);
    die_on_error(destroy_status, "Ending connection");
}
/*
 * Publish messagebody on channel 1 to the given exchange with the given
 * routing key. The message is flagged with content type "text/plain" and
 * delivery mode 2.
 *
 * exchange, routingkey and messagebody are converted with
 * amqp_cstring_bytes(), so each must be a NUL-terminated string.
 */
void send_message_on_messagebus(amqp_connection_state_t conn,char const *exchange,char const *routingkey,char const *messagebody)
{
    amqp_basic_properties_t props;
    props.delivery_mode = 2; /* persistent delivery mode */
    props.content_type = amqp_cstring_bytes("text/plain");
    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;

    amqp_bytes_t exchange_bytes = amqp_cstring_bytes(exchange);
    amqp_bytes_t routingkey_bytes = amqp_cstring_bytes(routingkey);
    amqp_bytes_t body_bytes = amqp_cstring_bytes(messagebody);

    int publish_status =
        amqp_basic_publish(conn, 1, exchange_bytes, routingkey_bytes, 0, 0, &props, body_bytes);
    die_on_error(publish_status, "Publishing");
}
/*
 * Declare a server-named queue, bind it to exchange with bindingkey, start
 * consuming on channel 1 and wait for one message.
 *
 * Returns a malloc'd copy of the message body with a NUL terminator appended
 * after the last body byte; the caller must free() it. Returns NULL when the
 * queue name cannot be copied, when no message is received normally, or when
 * memory for the body cannot be allocated.
 *
 * Every body byte is copied, but the body length is not reported, so a
 * caller that treats the result as a C string sees only the bytes before the
 * first zero byte.
 */
char* receive_message_on_messagebus(amqp_connection_state_t conn,char const *exchange,char const *bindingkey)
{
    amqp_queue_declare_ok_t *declared =
        amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");

    amqp_bytes_t queuename = amqp_bytes_malloc_dup(declared->queue);
    if (queuename.bytes == NULL) {
        fprintf(stderr, "Out of memory while copying queue name");
        return NULL;
    }

    amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

    amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

    /* The duplicated queue name is not used past this point. */
    amqp_bytes_free(queuename);

    amqp_envelope_t envelope;
    amqp_maybe_release_buffers(conn);
    amqp_rpc_reply_t res = amqp_consume_message(conn, &envelope, NULL, 0);
    if (AMQP_RESPONSE_NORMAL != res.reply_type) {
        return NULL;
    }

    /* Reserve one extra byte for the terminator. */
    size_t body_len = envelope.message.body.len;
    char *result = malloc(body_len + 1);
    if (result == NULL) {
        fprintf(stderr, "Out of memory while copying message body");
    } else {
        /* memcpy keeps embedded zero bytes; a "%s" copy would stop at the first. */
        if (body_len > 0) {
            memcpy(result, envelope.message.body.bytes, body_len);
        }
        result[body_len] = '\0';
    }

    amqp_destroy_envelope(&envelope);
    return result;
}
However, image2 is not getting reconstructed. I checked the file sizes, and the sizes of the two images do not match. What is the issue here?
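Data from a JPEG file is not a C string (a null-terminated sequence of bytes): it can contain zero bytes anywhere. Functions such as `printf("%s", ...)` and `amqp_cstring_bytes` that expect a null-terminated string therefore cannot be used to handle the image data, because they stop at the first zero byte.

Instead of using a `const char *` to hold the JPEG data, use an `amqp_bytes_t` struct in your code; it carries an explicit length alongside the pointer.

To read from a file into an `amqp_bytes_t`:

    amqp_bytes_t body = amqp_bytes_malloc(length);   /* length obtained via fseek/ftell */
    fread(body.bytes, 1, body.len, image1);

Do the reverse to write it back out:

    fwrite(envelope.message.body.bytes, 1, envelope.message.body.len, image2);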
Edit: add sample code.