Can I execute on get my `std::future` and wait on it too?

1.1k views Asked by At

So you can create a std::future that does no work until .get() is called:

// Deferred launch policy: the lambda does no work until .get() is called on the future.
auto f_deferred = std::async( std::launch::deferred, []{ std::cout << "I ran\n"; } );

You can also write a std::future that is waitable, and can be made ready at any point by code in any thread:

// The future obtained from a packaged_task can be waited on; it becomes ready
// once p() is invoked, by whichever thread decides to run it.
std::packaged_task<void()> p( []{ std::cout << "I also ran\n"; } );
auto f_waitable = p.get_future();

If you call f_deferred.wait_for(1ms), it won't bother waiting. If you call f_deferred.get(), a lambda of your choice (in this case, one that prints "I ran\n") executes.

If you call f_waitable.get(), there is no way for the code managing the tasks to be aware that someone is waiting on the future. But if you call f_deferred.wait_for(1ms), you simply get future_status::deferred immediately.

Is there any way I can combine these two?

A concrete use case is a thread pool returning futures when people queue tasks. If an unqueued future is .get()'d, I want to use the thread that is blocked to execute the task rather than having it idle. On the other hand, I want people with the returned futures to be able to determine if the task is finished, and even wait a bounded amount of time for the task to be finished. (in the case where you are waiting, I'm ok with your thread being idle during your wait)

Failing that, are there solutions in upcoming proposals that would solve this problem better than having my thread pool return a future with all of its limitations? I've heard that there is no future in futures and better solutions exist to the problem futures solve.

2

There are 2 answers

0
bradgonesurfing On

Taskflow implements a work stealing scheduler.

tf::Executor::corun waits for the child taskflow to finish but it doesn't block the worker thread. The worker thread that is running the waiting task then starts stealing jobs from other workers. This happens inside the corun method itself.

https://taskflow.github.io/

The code below creates 1000 tasks which themselves wait on 1000 other tasks to complete. At the same time there are only 2 threads working and there is no deadlock.

// Only two worker threads serve every task created below.
tf::Executor executor(2);
tf::Taskflow taskflow;
std::array<tf::Taskflow, 1000> others;

std::atomic<size_t> counter{0};

for (auto& child : others) {
  // Each child taskflow gets 1000 small tasks that bump the shared counter.
  for (size_t i = 0; i != 1000; ++i) {
    child.emplace([&]() { ++counter; });
  }
  // One parent task per child taskflow; the parent waits for its child.
  taskflow.emplace([&executor, &child]() {
    // corun() waits for the child taskflow without blocking the worker
    // thread, so there is no deadlock.
    executor.corun(child);
    // By contrast, executor.run(child).wait() would block the worker
    // without doing anything and would introduce a deadlock.
  });
}
executor.run(taskflow).wait();
0
cantordust On

I am not sure if this is exactly what you need, but it serves the purpose of illustrating what I suggested in the comment. At the very least, I hope it gives you some ideas to implement what you need if it doesn't cover all your needs.

Disclaimer: This is very crude. A lot of things could certainly be done more elegantly and efficiently.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <thread>
#include <utility>

// Shorthand aliases used throughout the example.
using task = std::packaged_task<void()>;
using task_ptr = std::shared_ptr<task>;
using glock = std::lock_guard<std::mutex>;
using ulock = std::unique_lock<std::mutex>;
using uint = unsigned int;
using ulong = unsigned long;

// Serialises writes to std::cout
// (task_factory() also updates `tasks` while holding it)
std::mutex cout_mtx;

// Guards the task queue and the worker count
std::mutex task_mtx;
// Waited on by threads that need the start signal or a non-empty queue
std::condition_variable task_cv;

// main() blocks on this so that it does not
// return before the last worker has exited
std::condition_variable kill_switch;

// Shared random engine (seeded in main())
std::mt19937_64 engine;

// Simulated task duration in ms, uniform in [100, 10000]
std::uniform_int_distribution<int> sleep{100, 10000};

// Pending tasks, processed first-in first-out
std::queue<task_ptr> task_queue;

// Number of tasks started so far
static uint tasks{0};
// Id of the thread running main(), used to label log output
static std::thread::id main_thread_id;
// Number of worker threads that have not exited yet
static uint workers{0};

/// Wrapper around a shared packaged task that lets the calling thread run the
/// task itself and remembers that execution has been started.
///
/// @tparam T  Type returned by get(); it has to match what the wrapped task's
///            future yields (this file only uses Task<void>).
template<typename T>
class Task
{
    // Set as soon as get() is entered. The flag belongs to this wrapper
    // object, not to the shared task, so it is only observable through
    // this same Task instance. It is atomic so that concurrent calls to
    // is_working() and get() on one instance are well defined.
    std::atomic<bool> working;
    task_ptr tp;

public:

    /// @param _tp  Shared handle to the packaged task to execute.
    Task(task_ptr _tp)
        :
          working(false),
          tp(std::move(_tp)) // take over the by-value copy, no extra refcount bump
    {}

    /// Marks the task as started, runs it synchronously on the calling
    /// thread and returns the result obtained from the task's future.
    /// A packaged task can be invoked only once, so a second call throws
    /// std::future_error.
    T get()
    {
        working.store(true);
        (*tp)();
        return tp->get_future().get();
    }

