Employing parallelism in pipelined execution


I am trying to develop a pipeline in which data is first read and processed, manipulated once, manipulated in a different way, and then displayed. I have a design in mind in which the data IO feeds into a buffer that is read by the first manipulator. Subsequently that first manipulator writes to another buffer which is read when possible by the second manipulator. Lastly, the output of the second manipulator is written to a display buffer which is read by the visualizer and displayed using OpenGL.

In my mind this is a fairly straightforward parallel problem in which each task has its own thread and they communicate via data buffers. However, all the tutorials I've come across for threaded programs seem to suggest that multithreading is something to be left up to some middleware (like OpenMP) that decides how to divide the workload.

I'm new to developing multithreaded applications, so this may be a dumb question, but is what I've described feasible and can it be done with middleware like OpenMP? I realize the obvious answer is "try it," and I want to, but the tutorials don't shed any light on *how* to try it.


There are 3 answers

cbuchart (accepted answer)

OpenMP is better suited for data-parallel algorithms that spread naturally across multiple cores (SIMD-style loops over large arrays). Other scenarios are possible, but in your case I think direct use of threads will work better and will be easier to code and maintain.

I'm dividing my answer into two parts: a general solution without OpenMP, and some specific changes for using OpenMP.

As mentioned in a comment, you're facing the producer/consumer problem, but twice: one thread is filling a buffer (producing an item), which then must be read (and modified) by a second one (consumed). The particularity of your problem is that this second thread is also a producer (the image to be drawn) and a third thread is the one in charge of consuming it (the visualizer).

As you already know, the P/C problem is solved using a buffer (probably a circular buffer or a queue of produced items), where each element of the buffer is marked as produced or consumed, and where threads have exclusive access when adding or taking items from it.


Let's apply the queue approach to your problem in the following example program.

  • Produced items will be stored in a queue. The front of the queue contains the oldest elements, those that must be consumed first.
  • There are two queues: one for data produced by the first manipulator (and to be consumed by the second manipulator), and another one for data produced by the second manipulator (and that is going to be visualized by another thread).
  • The production phase is simple: gain exclusive access to the corresponding queue and insert the element at the end.
  • Consumption is similar but must wait until the queue has at least one element (is not empty).
  • I've added some sleeps to simulate other operations.
  • The stop condition is for illustration purposes.

Note: for the sake of simplicity I'm assuming you have access to a C++14 compiler (the std::chrono_literals below are C++14). Implementations using other threading APIs are relatively similar.

#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <cstdlib> // rand
#include <list>

using namespace std::chrono_literals;

std::mutex g_data_produced_by_m1_mutex;
std::list<int> g_data_produced_by_m1;

std::mutex g_data_produced_by_m2_mutex;
std::list<int> g_data_produced_by_m2;

std::atomic<bool> stop(false); // direct-initialization: atomic is not copyable before C++17

void manipulator1_kernel()
{
  while (!stop) {
    // Producer 1: generate data
    {
      std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
      g_data_produced_by_m1.push_back(rand());
    }
    std::this_thread::sleep_for(100ms);
  }
}

void manipulator2_kernel()
{
  int data;

  while (!stop) {
    // Consumer 1
    while (!stop) { // wait until there is an item to be consumed
      {
        std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
        if (!g_data_produced_by_m1.empty()) { // is there data to be consumed?
          data = g_data_produced_by_m1.front(); // consume
          g_data_produced_by_m1.pop_front();
          break;
        }
      }
      std::this_thread::sleep_for(100ms);
    }

    // Producer 2: modify and send to the visualizer
    {
      std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
      g_data_produced_by_m2.push_back(5 * data);
    }

    std::this_thread::sleep_for(100ms);
  }
}

void visualizer_kernel()
{
  int data;

  while (!stop) {
    // Consumer 2
    while (!stop) { // wait until there is an item to be visualized
      {
        std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
        if (!g_data_produced_by_m2.empty()) {
          data = g_data_produced_by_m2.front();
          g_data_produced_by_m2.pop_front();
          break;
        }
      }
      std::this_thread::sleep_for(100ms);
    }

    std::cout << data << std::endl; // render to display
    std::this_thread::sleep_for(100ms);

    if (data % 8 == 0) stop = true; // some stop condition for the example
  }
}

