Question

In my previous question, I implemented a TBB Pipeline using C++ (Linux) with Input, Transform and Output filters:

incorrect output with TBB pipeline

The Input filter read data (a C structure) from text files and passed it to the Transform filter. The Transform filter updated the data and passed it to the Output filter. The Output filter stored it back on disk. A simple read-and-write application.

Now I want to create a MAIN application. At startup, the MAIN application will create a TBB Pipeline with three filters:

  1. InputFilter : Receive data/C structure from the main application and pass it on.
  2. TransformFilter: Do some processing.
  3. OutputFilter : Receive it and return the information back to the main application.

At startup, the InputFilter will have nothing to do because the data/C structure will be empty, so it will loop or wait.

The MAIN application will read data from the file and pass it to the InputFilter when required. The InputFilter will then process it and pass it to the next filter, and so on. The difference is that input is controlled by the MAIN application, not within the InputFilter as before. One option is to pass the data/C structure to the InputFilter by reference and then update it from the MAIN application. The problem is that control never returns from the InputFilter to the MAIN application. Any help would be really appreciated!


Solution

I've modified the example from the thread_bound_filter documentation page so that the first filter is the thread-bound one. It works fine, and I think it is what you need:

#include <iostream>
#include "tbb/compat/thread"
#include "tbb/pipeline.h"

using namespace tbb;
using namespace std;

char InputString[] = "abcdefg\n";

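// First filter: bound to the thread that calls process_item() (here, main),
// so it only runs when that thread asks it to. Emits one character per call.
class InputFilter: public thread_bound_filter {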
    char* my_ptr;
public:
    void* operator()(void*) {
        if (*my_ptr)
            return my_ptr++;
        else
            return NULL;
    }
    InputFilter()
    : thread_bound_filter( serial_in_order ), my_ptr(InputString)
    {}
};

// Last filter: a regular filter executed inside pipeline::run(); prints each character.
class OutputFilter: public filter {
public:
    void* operator()(void* item) {
        std::cout << *(char*)item;
        return NULL;
    }
    OutputFilter() : filter(serial_in_order) {}
};

// Runs the pipeline with at most 8 items (tokens) in flight.
void RunPipeline(pipeline* p) {
    p->run(8);
}

int main() {
    // Construct the pipeline
    InputFilter f;
    OutputFilter g;
    pipeline p;
    p.add_filter(f);
    p.add_filter(g);

    // Another thread initiates execution of the pipeline
    thread t(RunPipeline, &p);

    // Process the thread_bound_filter with the current thread.
    while (f.process_item()!=thread_bound_filter::end_of_stream)
        continue;

    // Wait for pipeline to finish on the other thread.
    t.join();

    return 0;
}

Of course, you can add more filters, change their modes (except for the first one, which must be serial), change the number of tokens, use task::enqueue() instead of an explicit thread, or replace process_item() with try_process_item() so the calling thread is not blocked when the token limit has been reached. The general idea stays the same: control returns to the thread that services the filter, which in your case is the MAIN application.
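If the MAIN application should hand items over one at a time, as the question describes, the hand-off itself can be sketched without TBB. The block below is only an illustration using the C++ standard library: HandoffQueue and run_demo are hypothetical names that appear neither in TBB nor in the code above, and the worker thread merely stands in for the downstream filters (here it uppercases each character). In a TBB version, something like the queue's pop() could supply the items that the InputFilter returns, but that wiring is an assumption and is not shown.

```cpp
#include <cassert>
#include <cctype>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical hand-off queue (not a TBB class): MAIN pushes items,
// the processing side pops them.
template <typename T>
class HandoffQueue {
    std::queue<T> items_;
    bool closed_ = false;
    std::mutex m_;
    std::condition_variable cv_;
public:
    // Called by MAIN whenever it has a new item; returns immediately.
    void push(T v) {
        {
            std::lock_guard<std::mutex> lk(m_);
            items_.push(std::move(v));
        }
        cv_.notify_one();
    }
    // Called by MAIN to signal end of input.
    void close() {
        {
            std::lock_guard<std::mutex> lk(m_);
            closed_ = true;
        }
        cv_.notify_all();
    }
    // Blocks until an item is available. Returns false once the queue
    // has been closed and fully drained.
    bool pop(T& out) {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return closed_ || !items_.empty(); });
        if (items_.empty())
            return false;
        out = std::move(items_.front());
        items_.pop();
        return true;
    }
};

// Stand-in for "input -> transform -> output": MAIN feeds characters,
// a worker thread uppercases them and collects the result.
std::string run_demo(const std::string& input) {
    HandoffQueue<char> to_pipeline;
    std::string result;
    std::thread worker([&] {
        char c;
        while (to_pipeline.pop(c))
            result += static_cast<char>(
                std::toupper(static_cast<unsigned char>(c)));
    });
    // MAIN keeps control between pushes and decides when input ends.
    for (char c : input)
        to_pipeline.push(c);
    to_pipeline.close();
    worker.join();  // result is safe to read only after the join
    return result;
}
```

Calling run_demo("abcdefg\n") from main() feeds the same input string as the example above through this stand-in. The design point is that push() never waits for the processing side, so the MAIN application decides when data arrives and when the stream ends.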

Licensed under: CC-BY-SA with attribution