Question

I tried writing a pipelined version of Bitonic Sort with File Read, Sort, and File Write stages using Intel TBB, as shown below. The code freezes in the spin loop while(!outQueue.try_pop(line)); in the FileWriter filter. Can someone explain why this might be?

Update: I did some further testing and discovered that internal_try_pop, which is called by try_pop and lives in the header file _concurrent_queue_internal.h, contains a compare_and_swap operation that fails forever for this particular try_pop. The following are the values I extracted from internal_try_pop:

head counter(k)15
tail counter1605177747
item ticket(tk)15
k after head CAS 15
(k=tk)15,15---break!!

I think the tail counter value is garbage. The only reason I can think of for this situation is that the value added to the queue by the sorter may be implicitly modified by it, thus making it unavailable.

Any ideas?

Thanks :)

#include <iostream>
#include <sstream>
#include <string>
#include <algorithm>
#include <fstream>
#include "tbb\parallel_for.h"
#include "tbb\blocked_range.h"
#include "tbb\pipeline.h"
#include "tbb\concurrent_queue.h"

using namespace tbb;
using namespace std;

// Filter that writes lines to the output file.
class FileWriterFilter: public tbb::filter {
string outPath;
public:
FileWriterFilter(string outPath );
/*override*/void* operator()( void* item );
};

// Builds the writer as a serial filter and stores the output path.
FileWriterFilter::FileWriterFilter(string outPath)
    : tbb::filter(/*is_serial=*/true), outPath(outPath) {}

// Stage body of the writer: takes one line from the queue whose address was
// returned by the previous stage and writes it to the file named by outPath.
// Always returns NULL.
void* FileWriterFilter::operator()( void* item ) {

// `item` is reinterpreted as a pointer to a concurrent_queue<string>.
concurrent_queue<string>& outQueue = *static_cast<concurrent_queue<string>*>(item);
string line;
// Busy-wait with an empty loop body until a line can be popped.
while(!outQueue.try_pop(line));

// NOTE(review): the stream is constructed anew on every call with the default
// open mode -- presumably this truncates the file each time, so only the most
// recently written line would survive; confirm.
ofstream myfile(outPath);
if (myfile.is_open())
{
    myfile <<line<<endl;
}
//myfile.close();
return NULL;

 }

 class FileReaderFilter: public tbb::filter {
public:

FileReaderFilter(string inPath);

private:
ifstream ifs;
tbb::concurrent_queue<string> queue;
/*override*/ void* operator()(void*);

 };

 // Builds the reader as a serial filter and opens the file at `inPath`.
 FileReaderFilter::FileReaderFilter(string inPath)
     : filter(/*is_serial=*/true), ifs(inPath) {}

 // Reads the next line of the input file and, when a line was obtained, pushes
 // it onto the internal queue. The queue's address is returned in every case,
 // including when nothing was read, so this stage never returns NULL.
 void* FileReaderFilter::operator()(void*) {
     string nextLine;

     // Nothing could be read: hand the (unchanged) queue on anyway.
     if (!getline(ifs, nextLine))
         return &queue;

     queue.push(nextLine);
     return &queue;
 }

class BitonicSort: public tbb::filter{

public:
    BitonicSort();
/*override*/void* operator()( void* item );

size_t *a;

private : static const  bool ASCENDING=true, DESCENDING=false;

public :void sort(size_t *b,int n)
{
    a=b;
    bitonicSort(0,n,ASCENDING);

}

private: void bitonicSort(int lo,int n,bool dir)
{
    if(n>1)
    {
        int m=n/2;
        bitonicSort(lo,m,ASCENDING);

        bitonicSort(lo+m,m,DESCENDING);
        bitonicMerge(lo,n,dir);


    }

}

private : void bitonicMerge(int lo,int n,bool dir)
     {
         if(n>1)
         {
             int m=n/2;
             for(int i=lo;i<lo+m;i++)
             {
                 compare(i,i+m,dir);

             }
             bitonicMerge(lo,m,dir);
             bitonicMerge(lo+m,m,dir);
         }
     }

private : void compare(int i,int j, bool dir)
          {
              if(dir==a[i]>a[j])
              {
                  exchange(i,j);

              }

          }

private : void exchange(int i,int j)
          {
            /*  cout<<a[i]<<" "<<a[j]<<endl;*/
              int t=a[i];
              a[i]=a[j];
              a[j]=t;
              /*cout<<a[i]<<" "<<a[j]<<endl<<endl;*/
          }