int main()
{
  std::thread manipulator1(manipulator1_kernel);
  std::thread manipulator2(manipulator2_kernel);
  std::thread visualizer(visualizer_kernel);

  visualizer.join();
  manipulator2.join();
  manipulator1.join();

  return 0;
}

If you still want to use OpenMP, probably the closest fit is tasks (available since OpenMP 3.0, I think). I haven't used them very much, but the above program can be rewritten along these lines:

int main()
{
  #pragma omp parallel
  #pragma omp single // only one thread spawns the tasks; the team executes them
  {
    #pragma omp task
    manipulator1_kernel();
    #pragma omp task
    manipulator2_kernel();
    #pragma omp task
    visualizer_kernel();

    #pragma omp taskwait
  }

  return 0;
}

The rest of the code could be changed to use OpenMP features too, but I think this answers your question.

The main drawback of this approach is that you have to create a code block for the tasks to live inside the OpenMP parallel region, which can easily complicate the rest of your application's logic and structure.

Aleksei Fedotov

To solve this particular problem, the Intel® Threading Building Blocks (TBB) library includes special constructs. Intel® TBB is a cross-platform library that aids in multithreaded programming. We can look at the entities involved in your application as four different task providers: one type of task provides the input data, another type is provided by the first manipulation routine, and so on.

Thus, the only thing the user needs to do is provide the bodies for those tasks. There are several APIs in the library for specifying which bodies to process and how to run them in parallel. Everything else (thread creation, synchronization between tasks, work balancing, etc.) is done by the library.

The simplest variant of the solution that comes to mind is to use the parallel_pipeline function. Here is a sketch:

#include "tbb/pipeline.h"
using namespace tbb;

int main() {
    parallel_pipeline(/*specify max number of bodies executed in parallel, e.g.*/16,
        make_filter<void, input_data_type>(
            filter::serial_in_order, // read data sequentially
            [](flow_control& fc) -> input_data_type {
                if ( /*check some stop condition: EOF, etc.*/ ) {
                    fc.stop();
                    return input_data_type(); // return dummy value
                }
                auto input_data = read_data();
                return input_data;
            }
        ) &
        make_filter<input_data_type, manipulator1_output_type>(
            filter::parallel, // process data in parallel by the first manipulator
            [](input_data_type elem) -> manipulator1_output_type {
                auto processed_elem = manipulator1::process(elem);
                return processed_elem;
            }
        ) &
        make_filter<manipulator1_output_type, manipulator2_output_type>(
            filter::parallel, // process data in parallel by the second manipulator
            [](manipulator1_output_type elem) -> manipulator2_output_type {
                auto processed_elem = manipulator2::process(elem);
                return processed_elem;
            }
        ) &
        make_filter<manipulator2_output_type, void>(
            filter::serial_in_order, // visualize frame by frame
            [](manipulator2_output_type elem) {
                visualize(elem);
            }
        )
    );
    return 0;
}

provided the necessary functions (read_data, visualize) are implemented. Here input_data_type, manipulator1_output_type, etc. are the types passed between pipeline stages, and the manipulators' process functions do the necessary computations on the arguments passed to them.

By the way, to avoid working with locks and other synchronization primitives, you can use concurrent_bounded_queue from the library: put your input data into the queue, possibly from a different thread (e.g. one dedicated to IO operations), with concurrent_bounded_queue_instance.push(elem), and then read it with input_data_type elem; concurrent_bounded_queue_instance.pop(elem). Note that popping an item is a blocking operation here; concurrent_queue provides a non-blocking try_pop alternative.

The other possibility is to use tbb::flow_graph and its nodes to organize the same pipelining scheme. Take a look at the two examples that describe dependency and data flow graphs. You might need a sequencer_node for correct ordering of items (if necessary).

It is also worth reading the SO questions marked with the library's tag to see how other people use it.

Andriy Tylychko

Have you implemented a single-threaded version? Have you profiled it?

These are crucial steps; without them you can arrive at the optimal implementation of your highly parallel design only to realize that the bottleneck is the I/O of your buffers, and/or thread synchronization, and/or false sharing, and/or cache misses, or similar issues.

I'd first try a simple thread pool with tasks that do all the steps sequentially. Then, after analysing how it works, what the CPU consumption is, etc., I'd experiment with more sophisticated tools, always comparing their performance against that first simple version.