Question

I have written a C structure with different values (100 times) in text files such as 1.txt, 2.txt ... 100.txt


I am using Intel TBB on Linux. I have created:

  1. InputFilter (serial_in_order MODE)
  2. TransformFIlter (serial_in_order MODE)
  3. OutputFilter (Serial_in_order MODE)

The InputFilter reads structure from a file and passes it to TransformFilter. The TrasnformFilter updates the structure values and passes it to OutputFilter. The OutputFilter writes the new structure on the disc.

Basically, it is simple read and write application for a structure.

class InputFilter: public tbb::filter {
public:
    InputFilter( int );
    ~InputFilter();
private:
    int total_streams;
    int count;
    struct video_process_object input_obj;
    void* operator()( void* );
};

InputFilter::InputFilter( int x )
        : filter( serial_in_order ) {
    total_streams = x;
    count = 1;
}

InputFilter::~InputFilter() {
    total_streams = 0;
}

void* InputFilter::operator()( void* ) {
    char path[50] = { };
    sprintf( path, "input//%d.txt", count );
    printf( "Path : %s\n", path );
    FILE *fp;
    fp = fopen( path, "r" );

    if( fp == NULL || count > total_streams ) {
        fclose( fp );
        printf( "\n*******Cannot find more data.Terminating********\n\n\n" );
        return NULL;
    }

    fscanf( fp, "%d", &input_obj.video_id );
    fscanf( fp, "%s", &input_obj.storage_url );
    fscanf( fp, "%s", &input_obj.storage_type );
    fscanf( fp, "%d", &input_obj.face_detect );
    fscanf( fp, "%d", &input_obj.face_recognise );
    fscanf( fp, "%d", &input_obj.scene_recognise );
    fscanf( fp, "%d", &input_obj.activity_recognise );
    fscanf( fp, "%d", &input_obj.speech_recognise );
    fclose( fp );

    count++;
    return &input_obj;
}

class TransformFilter: public tbb::filter {
public:
    TransformFilter();
    ~TransformFilter();
private:
    struct video_process_object input_transform;
    void* operator()( void* );
};

TransformFilter::TransformFilter()
        : filter( serial_in_order ) {
}

TransformFilter::~TransformFilter() {
}

void* TransformFilter::operator()( void *item ) {

    input_transform = *static_cast<struct video_process_object*>( item );

    input_transform.video_id += 1000;
    strcat( input_transform.storage_url, "  nabeel" );
    strcat( input_transform.storage_type, " N" );
    input_transform.face_detect += 1000;
    input_transform.face_recognise += 1000;

    return &input_transform;
}

class OutputFilter: public tbb::filter {
public:
    OutputFilter();
    ~OutputFilter();
private:
    struct video_process_object output_obj;
    void* operator()( void* );
};

OutputFilter::OutputFilter()
        : filter( serial_in_order ) {
    int status = mkdir( "output", S_IRWXU | S_IRWXG | S_IRWXO );
    if( status == -1 )
        printf( "\nOutput directory already exists\n\n" );
}

OutputFilter::~OutputFilter() {
}

void* OutputFilter::operator()( void *item ) {

    output_obj = *static_cast<struct video_process_object*>( item );

    FILE *fp;

    char path[50] = { };
    sprintf( path, "output//%d.txt", output_obj.video_id - 1000 );
    printf( "Output Path : %s\t\t %d\n\n", path, output_obj.video_id );

    if( (fp = fopen( path, "w" )) == NULL ) {
        fprintf( stderr, "Cannot open output file.\n" );
        return NULL;
    }

    fprintf( fp, "%d\n", output_obj.video_id );
    fprintf( fp, "%s\n", output_obj.storage_url );
    fprintf( fp, "%s\n", output_obj.storage_type );
    fprintf( fp, "%d\n", output_obj.face_detect );
    fprintf( fp, "%d\n", output_obj.face_recognise );
    fprintf( fp, "%d\n", output_obj.scene_recognise );
    fprintf( fp, "%d\n", output_obj.activity_recognise );
    fprintf( fp, "%d\n", output_obj.speech_recognise );

    fclose( fp );
    return NULL;
}

int main() {
    tbb::pipeline pipeline;

    InputFilter input_filter( 100 );
    pipeline.add_filter( input_filter );

    TransformFilter transform_filter;
    pipeline.add_filter( transform_filter );

    OutputFilter output_filter;
    pipeline.add_filter( output_filter );

    tbb::tick_count t0 = tbb::tick_count::now();

    tbb::task_scheduler_init init_parallel;
    pipeline.run( 1 );
    tbb::tick_count t1 = tbb::tick_count::now();

    return 0;
}

Everything works fine with small number of files such 5 or 10. The problem starts when I read large number of files such as 50 or 100. The problem is:

Sometimes InputFilter reads 10.txt file and TransformFilter processes it. But immediately InputFilter reads 11.txt. OutputFIlter skips 10.txt and processes 11.txt.

How can I make sure that this will not happen?

Was it helpful?

Solution

There is a data-race, because the video_process_objects are placed inside filter structure and passed by reference between filters (which run in parallel of course). So, you have the situation when InputFilter starts processing the next token reading new data into its video_process_object while the first token just starts reading the data by the same address in TransformFilter:

     Token 1                ||         Token 2
input_filter.operator()     ||
transform_filter.operator() ||  input_filter.operator()
...

To fix it, allocate the data dynamically, e.g.:

struct video_process_object *input_obj_ptr = new video_process_object;
fscanf( fp, "%d", &input_obj_ptr->video_id );
...
return input_obj_ptr;

And deallocate it in the last filter since its return value is ignored anyway. The slides 49-50 in this old presentation sketches the similar code.

And finally, let me challenge your choice of tbb::pipelene and serial_in_order filter type. The TBB Reference says:

Parallel filters are preferred when practical because they permit parallel speedup. If a filter must be serial, the out of order variant is preferred when practical because it puts less contraints on processing order.

I see no reasons to put this additional 'in order' limitation since the processing and files are independent. And another quote to consider for sake of better-structured code:

Function parallel_pipeline provides a strongly typed lambda-friendly way to build and run pipelines.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top