   private :string convertInt(int number)
   {
   stringstream ss;//create a stringstream
   ss << number;//add number to the stream
   return ss.str();//return a string with the contents of the stream
}
 };


 // Registers the sorter with the pipeline as a non-serial filter.
 BitonicSort::BitonicSort() : tbb::filter(/*serial=*/false) {}

 // Stage body of the sorter: pops one text line from the queue passed in by the
 // previous stage, parses whitespace-separated integers from it, sorts them and
 // pushes the sorted numbers, joined by spaces, onto a queue for the next stage.
 //
 // NOTE(review): `outQueue` below is a local object and its address is the
 // return value, so the pointer received by the next stage refers to a queue
 // that has already been destroyed when this function returns.
 /*override*/void* BitonicSort::operator()( void* item ) {

int num_elem=2048;
// NOTE(review): allocated on every call; no matching delete[] is visible in this function.
size_t *max = new size_t[num_elem];
concurrent_queue<string>& queue = *static_cast<concurrent_queue<string>*>(item);
concurrent_queue<string> outQueue;
string line;
// Busy-wait with an empty loop body until a line can be popped.
while(!queue.try_pop(line));  
istringstream iss(line);
int i=0;
// The loop condition is tested after the body, so the extraction that fails at
// end of input still stores atoi("") and advances i. There is no check of i
// against num_elem, and slots that are never reached stay uninitialised.
do
{
    string sub;
    iss >> sub;
    max[i]=atoi(sub.c_str());;
    i++;
} while (iss);

// All num_elem slots are sorted, regardless of how many were parsed.
sort(max,num_elem);

string out;

for(int i=0;i<num_elem;i++)
{
    out.append(convertInt(max[i]).append(" "));
}

outQueue.push(out);

return &outQueue;
 }


 // Wires up the reader -> sorter -> writer pipeline, runs it with the value 3
 // passed to pipeline.run(), clears it and then pauses the console.
 int main() {
     tbb::pipeline pipeline;

     // Stages are declared in pipeline order, then registered in that order.
     FileReaderFilter reader("sample.txt");
     BitonicSort sorter;
     FileWriterFilter writer("test.txt");

     pipeline.add_filter(reader);
     pipeline.add_filter(sorter);
     pipeline.add_filter(writer);

     pipeline.run(3);
     pipeline.clear();

     system("PAUSE");
 }
Was it helpful?

Solution

I found it! It was quite a trivial error. I had declared the second concurrent_queue as a local variable inside the operator() method of the pipeline's sorter filter. Thus, every time operator() executes, a new queue is created, and it is destroyed again when the method returns, which invalidates the pointer sent to the writer filter. The queue has to be a member variable of the sorter filter; then everything works fine. There was another bug in the file writer, which has been fixed in the code below.

   #include <algorithm>
   #include <fstream>
   #include <iostream>
   #include <sstream>
   #include <string>
   #include <vector>
   #include "tbb\parallel_for.h"
   #include "tbb\blocked_range.h"
   #include "tbb\pipeline.h"
   #include "tbb\concurrent_queue.h"
   #include "tbb\task_scheduler_init.h"
   #include "tbb\tbb_thread.h"
   #include "tbb\task.h"

  using namespace tbb;
  using namespace std;


 // Last stage of the pipeline: writes each sorted line to an already-open C
 // stream and counts how many lines have been written.
 class FileWriterFilter : public tbb::filter {
 public:
     // Number of lines written so far.
     int count;

     // `outFile` is the stream to write to; this class does not close it.
     FileWriterFilter(FILE* outFile);

 private:
     // Destination stream supplied by the creator of the filter.
     FILE* outFile;

     // Stage body; `item` is the pointer returned by the previous stage.
     /*override*/ void* operator()(void* item);
 };

// Builds the writer as a serial filter targeting `outFile`, with the line
// counter starting at zero. Initialisers are listed in declaration order.
FileWriterFilter::FileWriterFilter(FILE* outFile)
    : tbb::filter(/*is_serial=*/true),
      count(0),
      outFile(outFile)
{
}

