How to guarantee read() actually sends 100% of data sent by write() through named pipes

StackOverflow https://stackoverflow.com/questions/904866

  •  05-09-2019
  •  | 
  •  

Question

I've got the following two programs, one acting as a reader and the other as a writer. The writer seems to only send about 3/4 of the data correctly to be read by the reader. Is there any way to guarantee that all the data is being sent? I think I've got it set up so that it reads and writes reliably, but it still seems to miss 1/4 of the data.

Here's the source of the writer:

// Filesystem path of the FIFO that the writer sends its messages through.
// NOTE(review): this object-like macro is named `pipe`, the same identifier as
// POSIX pipe(2); any later use of that function in this file would be rewritten
// into the string literal. A distinct name would be safer.
#define pipe "/tmp/testPipe"

using namespace std;

// Messages waiting to be sent: filled by main() and consumed from the front by
// sendData().
queue<string> sproutFeed;


/**
 * Write exactly `size` bytes from `buf` to `fd`.
 *
 * Keeps calling write() after partial writes and restarts it when a signal
 * interrupts the call (EINTR), so the caller never sees a short count.
 *
 * This function does not touch signal dispositions. A caller that wants EPIPE
 * reported as an error instead of being killed by SIGPIPE must ignore or
 * block SIGPIPE before calling.
 *
 * @param fd   Descriptor open for writing.
 * @param buf  First byte to send.
 * @param size Number of bytes to send; 0 returns 0 without calling write().
 * @return `size` once every byte has been written, or -1 with errno left as
 *         set by write() for any failure other than EINTR. Bytes written
 *         before the failure are not reported.
 */
ssize_t r_write(int fd, char *buf, size_t size) {
   size_t written = 0;

   while (written < size) {
      const ssize_t n = write(fd, buf + written, size - written);
      if (n == -1) {
         // errno is only meaningful here, right after write() reported failure.
         if (errno == EINTR)
            continue;   // interrupted before anything was written: try again
         return -1;
      }
      written += static_cast<size_t>(n);
   }
   return static_cast<ssize_t>(written);
}


void* sendData(void *thread_arg)
{

int fd, ret_val, count, numread;
string word;
char bufpipe[5];


ret_val = mkfifo(pipe, 0777); //make the sprout pipe

if (( ret_val == -1) && (errno != EEXIST)) 
{
    perror("Error creating named pipe");
    exit(1);
}   
while(1)
{
    if(!sproutFeed.empty())
    {
        string s;
        s.clear();
        s = sproutFeed.front();
        int sizeOfData = s.length();
        snprintf(bufpipe, 5, "%04d\0", sizeOfData); 
        char stringToSend[strlen(bufpipe) + sizeOfData +1];
        bzero(stringToSend, sizeof(stringToSend));                  
        strncpy(stringToSend,bufpipe, strlen(bufpipe));         
        strncat(stringToSend,s.c_str(),strlen(s.c_str()));
        strncat(stringToSend, "\0", strlen("\0"));                  
        int fullSize = strlen(stringToSend);            
        signal(SIGPIPE,SIG_IGN);

        fd = open(pipe,O_WRONLY);
        int numWrite = r_write(fd, stringToSend, strlen(stringToSend) );
        cout << errno << endl;
        if(errno == EPIPE)
        {
        signal(SIGPIPE,SIG_IGN);
        }

        if(numWrite != fullSize )
        {               
            signal(SIGPIPE,SIG_IGN);
            bzero(bufpipe, strlen(bufpipe));
            bzero(stringToSend, strlen(stringToSend));
            close(fd);
        }
        else
        {
            signal(SIGPIPE,SIG_IGN);
            sproutFeed.pop();
            close(fd);
            bzero(bufpipe, strlen(bufpipe));
            bzero(stringToSend, strlen(stringToSend));
        }                   
    }
    else
    {
        if(usleep(.0002) == -1)
        {
            perror("sleeping error\n");
        }
    }
}

}

/**
 * Writer program entry point.
 *
 * Queues 100 copies of a test sentence in sproutFeed, starts sendData() on a
 * worker thread and waits for that thread.
 *
 * @return 1 if the worker thread cannot be created or joined, 0 otherwise.
 */
int main(int argc, char *argv[])
{
    (void) argc;
    (void) argv;

    signal(SIGPIPE,SIG_IGN);

    for (int x = 0; x < 100; x++)
    {
        sproutFeed.push("All ships in the sea sink except for that blue one over there, that one never sinks. Most likley because it\'s blue and thats the mightiest colour of ship. Interesting huh?");
    }

    pthread_t worker;
    printf("Starting Threads...\n");

    int rc = pthread_create(&worker, NULL, sendData, NULL);
    if (rc != 0)
    {
        fprintf(stderr, "pthread_create failed with error %d\n", rc);
        return 1;
    }

    // The thread's result is not needed. Passing NULL avoids storing a
    // pointer-sized value into a smaller int, which would corrupt the stack.
    rc = pthread_join(worker, NULL);
    if (rc != 0)
    {
        fprintf(stderr, "pthread_join failed with error %d\n", rc);
        return 1;
    }

    return 0;
}

Here's the source of the reader:

// Filesystem path of the FIFO that the reader receives its messages from.
// NOTE(review): this object-like macro is named `pipe`, the same identifier as
// POSIX pipe(2); a distinct name would be safer.
#define pipe "/tmp/testPipe"

