tbb flow graph Segmentation fault in _flow_graph_cache_impl.h

205 views Asked by At

I'm trying to run some code with flow graph, it compiled successfully, however when running it gives a segmentation fault in the tbb flow graph library files after invoking input node, I can't seem to be able to find the reason of it. My input node is like this:

class InputNode{
public:
    // constructor, copy constructor, destructor are implemented
    bool operator() (InputResult &v){
        //some logic here that defines wether to process or not
        if(shouldProcess){
            v = InputResult();
            // some logic to set values inside v
            return true;
        } else
            return false;
    }
};

This node is connected to node of typetbb::flow::multifunction_node<InputResult, std::tuple<InputResult>>, connection is done by tbb::flow::make_edge(src, firstNodeFilter);. InputResult is a pointer, I have checked and confirmed it was set to a valid value not to null.

Exception is being thrown in tbb::flow::interface11::internal::broadcast_cache by function try_put_task at task *new_task = (*i)->try_put_task(t); which is called from tbb::flow::interface11::input_node by function apply_body_bypass at task *last_task = my_successors.try_put_task(v);

----------edit------------

Sorry for unclarity about input_node implementation. I have this code implemented for it.

tbb::flow::input_node<InputResult> src(g, InputNode());

When I try to change the InputNode to match the InputNodeBody I get compiler error in _flow_graph_body_impl.h:

error: no match for call to `InputResult&`
    bool operator()((Output &output) __TBB_override {return body( output ); }

And says there's no known cast from tbb::flowcontrol& to InputResult&

-------------edit 2 ------------------------ The following includes more of my code, removed some template parameters for readability. This code is after upgrading oneTBB.

file 1:

class NavigationQueryExecuter {
public:    
    EdgesRepoClass &edges;
    NodeRepoClass &nodes;
    LinkageRepoClass &linkage;

    NavigationQueryExecuter(NodeRepoClass& nodeRepo, EdgesRepoClass& edgeRepo, LinkageRepoClass& linkageRepo): nodes(nodeRepo),edges(edgeRepo), linkage(linkageRepo){ };

    template<class TQuery>
    TQuery* Execute(){
        typedef typename TQuery::InputResult TInputResult;
        auto query = new TQuery();
        class InputNode{
        private:
            const unsigned int Count = 1024; //const count.

            NodeRepoClass &nodes;
            TQuery& query;
            TPage* lastPage;
            TPid pid;
            const TPid lpid;
            unsigned int idx;
        public:
            InputNode(TQuery&q, NodeRepoClass& nodeRepo, TPid firstPageId, TPid lastPageId): query(q), nodes(nodeRepo), lpid(lastPageId){
                idx=0;
                pid=firstPageId;
                lastPage= nodes.GetPage(pid);
            }

            InputNode(const InputNode &other): query(other.query) ,nodes(other.nodes), pid(other.pid), lpid(other.lpid), idx(other.idx){
                lastPage = nodes.GetPage(other.lastPage->Id);
            }

            ~InputNode(){
                if(lastPage){
                    nodes.ReleasePage(lastPage);
                    lastPage= nullptr;
                }
            }

            TInputResult operator() (tbb::flow_control &fc){ //this function is invoked once only before exception is thrown.
                //logic to skip unused objects removed for simplicity.
                auto node = new NodeClass(lastPage, idx);
                while(idx < Count){
                    if(node->InUse()){
                        auto res = query.ProcessInput(node);
                        delete node;
                        return res; //res is set correctly, breaks after returning without touching any other parts of my code.
                    }
                    node->Id = ++idx;
                }
                delete node;
                fc.stop();
                return nullptr;
            }
        };
        auto g = tbb::flow::graph();
        tbb::flow::input_node<TInputResult> src(g, InputNode(*query,nodes, 0, 40));
        query->BuildGraph(g, src);
        src.activate();
        g.wait_for_all();
        return query;
    }
};

file 2:

class QueryExample{
    EdgesRepoClass &edges;
    NodeRepoClass &nodes;
    LinkageRepoClass &linkage;
public:
    struct Result{
        int n1, n2, e1;
    };

    typedef Result* InputResult;
    typedef std::vector<InputResult> OutputResult;
    typedef tbb::flow::multifunction_node<InputResult, std::tuple<InputResult>> FilterNodeType;

    OutputResult result;

    FilterOnNode(NodeRepoClass& nodeRepo, EdgesRepoClass& edgeRepo, LinkageRepoClass& linkageRepo): nodes(nodeRepo),edges(edgeRepo), linkage(linkageRepo){
        result=OutputResult();
    }

    InputResult ProcessInput(typename NodeRepoClass::TEntry* node){
        //initialize, and process all nodes.
        Result* res = new Result();
        res->n1 = node->Id;
        return res;
    }

    void BuildGraph(tbb::flow::graph &g, tbb::flow::input_node<InputResult> &src) {

        auto firstNodeFilter = FilterNodeType(
                g,
                tbb::flow::unlimited,
                [&](const InputResult &input, typename FilterNodeType::output_ports_type &op) { 
                //processing logic can either output to connected nodes or drop unncessary nodes.
                //this function is never reached, code breaks before it.
                });
        // couple more multifunction_node are created.
        tbb::flow::make_edge(src, firstNodeFilter);
        tbb::flow::make_edge(tbb::flow::output_port<0>(firstNodeFilter), generateEdgesFilter);
        tbb::flow::make_edge(tbb::flow::output_port<0>(generateEdgesFilter), secondNodeFilter);
        tbb::flow::make_edge(tbb::flow::output_port<0>(secondNodeFilter), outputNodeFilter);

    }
};

The code breaks where indicated in file 1 in the comment.

1

There are 1 answers

11
Aleksei Fedotov On

The interface of the input node (former source node) with the passed message to be filled by reference has a flaw that results in potential using of the same memory location concurrently in the input node and its successors. Consider looking into this direction to find the root cause of the segmentation fault.

Also, consider using the new interface, which is based on the tbb::flow_control and returning of the produced message from the node body: input_node, its body requirements