// Stage body of the writer: pops one sorted line from the queue handed over by
// the previous stage and writes it, followed by a newline, to the output stream.
//
// item: address of the tbb::concurrent_queue<string> returned by the sorter.
// Returns NULL on every call.
/*override*/void* FileWriterFilter::operator()( void* item ) {
    tbb::concurrent_queue<string>& outQueue =
        *static_cast<tbb::concurrent_queue<string>*>(item);

    string outLine;
    // Yield instead of burning the core while the queue is still empty.
    while (!outQueue.try_pop(outLine))
        this_tbb_thread::yield();

    // Write the text verbatim. Using the data itself as a printf format string
    // would make any '%' in it be interpreted as a conversion specifier.
    outLine.append("\n");
    fputs(outLine.c_str(), outFile);

    count++;
    if (count == 10000) {
        cout << "over" << endl;
    }
    return NULL;
}


// First stage of the pipeline: reads the input file line by line, keeps a
// count of its invocations and publishes each line through a queue it owns.
class FileReaderFilter : public tbb::filter {
public:
    // Opens the file at `inPath` for reading.
    FileReaderFilter(char* inPath);

private:
    // Stage body; the incoming pointer is ignored.
    /*override*/ void* operator()(void*);

    int count;                            // invocation counter used for the line cap
    ifstream ifs;                         // input stream being consumed
    tbb::concurrent_queue<string> queue;  // lines handed to the next stage
};

// Builds the reader as a serial filter, zeroes the counter and opens `inPath`.
// Initialisers are listed in declaration order.
FileReaderFilter::FileReaderFilter(char* inPath)
    : filter(/*is_serial=*/true),
      count(0),
      ifs(inPath)
{
}

// Stage body of the reader: reads the next line of the input file and pushes
// it onto the internal queue for the next stage.
//
// Returns the address of the queue when a line was pushed. Returns NULL, which
// tells the pipeline that the input is finished, once 10000 lines have been
// delivered or as soon as no further line can be read.
/*override*/void* FileReaderFilter::operator()(void*) {
    static const int kMaxLines = 10000;  // hard cap on lines fed to the pipeline

    if (count >= kMaxLines)
        return NULL;

    string temp;
    // A failed read must end the stream: returning the queue without pushing
    // anything would leave the next stage waiting forever on an empty queue.
    if (!getline(ifs, temp))
        return NULL;

    ++count;
    queue.push(temp);
    return &queue;
}


// TBB task that merges the bitonic sequence a[lo .. lo+n) into direction `dir`
// (true = ascending). After one compare/exchange pass across the two halves it
// spawns one child task per half and waits for both.
class bitonicMerger : public tbb::task {
    int lo;      // first index of the range
    int n;       // number of elements in the range
    bool dir;    // requested order: true = ascending
    size_t* a;   // array being sorted; not owned

public:
    bitonicMerger(int lo_, int n_, bool dir_, size_t* a_)
        : lo(lo_), n(n_), dir(dir_), a(a_) {}

    // Performs the merge step described above. Always returns NULL.
    task* execute() {
        if (n > 1)
        {
            const int m = n / 2;
            for (int i = lo; i < lo + m; i++)
            {
                compare(i, i + m, dir);
            }

            tbb::task_list list;
            list.push_back( *new( allocate_child() ) bitonicMerger(lo, m, dir, a) );
            list.push_back( *new( allocate_child() ) bitonicMerger(lo + m, m, dir, a) );
            // Two children plus one for the wait itself.
            set_ref_count(3);
            spawn_and_wait_for_all(list);
        }
        return NULL;
    }

private:
    // Swaps a[i] and a[j] when their current order contradicts `dir`.
    void compare(int i, int j, bool dir)
    {
        if (dir == (a[i] > a[j]))
        {
            exchange(i, j);
        }
    }

