I am using grpcio version 1.48.2
My application is essentially a collection of gRPC servers on different docker containers, with 10 max workers in the thread-pool.
Recently I started experiencing an Assertion Error which causes the gRPC server to shut down:
[E0218 1000 ev_epoll1_linux.cc:1142] ASSERTION FAILED: next_worker->state == KICKED
This error occurred right after a gRPC call had finished executing successfully. (it finished successfully, because I have a logger print at the end which indicates success.)
I tried looking deeply into the error, i also retrieved the actual function that the assertion failed on in the gRPCio source code:
(The assertion failed in line 1142~, which means it failed inside this function:
static grpc_error_handle pollset_kick(grpc_pollset* pollset,
grpc_pollset_worker* specific_worker) {
GPR_TIMER_SCOPE("pollset_kick", 0);
GRPC_STATS_INC_POLLSET_KICK();
grpc_error_handle ret_err = GRPC_ERROR_NONE;
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
std::vector<std::string> log;
log.push_back(absl::StrFormat(
"PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset, specific_worker,
static_cast<void*>(g_current_thread_pollset),
static_cast<void*>(g_current_thread_worker), pollset->root_worker));
if (pollset->root_worker != nullptr) {
log.push_back(absl::StrFormat(
" {kick_state=%s next=%p {kick_state=%s}}",
kick_state_string(pollset->root_worker->state),
pollset->root_worker->next,
kick_state_string(pollset->root_worker->next->state)));
}
if (specific_worker != nullptr) {
log.push_back(absl::StrFormat(" worker_kick_state=%s",
kick_state_string(specific_worker->state)));
}
gpr_log(GPR_DEBUG, "%s", absl::StrJoin(log, "").c_str());
}
if (specific_worker == nullptr) {
if (g_current_thread_pollset != pollset) {
grpc_pollset_worker* root_worker = pollset->root_worker;
if (root_worker == nullptr) {
GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER();
pollset->kicked_without_poller = true;
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, " .. kicked_without_poller");
}
goto done;
}
grpc_pollset_worker* next_worker = root_worker->next;
if (root_worker->state == KICKED) {
GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, " .. already kicked %p", root_worker);
}
SET_KICK_STATE(root_worker, KICKED);
goto done;
} else if (next_worker->state == KICKED) {
GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, " .. already kicked %p", next_worker);
}
SET_KICK_STATE(next_worker, KICKED);
goto done;
} else if (root_worker == next_worker && // only try and wake up a poller
// if there is no next worker
root_worker ==
reinterpret_cast<grpc_pollset_worker*>(
gpr_atm_no_barrier_load(&g_active_poller))) {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, " .. kicked %p", root_worker);
}
SET_KICK_STATE(root_worker, KICKED);
ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
goto done;
} else if (next_worker->state == UNKICKED) {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, " .. kicked %p", next_worker);
}
GPR_ASSERT(next_worker->initialized_cv);
SET_KICK_STATE(next_worker, KICKED);
gpr_cv_signal(&next_worker->cv);
goto done;
} else if (next_worker->state == DESIGNATED_POLLER) {
if (root_worker->state != DESIGNATED_POLLER) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(
GPR_INFO,
" .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
root_worker, root_worker->initialized_cv, next_worker);
}
SET_KICK_STATE(root_worker, KICKED);
if (root_worker->initialized_cv) {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
gpr_cv_signal(&root_worker->cv);
}
goto done;
} else {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, " .. non-root poller %p (root=%p)", next_worker,
root_worker);
}
SET_KICK_STATE(next_worker, KICKED);
ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
goto done;
}
} else {
GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
GPR_ASSERT(next_worker->state == KICKED);
SET_KICK_STATE(next_worker, KICKED);
goto done;
}
} else {
GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, " .. kicked while waking up");
}
goto done;
}
GPR_UNREACHABLE_CODE(goto done);
}
if (specific_worker->state == KICKED) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, " .. specific worker already kicked");
}
goto done;
} else if (g_current_thread_worker == specific_worker) {
GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, " .. mark %p kicked", specific_worker);
}
SET_KICK_STATE(specific_worker, KICKED);
goto done;
} else if (specific_worker ==
reinterpret_cast<grpc_pollset_worker*>(
gpr_atm_no_barrier_load(&g_active_poller))) {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, " .. kick active poller");
}
SET_KICK_STATE(specific_worker, KICKED);
ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
goto done;
} else if (specific_worker->initialized_cv) {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, " .. kick waiting worker");
}
SET_KICK_STATE(specific_worker, KICKED);
gpr_cv_signal(&specific_worker->cv);
goto done;
} else {
GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, " .. kick non-waiting worker");
}
SET_KICK_STATE(specific_worker, KICKED);
goto done;
}
done:
return ret_err;
}
I don't have any good idea about how to debug it further, to understand what is going on here - I don't intend to change GRPCIO source code nor blame it, but could it be grpcio library bug?
Would love any help / advice/ some shed light on what is wrong with this assertion, why it happened? what could cause this?
Thanks so much in advance, Yves.