// Buffer that getSproutItem() fills with the received payload and returns a
// pointer to.
char dataString[50000];
using namespace std;
// Declared ahead of its definition so readItem() can call it.
char *getSproutItem();

/**
 * Thread entry point for the reader: fetches messages with getSproutItem()
 * in an endless loop and prints each one that arrives.
 *
 * @param thread_arg Unused.
 * @return Never returns.
 */
void* readItem(void *thread_arg)
{
    for (;;)
    {
        // NOTE(review): `x` is not declared in this block; presumably an
        // iteration counter defined elsewhere - confirm.
        x++;

        char *item = getSproutItem();
        if (item == NULL)
        {
            continue;   // nothing usable this round, ask again
        }
        cout << "READ IN: " << item << endl;
    }
}


/**
 * read() that restarts itself when a signal interrupts the call.
 *
 * Only one successful read() is performed, so fewer than `size` bytes may
 * be returned.
 *
 * @param fd   Descriptor to read from.
 * @param buf  Destination buffer with room for `size` bytes.
 * @param size Maximum number of bytes to read.
 * @return Whatever read() returned: the byte count, 0 at end of file, or -1
 *         for any error other than EINTR.
 */
ssize_t r_read(int fd, char *buf, size_t size) {
   for (;;) {
      const ssize_t got = read(fd, buf, size);
      if (got != -1 || errno != EINTR)
         return got;
      // Interrupted before any data was transferred: issue the read again.
   }
}


char * getSproutItem()
{
    cout << "Getting item" << endl;
    char stringSize[4];
    bzero(stringSize, sizeof(stringSize));
    int fd = open(pipe,O_RDONLY);
    cout << "Reading" << endl;

    int numread = r_read(fd,stringSize, sizeof(stringSize));


    if(errno == EPIPE)
    {
        signal(SIGPIPE,SIG_IGN);

    }
    cout << "Read Complete" << endl;

    if(numread > 1)
    {

        stringSize[numread] = '\0'; 
        int length = atoi(stringSize);
        char recievedString[length];
        bzero(recievedString, sizeof(recievedString));
        int numread1 = r_read(fd, recievedString, sizeof(recievedString));
        if(errno == EPIPE)
        {


signal(SIGPIPE,SIG_IGN);
    }       
    if(numread1 > 1)
    {
        recievedString[numread1] = '\0';
        cout << "DATA RECIEVED: " << recievedString << endl;
        bzero(dataString, sizeof(dataString));
        strncpy(dataString, recievedString, strlen(recievedString));
        strncat(dataString, "\0", strlen("\0"));
        close(fd);  
        return dataString;
    }
    else
    {
        return NULL;
    }

}
else
{
    return NULL;
}

close(fd);

}

/**
 * Reader program entry point.
 *
 * Starts readItem() on a worker thread and waits for that thread.
 *
 * @return 1 if the worker thread cannot be created or joined, 0 otherwise.
 */
int main(int argc, char *argv[])
{
        (void) argc;
        (void) argv;

        pthread_t worker;
        printf("Starting Threads...\n");

        int rc = pthread_create(&worker, NULL, readItem, NULL);
        if (rc != 0)
        {
                fprintf(stderr, "pthread_create failed with error %d\n", rc);
                return 1;
        }

        // The thread's result is not needed. Passing NULL avoids storing a
        // pointer-sized value into a smaller int, which would corrupt the stack.
        rc = pthread_join(worker, NULL);
        if (rc != 0)
        {
                fprintf(stderr, "pthread_join failed with error %d\n", rc);
                return 1;
        }

        return 0;
}
Was it helpful?

Solution

You are definitely using signals the wrong way. Threads are completely unnecessary here - at least in the code provided. The string calculations are just weird. Get this book and do not touch the keyboard until you have finished reading :)

OTHER TIPS

The general method used to send data through named pipes is to tack on a header with the length of the payload. Then you read(fd, header_len); read(fd, data_len). Note that the latter read() will need to be done in a loop until data_len bytes have been read or EOF is reached. Note also that if you have multiple writers to a named pipe, the writes are atomic (as long as they are of a reasonable size), i.e. multiple writers will not cause partial messages in the kernel buffers.

It's difficult to say what is going on here. Maybe you are getting an error returned from one of your system calls? Are you sure that you are successfully sending all of the data?

You also appear to have some invalid code here:

    int length = atoi(stringSize);
    char recievedString[length];

This is a syntax error, since you cannot create an array on the stack using a non-constant expression for the size. Maybe you are using different code in your real version?

Do you need to read the data in a loop? Sometimes a function will return a portion of the available data and require you to call it repeatedly until all of the data is gone.

Some system calls in Unix can also return EAGAIN if the system call is interrupted - you are not handling this case by the looks of things.

You are possibly getting bitten by POSIX thread signal handling semantics in your reader main thread. The POSIX standard allows any POSIX thread to receive the signal, not necessarily the thread you expect. Block signals where they are not wanted. signal(SIGPIPE, SIG_IGN) is your friend. Add one to the reader's main.

POSIX thread handling semantics, putting the POS into POSIX. ( but it does make it easier to implement POSIX threads.)

Examine the pipe in /tmp with ls. Is it not empty?

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top