    // Swaps a[i] and a[j]. The temporary has the element type so that values
    // wider than int are not truncated on the way through.
    void exchange(int i, int j)
    {
        const size_t t = a[i];
        a[i] = a[j];
        a[j] = t;
    }
};


 class bitonicSorter : public tbb::task{
int lo;
int n;
bool dir;
size_t* a;

private : static const  bool ASCENDING=true, DESCENDING=false;

public:
    bitonicSorter(int lo_,int n_,bool dir_,size_t* a_): lo(lo_), n(n_),dir(dir_), a(a_) {}

task* execute() {
    if(n>1)
    {                   
        int m=n/2;
        int count = 1;
        tbb::task_list list;
        ++count;
        list.push_back( *new( allocate_child() ) bitonicSorter(lo,m,ASCENDING,a) );
        ++count;
        list.push_back( *new( allocate_child() ) bitonicSorter(lo+m,m,DESCENDING,a) );
        set_ref_count(count);
        spawn_and_wait_for_all(list);



        count = 1;
        tbb::task_list list1;
        ++count;
        list1.push_back( *new( allocate_child() ) bitonicMerger(lo,n,dir,a) );
        set_ref_count(count);
        spawn_and_wait_for_all(list1);

    }
    return NULL;
}

 };




 class TBitonicSort : public tbb::filter{


public:
    TBitonicSort();
/*override*/void* operator()( void* item );


size_t *a;

private : static const  bool ASCENDING=true, DESCENDING=false;
private : tbb::concurrent_queue<string> outQueue;

public :void sort(size_t *b,int n)
{
    a=b;        
    bitonicSorter& tt = *new(tbb::task::allocate_root())   bitonicSorter(0,n,ASCENDING,a);
    tbb::task::spawn_root_and_wait(tt); 
}




};

// Formats `number` as decimal text by streaming it into a string stream.
std::string convertInt(int number)
{
    std::stringstream formatter;
    formatter << number;
    const std::string text = formatter.str();
    return text;
}




// Registers the sorter with the pipeline as a serial filter.
TBitonicSort::TBitonicSort() : filter(/*is_serial=*/true) {}

// Stage body of the sorter: pops one text line from the queue handed over by
// the previous stage, parses up to 2048 whitespace-separated integers from it,
// sorts them and pushes the sorted numbers, each followed by a space, onto the
// member queue for the next stage.
//
// item: address of the tbb::concurrent_queue<string> returned by the reader.
// Returns the address of the member output queue.
/*override*/void* TBitonicSort::operator()( void* item ) {
    const int num_elem = 2048;

    tbb::concurrent_queue<string>& queue =
        *static_cast<tbb::concurrent_queue<string>*>(item);

    string line;
    // Yield instead of burning the core while the queue is still empty.
    while (!queue.try_pop(line))
        this_tbb_thread::yield();

    // Zero-filled buffer released automatically on return. Slots beyond the
    // parsed tokens stay 0 rather than holding indeterminate values.
    std::vector<size_t> values(num_elem);

    istringstream iss(line);
    string sub;
    // Stop at the first failed extraction or once the buffer is full, so a
    // line with too many tokens can never write past the end.
    for (int i = 0; i < num_elem && (iss >> sub); ++i)
    {
        values[i] = atoi(sub.c_str());
    }

    this->sort(&values[0], num_elem);

    string out;
    for (int i = 0; i < num_elem; i++)
    {
        out.append(convertInt(static_cast<int>(values[i])).append(" "));
    }

    outQueue.push(out);

    return &outQueue;
}

// Builds the reader -> sorter -> writer pipeline over "sample.txt" and
// "test.txt", runs it once and prints the elapsed wall-clock time.
//
// threads: value passed to pipeline.run(); it also selects which timing label
//          is printed (1 prints the "serial" label, anything else "parallel").
// Returns 0 in every case, including when the output file cannot be opened
// (an error is printed with perror in that case).
int run_pipe(int threads)
{
    FILE* output_file = fopen("test.txt","w");
    if( !output_file ) {
        perror( "test.txt" );
        return 0;
    }

    // Kept in a writable array because FileReaderFilter takes a non-const
    // char*, and a string literal may not be bound to one since C++11.
    char input_file[] = "sample.txt";

    tbb::pipeline pipeline;

    FileReaderFilter reader(input_file);
    pipeline.add_filter(reader);

    TBitonicSort sorter;
    pipeline.add_filter(sorter);

    FileWriterFilter writer(output_file);
    pipeline.add_filter(writer);

    tbb::tick_count t0 = tbb::tick_count::now();
    pipeline.run(threads);
    tbb::tick_count t1 = tbb::tick_count::now();

    fclose( output_file );
    pipeline.clear();

    if(threads==1){
        printf("serial run   time = %g\n", (t1-t0).seconds());
    }
    else{
        printf("parallel run time = %g\n", (t1-t0).seconds());
    }

    return 0;
}

// Runs the pipeline twice, first with 1 and then with 3 passed to run_pipe,
// and pauses the console afterwards.
int main() {
    const int settings[2] = {1, 3};

    for (const int* current = settings; current != settings + 2; ++current)
        run_pipe(*current);

    system("PAUSE");
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top