    /// @return true once get() has been called on this object.
    bool is_working() const
    {
        return working.load();
    }
};

// Creates one demo task wrapped in a shared packaged_task.
//
// When run, the task takes the next task number, logs which thread is
// executing it, sleeps for a random 100-10000 ms to simulate work and then
// logs completion.
auto task_factory()
{
    return std::make_shared<task>([&]
    {
        uint task_id(0);
        std::chrono::milliseconds work_time(0);
        {
            glock lk(cout_mtx);
            task_id = ++tasks;
            // `engine` and `sleep` are shared by every task and are not safe
            // for unsynchronised concurrent use, so the random duration is
            // drawn while the lock is held; only the sleep itself happens
            // outside the lock.
            work_time = std::chrono::milliseconds(sleep(engine));
            if (std::this_thread::get_id() == main_thread_id)
            {
                std::cout << "Executing task " << task_id << " in main thread.\n";
            }
            else
            {
                std::cout << "Executing task " << task_id << " in worker " << std::this_thread::get_id() << ".\n";
            }
        }
        std::this_thread::sleep_for(work_time);
        {
            glock lk(cout_mtx);
            std::cout << "\tTask " << task_id << " completed.\n";
        }
    });
}

auto func_factory()
{
    return [&]
    {

        while(true)
        {
            ulock lk(task_mtx);
            task_cv.wait(lk, [&]{ return !task_queue.empty(); });
            Task<void> task(task_queue.front());
            task_queue.pop();

            // Check if the task has been assigned
            if (!task.is_working())
            {
                // Sleep for a while and check again.
                // If it is still not assigned after 1 s,
                // start working on it.
                // You can also place these checks
                // directly in Task::get()
                {
                    glock lk(cout_mtx);
                    std::cout << "\tTask not started, waiting 1 s...\n";
                }
                lk.unlock();
                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
                lk.lock();
                if (!task.is_working())
                {
                    {
                        glock lk(cout_mtx);
                        std::cout << "\tTask not started after 1 s, commencing work...\n";
                    }
                    lk.unlock();
                    task.get();
                    lk.lock();
                }

                if (task_queue.empty())
                {
                    break;
                }
            }
        }
    };
}

// Demo driver: enqueues 50 tasks, starts 5 detached workers and then lets the
// main thread process the queue alongside them. Returns only after every
// worker has finished.
int main()
{
    engine.seed(std::chrono::high_resolution_clock::now().time_since_epoch().count());

    std::cout << "Main thread: " << std::this_thread::get_id() << "\n";
    main_thread_id = std::this_thread::get_id();

    // No other thread exists yet, so the queue can be filled without locking.
    for (int i = 0; i < 50; ++i)
    {
        task_queue.push(task_factory());
    }

    std::cout << "Tasks enqueued: " << task_queue.size() << "\n";

    // Start signal for the workers, guarded by task_mtx. Waiting on an
    // explicit flag (instead of a bare wait) means a worker that reaches its
    // wait after the notification below still sees the signal.
    bool start = false;

    // Spawn 5 workers
    for (int i = 0; i < 5; ++i)
    {
        {
            // Count the worker before its thread exists, so the final wait
            // in main() can never observe a count that is too low.
            glock lk(task_mtx);
            ++workers;
        }

        // The closure refers to `start` by reference; that is safe because
        // main() does not return until every worker has decremented
        // `workers`, which happens after the worker's last use of `start`.
        std::thread([&]
        {
            {
                ulock lk(task_mtx);
                task_cv.wait(lk, [&]{ return start; });
                {
                    glock out_lock(cout_mtx);
                    std::cout << "\tWorker started\n";
                }
            }

            auto fn(func_factory());
            fn();

            ulock lk(task_mtx);
            --workers;
            if (workers == 0)
            {
                kill_switch.notify_all();
            }

        }).detach();
    }

    // Notify all workers to start processing the queue
    {
        glock lk(task_mtx);
        start = true;
    }
    task_cv.notify_all();

    // This is the important bit:
    // Tasks can be executed by the main thread
    // as well as by the workers.
    // In fact, any thread can grab a task from the queue,
    // check if it is running and start working
    // on it if it is not.
    auto fn(func_factory());
    fn();

    // Wait until the last worker has exited. The predicate guards against
    // both spurious wake-ups and a notification sent before this wait.
    ulock lk(task_mtx);
    kill_switch.wait(lk, [&]{ return workers == 0; });

    return 0;
}

This is my CMakeLists.txt

cmake_minimum_required(VERSION 3.2)

# The compiler has to be chosen before project(): that call detects and
# configures the toolchain, so setting CMAKE_CXX_COMPILER afterwards is too
# late. An explicit -DCMAKE_CXX_COMPILER=... on the command line still wins.
if(NOT DEFINED CMAKE_CXX_COMPILER)
  set(CMAKE_CXX_COMPILER "clang++")
endif()

project(tp_wait)

set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Default to a Debug build, but do not override a build type the user chose.
if(NOT CMAKE_BUILD_TYPE)
  set(CMAKE_BUILD_TYPE "Debug" CACHE STRING "Build type" FORCE)
endif()

# Prefer the -pthread compiler/linker flag where the platform supports it.
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)

add_executable(${PROJECT_NAME} "main.cpp")
# The imported target carries both the link libraries and any required flags.
target_link_libraries(${PROJECT_NAME} Threads::Threads)