:" /> :" /> :"/>

Thread task not exiting when atomic value changed

40 views Asked by At

The problem I am facing is that the application does not exit when I type "exit" in the console. It gets stuck in the following thread:

/**
 * Accept loop: while the shared run flag is set, accepts client connections,
 * records each accepted socket in connectedSockets and starts a detached
 * worker thread running receivedDataHandlerThread for it. Once the flag is
 * cleared, every recorded socket is closed.
 *
 * @param serverSocketFD  listening socket descriptor passed to acceptConnection.
 * @param __flag          shared run flag; the loop ends once it reads false.
 *
 * NOTE(review): the flag is only re-checked after acceptConnection returns.
 * If that call blocks until a client connects, a shutdown request is not
 * noticed before the next connection arrives -- confirm against
 * acceptConnection (a select()/poll() timeout in front of it avoids this).
 */
void server<T>::handleClientConnections(int serverSocketFD, std::atomic<bool> &__flag)
{
    while (__flag.load())
    {
        acceptedSocket newAcceptedSocket;

        if (!acceptConnection(serverSocketFD, newAcceptedSocket))
            continue;

        connectedSockets.push_back(newAcceptedSocket);

        // The worker is detached and deliberately not stored: a detached
        // std::thread is no longer joinable, and calling join() on it throws
        // std::system_error, which would terminate the process at shutdown.
        std::thread(&server::receivedDataHandlerThread, this, newAcceptedSocket).detach();
    }

    for (const auto &socket : connectedSockets)
        close(socket.getAcceptedSocketFD());
}

I am creating a thread for each connection and I am using an atomic variable to signal the other threads. (SERVER_RUNNING is declared as a global std::atomic<bool>.)

/**
 * Interactive console loop: prompts on std::cout and reads
 * whitespace-delimited tokens from std::cin until "exit" (case-insensitive)
 * is entered, at which point the shared run flag is cleared.
 *
 * @param __flag  shared run flag; polled before every prompt and set to
 *                false when the user types "exit".
 */
template <typename T>
void server<T>::consoleListener(std::atomic<bool> &__flag)
{
    underline(75);

    char input[101] = "";

    while (__flag.load())
    {
        std::cout << std::setw(5) << " "
                  << "--> ";

        // Bound the extraction to the buffer size: before C++20 an unbounded
        // `std::cin >> char*` writes past the array on a long token.
        // Stop listening when the stream fails (EOF / closed stdin); otherwise
        // every further extraction fails immediately and the loop spins
        // forever re-printing the prompt. The run flag is left untouched.
        if (!(std::cin >> std::setw(static_cast<int>(sizeof(input))) >> input))
            break;

        if (strcasecmp(input, "exit") == 0)
        {
            std::cout << std::setw(5) << " "
                      << "Shutting down...\n";

            __flag.store(false);

            break;
        }
    }
}

/**
 * Starts the server: sets the global SERVER_RUNNING flag, launches the
 * connection-accepting loop on its own thread, runs the console listener on
 * the calling thread, and finally waits for the accept loop to finish.
 *
 * @param serverSocketFD  listening socket descriptor handed to
 *                        handleClientConnections.
 */
template <typename T>
void server<T>::server_easy_init(int serverSocketFD)
{
    SERVER_RUNNING = true;

    // Accept loop runs in the background while the console is serviced here.
    std::thread acceptLoop(&server::handleClientConnections, this, serverSocketFD, std::ref(SERVER_RUNNING));

    // Returns once the console listener's own loop ends.
    consoleListener(SERVER_RUNNING);

    acceptLoop.join();
}

Thank you!

I tried using a mutex and the condition variable, but I encountered the same bug.

1

There is 1 answer

1
Sorin Tudose On

This is the updated code:

/**
 * Launches receivedDataHandler for the given socket on a new thread and
 * detaches it, so this call returns without waiting for the handler.
 *
 * @param __socket  accepted client socket, passed by value to the handler.
 */
template <typename T>
void server<T>::receivedDataHandlerThread(const class acceptedSocket __socket)
{
    std::thread handler(&server::receivedDataHandler, this, __socket);

    // Detached: nothing joins this thread later.
    handler.detach();
}

/**
 * Accept loop with a periodic shutdown check: waits up to one second at a
 * time for the listening socket to become readable, so the run flag is
 * re-evaluated after each timeout. Accepted sockets are recorded in
 * connectedSockets and handed to receivedDataHandlerThread; when the loop
 * ends, every recorded socket is closed.
 *
 * @param serverSocketFD  listening socket descriptor watched with select().
 * @param __flag          shared run flag; the loop ends once it reads false.
 */
