I've modified the example from thread_bound_filter documentation page in order to make the first filter as thread-bound. It works fine and I think it is what you need:
#include <iostream>
#include "tbb/compat/thread"
#include "tbb/pipeline.h"
using namespace tbb;
using namespace std;
char InputString[] = "abcdefg\n";
class InputFilter: public thread_bound_filter {
char* my_ptr;
public:
void* operator()(void*) {
if (*my_ptr)
return my_ptr++;
else
return NULL;
}
InputFilter()
: thread_bound_filter( serial_in_order ), my_ptr(InputString)
{}
};
class OutputFilter: public filter {
public:
void* operator()(void* item) {
std::cout << *(char*)item;
return NULL;
}
OutputFilter() : filter(serial_in_order) {}
};
void RunPipeline(pipeline* p) {
p->run(8);
}
int main() {
// Construct the pipeline
InputFilter f;
OutputFilter g;
pipeline p;
p.add_filter(f);
p.add_filter(g);
// Another thread initiates execution of the pipeline
thread t(RunPipeline, &p);
// Process the thread_bound_filter with the current thread.
while (f.process_item()!=thread_bound_filter::end_of_stream)
continue;
// Wait for pipeline to finish on the other thread.
t.join();
return 0;
}
Of course, you may want to add more filters, change their type (except the first which must be serial), change number of tokens, use task::enqueue()
instead of explicit thread, replace process_item()
by try_process_item()
in order to avoid being blocked inside when the number of tokens has exceeded.. but the general idea is the same, you can return the control to your thread which processes a filter.