I have a pool of threads concurrently running the same io_context for a websocket stream.
I am doing this because, first, I actually have two websocket streams (I abstracted this away because testing showed it is not the problem), and second, because I want to run other I/O operations alongside the websocket ones, namely async_read and async_write.
Each websocket stream uses its own strand, and additional locking ensures that an async_read (resp. async_write) is not started before the previous one has reached its handler.
So basically:
```cpp
io_context context;
std::vector<std::thread> pool(std::thread::hardware_concurrency());
...
wss(make_strand(context), ssl);
...
wss.async_read(&loop_read_handler);
...
for (auto& th : pool)
    th = std::thread([&] {
        try {
            start_read_loop(); // give work to do to each thread
            context.run();
        } catch (...) {}
        wss.close(...);   // closing the websocket stream, expected to cancel all threads
        context.stop();   // with or without it, no change
    });
for (auto& th : pool)
    th.join(); // hangs here since the other threads did not return from run()
```
When I want the program to stop, I close(boost::beast::websocket::close_code::normal,ec) the stream, which effectively cancels the I/O operations in the current thread (an empty message with error code boost::beast::websocket::error::closed is received), but not in the other threads: instead of being cancelled, they hang.
Diving into the code, I eliminated the hypothesis of a deadlock of my own making and found that context.run() simply didn't notice the websocket stream was closed and kept waiting for an incoming message.
Of course, the problem disappears when the pool is limited to a single thread.
Calling close(...) from outside or inside an I/O operation does not change the problem.
Calling context.stop() has no effect either, whether it is called outside or inside.
What can the problem be, and how am I supposed to make the context stop running on a graceful websocket close?
================================= EDIT WITH SOLUTION
I managed to fix my code thanks to sehe's answer. Instead of starting the read loop in each thread, I now start it once after the pool initialization, adding auto work=make_work_guard(context); and work.reset():
```cpp
io_context context;
auto work = make_work_guard(context); // <<<<<<<<<<<<<<
std::vector<std::thread> pool(std::thread::hardware_concurrency());
...
wss(make_strand(context), ssl); // I keep it because I will add other streams
...
for (auto& th : pool)
    th = std::thread([&] {
        try { context.run(); } catch (...) {} // <<<<<<<<<<<<<<
        close_wss_streams_once_each(...); // cancels all threads
    });
start_async_read_loop(); // <<<<<<<<<<<<<<
work.reset();            // <<<<<<<<<<<<<<
for (auto& th : pool)
    th.join();
```
Apparently I shouldn't post an I/O operation in each thread, which I had decided to do to give all threads work. Instead, the work guard prevents the threads from returning prematurely.
A stream is not a process (or even an operation). You cannot "run a [websocket] stream". You basically only ever run an event loop that executes enqueued handlers, aside from the synchronous code.
The code shown begs the opposite question: why don't all the threads immediately return (since no work exists before the threads are started)? Clearly your actual code is sufficiently different for this not to happen.
Perhaps you even have an explicit work_guard around. If so, that of course explains why things are not shutting down.

The single-thread observation doesn't quite make sense to me: logically, the chances of a deadlock increase with fewer threads. Regardless, that wasn't your problem.
Imagined Problem Code, But Working
Here's what I imagine, just adding that work-guard so that the threads don't all complete before you even post the first async_read:
Now, let's construct, connect, ssl handshake and ws handshake a websocket client (synchronously for simplicity):
Now let's add your
loop_read_handler
. Apparently that is some kind of (member) function, but we don't have a class here. So let's drop in a closure:Of course, we have to kick off the first read:
Now, I could do a timer, but realistically you want graceful shutdown when your application receives the signal to terminate, so let's do that for demo:
That's all! Now, all we have to do is wait. So, we can remove the scaffolding:
Full Listing
Live On Coliru
Running it against a simple demo wss server:
And either terminating with Ctrl-C in a terminal, or sending it SIGTERM signal:
BONUS
The whole thread pool can be replaced (more correctly!) with asio::thread_pool. That way, you don't have to meddle with a work-guard at all (or worry about correct handling of exceptions).