template <typename T>
void server<T>::handleClientConnections(int serverSocketFD, std::atomic<bool> &__flag)
{
    while (__flag)
    {
        // select() may modify both the descriptor set and the timeout, so
        // both are rebuilt on every pass.
        fd_set pendingReads;
        FD_ZERO(&pendingReads);
        FD_SET(serverSocketFD, &pendingReads);

        struct timeval pollInterval;
        pollInterval.tv_sec = 1; // re-check the run flag once per second
        pollInterval.tv_usec = 0;

        const int ready = select(serverSocketFD + 1, &pendingReads, nullptr, nullptr, &pollInterval);

        if (ready == -1)
        {
            // NOTE(review): any select() failure, including an interrupted
            // call, ends the accept loop here -- confirm that is intended.
            perror("select");
            break;
        }

        // Timed out with nothing to accept: loop around and test the flag.
        if (ready == 0)
            continue;

        acceptedSocket incoming;

        if (!acceptConnection(serverSocketFD, incoming))
            continue;

        connectedSockets.push_back(incoming);

        receivedDataHandlerThread(incoming);
    }

    for (const auto &socket : connectedSockets)
        close(socket.getAcceptedSocketFD());
}

But now I get a couple of memory leaks :)

==110270== HEAP SUMMARY:
==110270==     in use at exit: 1,724 bytes in 8 blocks
==110270==   total heap usage: 13,999 allocs, 13,991 frees, 2,431,800 bytes allocated
==110270== 
==110270== 912 bytes in 3 blocks are possibly lost in loss record 4 of 4
==110270==    at 0x484DA83: calloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==110270==    by 0x40147D9: calloc (rtld-malloc.h:44)
==110270==    by 0x40147D9: allocate_dtv (dl-tls.c:375)
==110270==    by 0x40147D9: _dl_allocate_tls (dl-tls.c:634)
==110270==    by 0x4C157B4: allocate_stack (allocatestack.c:430)
==110270==    by 0x4C157B4: pthread_create@@GLIBC_2.34 (pthread_create.c:647)
==110270==    by 0x4A10328: std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.30)
==110270==    by 0x116129: std::thread::thread<void (net::server<char>::*)(net::server<char>::acceptedSocket), net::server<char>*, net::server<char>::acceptedSocket const&, void>(void (net::server<char>::*&&)(net::server<char>::acceptedSocket), net::server<char>*&&, net::server<char>::acceptedSocket const&) (std_thread.h:143)
==110270==    by 0x111A78: net::server<char>::receivedDataHandlerThread(net::server<char>::acceptedSocket) (serverUtils.cpp:528)
==110270==    by 0x111958: net::server<char>::handleClientConnections(int, std::atomic<bool>&) (serverUtils.cpp:565)
==110270==    by 0x11908E: void std::__invoke_impl<void, void (net::server<char>::*)(int, std::atomic<bool>&), net::server<char>*, int, std::reference_wrapper<std::atomic<bool> > >(std::__invoke_memfun_deref, void (net::server<char>::*&&)(int, std::atomic<bool>&), net::server<char>*&&, int&&, std::reference_wrapper<std::atomic<bool> >&&) (invoke.h:74)
==110270==    by 0x118D9A: std::__invoke_result<void (net::server<char>::*)(int, std::atomic<bool>&), net::server<char>*, int, std::reference_wrapper<std::atomic<bool> > >::type std::__invoke<void (net::server<char>::*)(int, std::atomic<bool>&), net::server<char>*, int, std::reference_wrapper<std::atomic<bool> > >(void (net::server<char>::*&&)(int, std::atomic<bool>&), net::server<char>*&&, int&&, std::reference_wrapper<std::atomic<bool> >&&) (invoke.h:96)
==110270==    by 0x118B2A: void std::thread::_Invoker<std::tuple<void (net::server<char>::*)(int, std::atomic<bool>&), net::server<char>*, int, std::reference_wrapper<std::atomic<bool> > > >::_M_invoke<0ul, 1ul, 2ul, 3ul>(std::_Index_tuple<0ul, 1ul, 2ul, 3ul>) (std_thread.h:259)
==110270==    by 0x118A19: std::thread::_Invoker<std::tuple<void (net::server<char>::*)(int, std::atomic<bool>&), net::server<char>*, int, std::reference_wrapper<std::atomic<bool> > > >::operator()() (std_thread.h:266)
==110270==    by 0x1189B5: std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (net::server<char>::*)(int, std::atomic<bool>&), net::server<char>*, int, std::reference_wrapper<std::atomic<bool> > > > >::_M_run() (std_thread.h:211)
==110270== 
==110270== LEAK SUMMARY:
==110270==    definitely lost: 0 bytes in 0 blocks
==110270==    indirectly lost: 0 bytes in 0 blocks
==110270==      possibly lost: 912 bytes in 3 blocks
==110270==    still reachable: 812 bytes in 5 blocks
==110270==         suppressed: 0 bytes in 0 blocks
==110270== Reachable blocks (those to which a pointer was found) are not shown.
==110270== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==110270== 
==110270== For lists of detected and suppressed errors, rerun with: -s
==110270== ERROR SUMMARY: 1 errors from 1 contexts (suppressed: 0 from 0)

Anyways, thanks a lot for the help!