Problem
I am using boost::asio for a project where two processes on the same machine communicate using TCP/IP. One generates data to be read by the other, but I am encountering a problem where intermittently no data is being sent through the connection. I've boiled this down to a very simple example below, based on the async tcp echo server example.
The processes (source code below) start out fine, delivering data at a fast rate from the sender to the receiver. Then all of a sudden, no data at all is delivered for about five seconds. Then data is delivered again until the next inexplicable pause. During these five seconds, the processes eat 0% CPU and no other processes seem to do anything in particular. The pause is always the same length - five seconds.
I am trying to figure out how to get rid of these stalls and what causes them.
CPU usage during an entire run:
Notice how there are three dips of CPU usage in the middle of the run - a "run" is a single invocation of the server process and the client process. During these dips, no data was delivered. The number of dips and their timing differs between runs - some times no dips at all, some times many.
I am able to affect the "probability" of these stalls by changing the size of the read buffer - for instance if I make the read buffer a multiple of the send chunk size it appears that this problem almost goes away, but not entirely.
Source and test description
I've compiled the below code with Visual Studio 2005, using Boost 1.43 and Boost 1.45. I have tested on Windows Vista 64 bit (on a quad-core) and Windows 7 64 bit (on both a quad-core and a dual-core).
The server accepts a connection and then simply reads and discards data. Whenever a read is performed a new read is issued.
The client connects to the server, then puts a bunch of packets into a send queue. After this it writes the packets one at the time. Whenever a write has completed, the next packet in the queue is written. A separate thread monitors the queue size and prints this to stdout every second. During the io stalls, the queue size remains exactly the same.
I have tried to used scatter io (writing multiple packets in one system call), but the result is the same. If I disable IO completion ports in Boost using BOOST_ASIO_DISABLE_IOCP
, the problem appears to go away but at the price of significantly lower throughput.
// Example is adapted from async_tcp_echo_server.cpp which is
// Copyright (c) 2003-2010 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Start program with -s to start as the server
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0501
#endif
#include <iostream>
#include <tchar.h>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#define PORT "1234"
using namespace boost::asio::ip;
using namespace boost::system;
class session {
public:
session(boost::asio::io_service& io_service) : socket_(io_service) {}
void do_read() {
socket_.async_read_some(boost::asio::buffer(data_, max_length),
boost::bind(&session::handle_read, this, _1, _2));
}
boost::asio::ip::tcp::socket& socket() { return socket_; }
protected:
void handle_read(const error_code& ec, size_t bytes_transferred) {
if (!ec) {
do_read();
} else {
delete this;
}
}
private:
tcp::socket socket_;
enum { max_length = 1024 };
char data_[max_length];
};
class server {
public:
explicit server(boost::asio::io_service& io_service)
: io_service_(io_service)
, acceptor_(io_service, tcp::endpoint(tcp::v4(), atoi(PORT)))
{
session* new_session = new session(io_service_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session, _1));
}
void handle_accept(session* new_session, const error_code& ec) {
if (!ec) {
new_session->do_read();
new_session = new session(io_service_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session, _1));
} else {
delete new_session;
}
}
private:
boost::asio::io_service& io_service_;
boost::asio::ip::tcp::acceptor acceptor_;
};
class client {
public:
explicit client(boost::asio::io_service &io_service)
: io_service_(io_service)
, socket_(io_service)
, work_(new boost::asio::io_service::work(io_service))
{
io_service_.post(boost::bind(&client::do_init, this));
}
~client() {
packet_thread_.join();
}
protected:
void do_init() {
// Connect to the server
tcp::resolver resolver(io_service_);
tcp::resolver::query query(tcp::v4(), "localhost", PORT);
tcp::resolver::iterator iterator = resolver.resolve(query);
socket_.connect(*iterator);
// Start packet generation thread
packet_thread_.swap(boost::thread(
boost::bind(&client::generate_packets, this, 8000, 5000000)));
}
typedef std::vector<unsigned char> packet_type;
typedef boost::shared_ptr<packet_type> packet_ptr;
void generate_packets(long packet_size, long num_packets) {
// Add a single dummy packet multiple times, then start writing
packet_ptr buf(new packet_type(packet_size, 0));
write_queue_.insert(write_queue_.end(), num_packets, buf);
queue_size = num_packets;
do_write_nolock();
// Wait until all packets are sent.
while (long queued = InterlockedExchangeAdd(&queue_size, 0)) {
std::cout << "Queue size: " << queued << std::endl;
Sleep(1000);
}
// Exit from run(), ignoring socket shutdown
work_.reset();
}
void do_write_nolock() {
const packet_ptr &p = write_queue_.front();
async_write(socket_, boost::asio::buffer(&(*p)[0], p->size()),
boost::bind(&client::on_write, this, _1));
}
void on_write(const error_code &ec) {
if (ec) { throw system_error(ec); }
write_queue_.pop_front();
if (InterlockedDecrement(&queue_size)) {
do_write_nolock();
}
}
private:
boost::asio::io_service &io_service_;
tcp::socket socket_;
boost::shared_ptr<boost::asio::io_service::work> work_;
long queue_size;
std::list<packet_ptr> write_queue_;
boost::thread packet_thread_;
};
int _tmain(int argc, _TCHAR* argv[]) {
try {
boost::asio::io_service io_svc;
bool is_server = argc > 1 && 0 == _tcsicmp(argv[1], _T("-s"));
std::auto_ptr<server> s(is_server ? new server(io_svc) : 0);
std::auto_ptr<client> c(is_server ? 0 : new client(io_svc));
io_svc.run();
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
So my question is basically:
How do I get rid of these stalls?
What causes this to happen?
Update: There appears to be some correlation with disk activity contrary to what I stated above, so it appears that if I start a large directory copy on the disk while the test is running this might increase the frequency of the io stalls. This could indicate that this is the Windows IO Prioritization that kicks in? Since the pauses are always the same length, that does sound somewhat like a timeout somewhere in the OS io code...