How to abort execution of a node and its children in a TBB flow graph

I'm currently testing the flow graph feature of TBB. To use it, I must be able to abort execution of a particular node in the graph, together with all children that depend on it, while the children that do not depend on it keep executing. Throwing an exception from the body or calling task::cancel_group_execution() aborts the execution of all nodes.

#include <cstdio>
#include <string>
#include <unistd.h>   // sleep()
#include "tbb/flow_graph.h"

using namespace tbb::flow;

struct body
{   std::string my_name;
    body( const char *name ) : my_name(name)
    {
    }
    void operator()( continue_msg ) const
    {   if (my_name == "B")
            tbb::task::self().group()->cancel_group_execution();
        else
        {   sleep(1);
            printf("%s\n", my_name.c_str());
        }
    }
};

int main()
{
    graph g;

    broadcast_node< continue_msg > start(g);
    continue_node<continue_msg> a( g, body("A"));
    continue_node<continue_msg> b( g, body("B"));
    continue_node<continue_msg> c( g, body("C"));
    continue_node<continue_msg> d( g, body("D"));
    continue_node<continue_msg> e( g, body("E"));

    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, c );
    make_edge( b, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i )
        try
        {   start.try_put( continue_msg() );
            g.wait_for_all();
        } catch (...)
        {   printf("Caught exception\n");
        }
    return 0;
}

There are 2 answers

Answer by yohjp (best answer)

You can represent the abort status with a bool instead of a continue_msg. Each process_node receives the status of its predecessor, does its work only when that status is true, and sends the updated abort status on to its successors.

struct body
{   std::string my_name;
    body( const char *name ) : my_name(name)
    {
    }
    bool operator()( bool avail ) const
    {   if (!avail)
           printf("%s skipped\n", my_name.c_str());
        else
            if (my_name == "B")
            {   printf("%s fail\n", my_name.c_str());
                avail = false;  // fail task
            }
            else
            {   sleep(1);
                printf("%s\n", my_name.c_str());
            }
        return avail;
    }
};

int main()
{
    graph g;

    typedef function_node<bool, bool> process_node;
    typedef std::tuple<bool,bool> bool_pair;
    broadcast_node< bool > start(g);
    process_node a( g, unlimited, body("A"));
    process_node b( g, unlimited, body("B"));
    process_node c( g, unlimited, body("C"));
    join_node<bool_pair> join_c(g);
    function_node<bool_pair, bool> and_c(g, unlimited, [](const bool_pair& in)->bool {
        return std::get<0>(in) && std::get<1>(in);
    });
    process_node d( g, unlimited, body("D"));
    process_node e( g, unlimited, body("E"));

    /*
     * start -+-> A -+-> E
     *        |       \
     *        |        \
     *        |         join_c -> and_c -> C -> D
     *        |        /
     *        |       /
     *        +-> B -- 
     */
    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, input_port<0>(join_c) );
    make_edge( b, input_port<1>(join_c) );
    make_edge( join_c, and_c );
    make_edge( and_c, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i )
        try
        {   start.try_put( true );
            g.wait_for_all();
        } catch (...)
        {   printf("Caught exception\n");
        }
    return 0;
}
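
The status propagation in this answer can also be traced without TBB. The following is only a minimal sketch in plain C++, assuming the nodes are listed in topological order; `NodeSpec`, `propagate` and `question_graph` are hypothetical names introduced for illustration, not part of any library. It shows which nodes end up forwarding `false` when B fails.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical model (not TBB): each node lists its predecessors, and the
// nodes are supplied in topological order.
struct NodeSpec {
    std::string name;
    std::vector<std::string> preds;  // empty: fed directly by the start node
    bool fails;                      // true if the node's own work fails
};

// Returns, per node, the status it would forward to its successors:
// true = ran successfully, false = failed or skipped.
std::map<std::string, bool> propagate(const std::vector<NodeSpec>& nodes) {
    std::map<std::string, bool> status;
    for (const NodeSpec& n : nodes) {
        bool avail = true;
        for (const std::string& p : n.preds) {
            assert(status.count(p) == 1 && "nodes must be in topological order");
            avail = avail && status[p];  // logical AND, like and_c in the answer
        }
        status[n.name] = avail && !n.fails;
    }
    return status;
}

// The graph from the question, with B failing.
std::vector<NodeSpec> question_graph() {
    return {
        {"A", {}, false},
        {"B", {}, true},
        {"C", {"A", "B"}, false},
        {"D", {"C"}, false},
        {"E", {"A"}, false},
    };
}
```

Calling `propagate(question_graph())` marks A and E as available and B, C and D as unavailable, which matches the behaviour the question asks for: only B and the nodes downstream of it are skipped.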

Answer by cahuson

If you want to be able to cancel part of a graph's execution, you need to use task_group_contexts. Add the following include:

#include "tbb/task.h"

and change your main program to the following:

int main()
{
    tbb::task_group_context tgc1;
    tbb::task_group_context tgc2;
    graph g1(tgc1);
    graph g2(tgc2);
    printf("Constructing graph\n");
    broadcast_node< continue_msg > start(g1);
    continue_node<continue_msg> a( g1, body("A"));
    continue_node<continue_msg> b( g2, body("B"));
    continue_node<continue_msg> c( g2, body("C"));
    continue_node<continue_msg> d( g2, body("D"));
    continue_node<continue_msg> e( g1, body("E"));

    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, c );
    make_edge( b, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i ) {
        try
        {   
            printf("broadcasting graph %d\n", i);
            start.try_put( continue_msg() );
            g1.wait_for_all();
            g2.wait_for_all();
        } catch (...)
        {   printf("Caught exception\n");
        }
        g1.wait_for_all();
        g1.reset();
        g2.reset();
    }
    return 0;
}

Each task_group_context is a subcontext of the (default) parent context. Cancelling g2 does not affect g1. If B throws instead of cancelling, your catch will ensure the exception is not passed to the parent. If you don't catch the exception, the parent context will also be cancelled, as will the context for A and E.

[Figure: graph with multiple task_group_contexts]

Notice that you have to wait for both graphs to complete, and that you must reset() the graphs to reset the continue_nodes' counters. When an exception is thrown and caught, there is no guarantee that g1 is complete once the catch(...) block has finished, so you need to call g1.wait_for_all() outside the try/catch. I edited the code to show that.

Instead of using cancellation to stop part of the computation, you could make B a multifunction_node with an input of continue_msg and a single output of a continue_msg:

typedef multifunction_node<continue_msg, tuple<continue_msg> > mf_type;

struct mf_body {
    std::string my_name;
    mf_body(const char *name) : my_name(name) {}
    void operator()(continue_msg, mf_type::output_ports_type &op) {
        if(my_name == "B") {
            printf("%s returning without sending\n", my_name.c_str());
            return;
        }
        sleep(1);
        get<0>(op).try_put(continue_msg());
        return;
    }
};

Then you create node B:

mf_type b( g, unlimited, mf_body("B"));

and the edge from B to C would be set up like so:

make_edge( output_port<0>(b), c ); 

In this case you don't need to split the graph into two subgraphs. Where node B would have cancelled, it instead returns without forwarding a continue_msg to its successor. If node B does not forward a message, node C will not execute, because it needs two continue_msgs to start. You still need to reset the graph afterwards, to reset C's count.

The multifunction_node has the advantage that you can choose whether or not to forward a message. The caveat is that a multifunction_node with a continue_msg input does not behave like a continue_node. The continue_node needs as many continue_msgs as it has predecessors (plus the initialization value given on construction). The multifunction_node body is executed every time it receives a continue_msg, no matter how many predecessors it has. So for your graph you cannot simply make all the nodes multifunction_nodes.
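
To see why the reset matters for C, here is a rough plain C++ model of the counting behaviour described above. It is a sketch under stated assumptions and not the TBB implementation: `CountingNode` and `runs_when_b_is_silent` are hypothetical names, and the model assumes a node fires once it has counted as many messages as it has predecessors and then starts counting again.

```cpp
#include <cassert>

// Hypothetical model of a node that counts incoming messages; not TBB code.
class CountingNode {
public:
    explicit CountingNode(int predecessors) : needed_(predecessors) {
        assert(predecessors > 0);
    }
    // Record one incoming message; returns true if the body would run now.
    bool receive() {
        if (++received_ < needed_) return false;
        received_ = 0;  // start counting afresh for the next round
        ++runs_;
        return true;
    }
    // Discard a partial count, which is what the reset is needed for.
    void reset() { received_ = 0; }
    int runs() const { return runs_; }
private:
    int needed_;
    int received_ = 0;
    int runs_ = 0;
};

// Simulates `rounds` iterations in which A always signals C and B never does.
// Returns how many times C's body would run.
int runs_when_b_is_silent(int rounds, bool reset_between_rounds) {
    CountingNode c(2);  // C has two predecessors, A and B
    for (int i = 0; i < rounds; ++i) {
        c.receive();    // message from A only
        if (reset_between_rounds) c.reset();
    }
    return c.runs();
}
```

In this model, three rounds without a reset let the leftover count from A add up, so C runs once even though B never sent anything; with a reset after each round, C never runs.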