I'm implementing a client/server proxy: the proxy receives commands from the client and forwards them to the server, and vice versa. The challenge I'm facing is accessing the client socket from the server instance in order to forward messages to the client (and vice versa). I'm still learning Boost.Asio. What is the most effective approach for sharing the socket between the server and client instances? The current implementation is below.
In cmd_handler::read_cmd_done, client_socket_.is_open() returns 0 when I expect 1.

proxy.h:
#pragma once
#ifndef OAMIP_PROXY_H
#define OAMIP_PROXY_H
#include <iostream>
#include <boost/asio.hpp>
#include <thread>
#include <vector>
#include <functional>
#include <deque>
#include "config.h"
namespace asio = boost::asio;
class cmd_handler: public std::enable_shared_from_this<cmd_handler>
{
public:
cmd_handler(asio::io_context &io_context, AppConfig* appConfig);
~cmd_handler();
asio::ip::tcp::socket &socket();
asio::ip::tcp::socket &c_socket();
asio::ip::tcp::endpoint remote_endpoint();
asio::ip::tcp::endpoint c_remote_endpoint();
asio::io_context &m_io_context();
void start();
void read_cmd();
void read_cmd_done(boost::system::error_code const &ec, std::size_t bytes_transferred);
void read_cmd_client();
void read_cmd_client_done(boost::system::error_code const &ec, std::size_t bytes_transferred);
private:
asio::io_context& io_context_;
asio::ip::tcp::socket server_socket_;
asio::ip::tcp::socket client_socket_;
asio::io_context::strand write_strand_;
asio::streambuf in_packet_;
std::deque<std::string> send_cmd_queue;
std::mutex queue_mutex_; // Added for thread safety
AppConfig *app_config;
};
class ProxyServer
{
using shared_handler_t = std::shared_ptr<cmd_handler>;
public:
ProxyServer(int thread_count, AppConfig* appConfig);
~ProxyServer();
void start_server(std::string ip_addr, int port);
void start_client(std::string ip_addr, int port);
void handle_new_connection(shared_handler_t handler, boost::system::error_code const &ec);
private:
asio::io_context io_context_;
int thread_count_;
asio::ip::tcp::acceptor acceptor_;
asio::ip::tcp::resolver resolver_;
std::vector<std::thread> thread_pool_;
asio::streambuf buffer_;
AppConfig *app_config;
};
#endif // OAMIP_PROXY_H
proxy.cpp:
#include "proxy.h"
#include "parser.cpp"
#include "send.cpp"
ProxyServer::ProxyServer(int thread_count,
AppConfig *appconfig)
: thread_count_(thread_count),
acceptor_(io_context_),
resolver_(io_context_),
thread_pool_(),
app_config(appconfig)
{
}
/*******************************************************************************************/
ProxyServer::~ProxyServer()
{
// Stop and join the io_context to prevent memory leaks
io_context_.stop();
for (auto &thread : thread_pool_)
{
thread.join();
}
}
/*******************************************************************************************/
void ProxyServer::start_server(std::string ip_addr, int port)
{
std::cout << "Starting Server, IP:" << ip_addr << ", Port:" << port << std::endl;
auto handler = std::make_shared<cmd_handler>(io_context_, app_config);
asio::ip::address_v4 ipv4_address = asio::ip::address_v4::from_string(ip_addr);
asio::ip::tcp::endpoint endpoint(ipv4_address, port);
acceptor_.open(endpoint.protocol());
acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
std::cout << "Start listening" << std::endl;
try
{
acceptor_.async_accept(handler->socket(), [=](auto ec)
{
if(ec)
std::cerr << "Error accepting connection from : " << ec.message() << std::endl;
else
handle_new_connection(handler, ec); });
}
catch (const std::exception &e)
{
std::cerr << "Exception caught: " << e.what() << std::endl;
}
io_context_.run();
// start pool of threads to process the asio events
/* for (int i = 0; i < thread_count_; ++i)
{
thread_pool_.emplace_back([=]
{ io_context_.run(); });
}
for (auto &thread : thread_pool_)
{
thread.join();
} */
}
void ProxyServer::start_client(std::string ip_addr, int port)
{
std::cout << "Starting client" << std::endl;
auto handler = std::make_shared<cmd_handler>(io_context_, app_config);
asio::ip::tcp::resolver::query query(ip_addr, std::to_string(port));
asio::ip::tcp::resolver::iterator endpoint_iterator = resolver_.resolve(query);
asio::connect(handler->c_socket(), endpoint_iterator);
std::cout << "Connected to server on port " << port << std::endl;
handler->read_cmd_client();
io_context_.run();
}
/*******************************************************************************************/
void ProxyServer::handle_new_connection(shared_handler_t handler, boost::system::error_code const &ec)
{
std::cout << "Handle connection" << std::endl;
if (ec)
{
std::cerr << "Error accepting connection from client: " << ec.message() << std::endl;
return;
}
handler->read_cmd();
auto new_handler = std::make_shared<cmd_handler>(io_context_, app_config);
acceptor_.async_accept(new_handler->socket(), [=](auto ec)
{ handle_new_connection(new_handler, ec); });
}
/*******************************************************************************************/
cmd_handler::cmd_handler(asio::io_context &io_context, AppConfig *appConfig)
: io_context_(io_context), server_socket_(io_context), client_socket_(io_context),write_strand_(io_context), app_config(appConfig)
{
}
/*******************************************************************************************/
cmd_handler::~cmd_handler()
{
// Explicitly clear the buffer to release the allocated memory
in_packet_.consume(in_packet_.size());
}
/*******************************************************************************************/
asio::ip::tcp::socket &cmd_handler::socket()
{
return server_socket_;
}
asio::ip::tcp::socket &cmd_handler::c_socket()
{
return client_socket_;
}
asio::ip::tcp::endpoint cmd_handler::remote_endpoint()
{
return server_socket_.remote_endpoint();
}
asio::ip::tcp::endpoint cmd_handler::c_remote_endpoint()
{
return client_socket_.remote_endpoint();
}
asio::io_context &cmd_handler::m_io_context()
{
return io_context_;
}
/*******************************************************************************************/
void cmd_handler::start()
{
read_cmd();
}
/*******************************************************************************************/
void cmd_handler::read_cmd()
{
auto remote_ip = remote_endpoint().address().to_string();
std::cout << "Read command from " << remote_ip << std::endl;
asio::async_read_until(server_socket_,
in_packet_,
'\n',
[me = shared_from_this()](boost::system::error_code const &ec, std::size_t bytes_xfer)
{
if (ec == asio::error::eof)
{
std::cout << "Connection closed by client:" << std::endl;
return; // No need to read further; the connection is closed.
}
else if (ec)
{
std::cerr << "Error in async_read_until: " << ec.message() << std::endl;
return;
}
else
{
me->read_cmd_done(ec, bytes_xfer);
}
});
}
void cmd_handler::read_cmd_client()
{
std::cout << "Read cmd client" << std::endl;
auto remote_ip = c_remote_endpoint().address().to_string();
std::cout << client_socket_.is_open() << std::endl;
std::cout << "Read command from " << remote_ip << std::endl;
asio::async_read_until(client_socket_,
in_packet_,
'\n',
[me = shared_from_this()](boost::system::error_code const &ec, std::size_t bytes_xfer)
{
if (ec == asio::error::eof)
{
std::cout << "Connection closed by client:" << std::endl;
return; // No need to read further; the connection is closed.
}
else if (ec)
{
std::cerr << "Error in async_read_until: " << ec.message() << std::endl;
return;
}
else
{
me->read_cmd_client_done(ec, bytes_xfer);
}
});
}
void cmd_handler::read_cmd_client_done(boost::system::error_code const &ec, std::size_t bytes_transferred)
{
auto remote_ip = c_remote_endpoint().address().to_string(); // this handler serves client_socket_
if (ec == asio::error::eof)
{
std::cout << "Connection closed by client:" << remote_ip << std::endl;
return; // No need to read further; the connection is closed.
}
else if (ec)
{
std::cerr << "Error accepting packet from the client: " << remote_ip << "," << ec.message() << std::endl;
return;
}
std::string command(buffers_begin(in_packet_.data()), buffers_begin(in_packet_.data()) + bytes_transferred);
in_packet_.consume(bytes_transferred);
std::cout << "Connected server IP: " << remote_ip << std::endl;
std::cout << "command:" << command << std::endl;
Parser parser(app_config);
std::string recv_cmd = parser.process_cmd(command);
//CmdSender sender(server_socket_);
//sender.send_cmd(recv_cmd);
read_cmd_client(); // keep reading from the same (client) socket
};
/*******************************************************************************************/
void cmd_handler::read_cmd_done(boost::system::error_code const &ec, std::size_t bytes_transferred)
{
auto remote_ip = remote_endpoint().address().to_string();
if (ec == asio::error::eof)
{
std::cout << "Connection closed by client:" << remote_ip << std::endl;
return; // No need to read further; the connection is closed.
}
else if (ec)
{
std::cerr << "Error accepting packet from the client: " << remote_ip << "," << ec.message() << std::endl;
return;
}
std::string command(buffers_begin(in_packet_.data()), buffers_begin(in_packet_.data()) + bytes_transferred);
in_packet_.consume(bytes_transferred);
std::cout << "Connected client IP: " << remote_ip << std::endl;
std::cout << "command:" << command << std::endl;
Parser parser(app_config);
std::string recv_cmd = parser.process_cmd(command);
std::cout << client_socket_.is_open() << c_socket().is_open() << socket().is_open() << std::endl;
CmdSender sender(client_socket_);
sender.send_cmd(recv_cmd);
read_cmd();
};
/*******************************************************************************************/
int main() {
try
{
int thread_count = 1;
AppConfig appConfig; // assumed populated elsewhere (e.g. from config.h)
ProxyServer proxy_server(thread_count, &appConfig);
int port = appConfig.serverConfig.port;
int s_port = appConfig.clientConfig.port;
std::vector<std::thread> thread_pool;
std::string ip_addr = appConfig.serverConfig.ip;
// start pool of threads to process the asio events
for (int i = 0; i < thread_count; ++i)
{
thread_pool.emplace_back([&]()
{ proxy_server.start_server(ip_addr, port); });
}
for (int i = 0; i < thread_count; ++i)
{
thread_pool.emplace_back([&]()
{ proxy_server.start_client(ip_addr, s_port);});
}
for (auto &thread : thread_pool)
{
thread.join();
}
}
catch (const std::exception &e)
{
std::cerr << "Exception caught: " << e.what() << std::endl;
}
return 0;
}
Mutexes don't do anything unless used.
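For example, a guarded push onto your existing send_cmd_queue could look like the sketch below (queue_cmd is a hypothetical helper, not something in your code):

void cmd_handler::queue_cmd(std::string cmd)
{
    std::lock_guard<std::mutex> lock(queue_mutex_); // the mutex now actually guards the queue
    send_cmd_queue.push_back(std::move(cmd));
}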
A strand synchronizes access to shared resources. Likely you just need the strand, but NOT only for the "writes": for all the shared resources the handlers touch (the I/O objects and the buffers used in async operations).
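As a sketch, reusing your write_strand_ member so the completion handler (and therefore server_socket_ and in_packet_) only ever runs on the strand:

asio::async_read_until(server_socket_, in_packet_, '\n',
    asio::bind_executor(write_strand_,
        [me = shared_from_this()](boost::system::error_code ec, std::size_t n)
        { me->read_cmd_done(ec, n); }));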
You're running potentially many "servers" and "clients", all on the same endpoints, on multiple threads. I don't think you want that. What you probably wanted is for the "server" to accept multiple connections, and you do that by accepting the next connection after the previous one. You already do exactly that at the end of handle_new_connection; a sketch of the matching main() shape follows.
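Something like this (start_accepting and run are hypothetical splits of your start_server: the former only does open/bind/listen plus the first async_accept, the latter only calls io_context_.run()):

proxy_server.start_accepting(ip_addr, port); // register the async work once
std::vector<std::thread> pool;
for (int i = 0; i < thread_count; ++i)
    pool.emplace_back([&] { proxy_server.run(); }); // every thread runs the one io_context
for (auto &t : pool)
    t.join();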
Some other notes: the explicit in_packet_.consume() in ~cmd_handler is not required. Destructors already deallocate the resources they own; nothing needs to be done there.
Session management
Now, logically, each accepted client connection gets its own new connection to the proxied server. There is no need to coordinate between instances. Just move the upstream connection into the cmd_handler (rename it ProxySession or something while you're at it) and the problem solves itself; see the sketch below.
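A minimal sketch of that shape, assuming the same headers and namespace alias as above (ProxySession, downstream_ and upstream_ are illustrative names):

class ProxySession : public std::enable_shared_from_this<ProxySession>
{
public:
    explicit ProxySession(asio::io_context &io)
        : downstream_(io), upstream_(io) {}

    asio::ip::tcp::socket &downstream() { return downstream_; } // handed to async_accept

    void start(asio::ip::tcp::endpoint server_ep)
    {
        // Each accepted client opens its own upstream connection; both
        // sockets live and die with this session, so nothing is shared.
        upstream_.async_connect(server_ep,
            [self = shared_from_this()](boost::system::error_code ec)
            {
                if (!ec)
                    self->relay(); // start the read/forward loop in both directions
            });
    }

private:
    void relay(); // async_read_until on downstream_, async_write to upstream_, and back
    asio::ip::tcp::socket downstream_; // client <-> proxy
    asio::ip::tcp::socket upstream_;   // proxy <-> proxied server
};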
I was going to edit things, but it was going to take more than a reasonable amount of time. Instead, consider looking at this recent proxy implementation I reviewed: "Tcp proxy mysql. The data received from the mysql-client is output in weird symbols". It shows basically the exact same ideas.