I have a simple asynchronous gRPC server. It is also multi-threaded: each thread handles its own completion queue.
But thread scalability is very bad as the number of threads grows:
- For small messages, the best performance is with only one thread.
- For big messages, the performance only doubles when going from 1 to 4 threads, and a higher thread count doesn't increase performance any further (on a server with 64 cores).
Is this normal behaviour?
The server looks like this:
class Server final
{
public:
Server(const Config& config, Service& sync_service)
: m_config(config),
m_sync_service(sync_service)
{
// Nothing to do
}
~Server()
{
stop();
}
void stop()
{
m_stopped = true;
m_server->Shutdown();
for(auto& cq: m_cqs)
{
cq->Shutdown();
}
}
void run_and_wait(const std::string& grpc_address_port)
{
grpc::ServerBuilder builder;
builder.AddListeningPort(grpc_address_port, grpc::InsecureServerCredentials());
builder.RegisterService(&m_async_service);
const int num_threads = m_config.threads;
const int threads_per_cq = 1;
assert(num_threads % threads_per_cq == 0);
for(int i = 0; i < ceil_div(num_threads, threads_per_cq); i++)
{
auto& cq = m_cqs.emplace_back(builder.AddCompletionQueue());
}
// with the gRPC runtime.
// Finally assemble the server.
m_server = builder.BuildAndStart();
std::cout << "Server listening on " << grpc_address_port << std::endl;
// Proceed to the server's main loop.
std::vector<std::thread> threads;
for(int i = 0; i < num_threads; i++)
{
grpc::ServerCompletionQueue* cq = m_cqs[i / threads_per_cq].get();
for(int j = 0; j < m_config.concurrent_calldatas; j++)
{
new CallDataUnary<IMethod>(m_async_service, m_sync_service, *cq);
}
threads.emplace_back([this, cq]() {
handle_rpcs(*cq);
});
}
// Just wait all poller threads to stop
for(auto& thread: threads)
{
if(thread.joinable())
{ thread.join(); }
}
}
private:
class CallDataBase
{
public:
virtual ~CallDataBase() = default;
virtual void proceed() = 0;
virtual void wait_for_new_request() = 0;
enum class State
{
WAIT_REQUEST,
FINISH
};
protected:
State m_state{State::WAIT_REQUEST};
};
template<typename IMethod>
class CallDataUnary : public CallDataBase
{
public:
CallDataUnary(AsyncService& service, Service& sync_service, grpc::ServerCompletionQueue& cq)
: m_service(service),
m_sync_service(sync_service),
m_writer(&m_context),
m_cq(cq)
{
wait_for_request();
}
void wait_for_new_request()
{
(new CallDataUnary(m_service, m_sync_service, m_cq));
}
void wait_for_request()
{
// Equivalent of calling the RequestXXX()
IMethod::request(
m_service,
&m_context,
&m_request,
&m_writer,
&m_cq,
&m_cq,
this
);
}
void proceed() final
{
if(m_state == State::WAIT_REQUEST)
{
wait_for_new_request();
m_state = CallDataBase::State::FINISH;
// Call the business logic
grpc::Status status = IMethod::dispatch_grpc_sync(m_sync_service, &m_context, &m_request, &m_response);
// At the end to avoid data races.
m_writer.Finish(m_response, status, this);
}
else
{
GPR_ASSERT(m_state == State::FINISH);
delete this;
}
}
private:
/// The service of the RPC method that this CallData is listening to.
AsyncService& m_service;
Service& m_sync_service;
// The gRPC request for this RPC call.
grpc::ServerContext m_context;
/// The gRPC request sent by the client.
RequestCPP m_request;
/// The gRPC response to send to the client when the request is completed.
ResponseCPP m_response;
/// The writer to write the response to the client.
grpc::ServerAsyncResponseWriter<ResponseCPP> m_writer;
grpc::ServerCompletionQueue& m_cq;
};
// This can be run in multiple threads if needed.
void handle_rpcs(grpc::ServerCompletionQueue& cq)
{
void* tag; // uniquely identifies a request.
bool ok;
while(!m_stopped)
{
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or m_cq is shutting down.
GPR_ASSERT(cq.Next(&tag, &ok));
auto call = static_cast<CallDataBase*>(tag);
if(ok)
{
call->proceed();
}
else
{
call->wait_for_new_request();
delete call;
}
}
}
Config m_config;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> m_cqs;
AsyncService m_async_service;
Service& m_sync_service;
std::unique_ptr<grpc::Server> m_server;
bool m_stopped{false};
};