Question

I am simulating a udp server which receives multiple request messages via udp client. I need to spawn 2 threads in this udp server sample code. thread 1 will receive all the udp request as and when they arrive and sent it to thread 2 via message queue for processing.

thread 1(SndMq) is able to receive the request message at via udp and also able to put it in message queue using a structure. But the problem is the thread 2(RcvMq) is not able to read even single message in the message queue. Please suggest any solution.

Error part (thread 2 function):

void* RcvMq(void*) {

        time_t t2;
        time(&t2);

        msgst *msg = new msgst;

        int mqid = msgget((key_t)12345, 0666 | IPC_CREAT);
        if (mqid == -1) {
                printf("thread 2: msgget failed with error: %d\n", errno);
                exit(EXIT_FAILURE);
        }
        else
            printf("\nthread 2: got MQ id: %d\n", mqid);

        long receive_m_type = 123;

        printf("thread 2: going to enter while(1) loop\n");
        while(1) {
                printf("\nthread 2: inside while(1) loop, going to msgrcv()\n");
                if ( msgrcv(mqid, (void *)&msg, BUFSIZE, receive_m_type, 0) < 0 ) {
                        //printf("\nthread 2: msgrcv failed with error: [%s]\n", strerror(errno));
                        printf("\nthread 2: msgrcv failed with error\n");
                        exit(EXIT_FAILURE);
                }
                else {
                        printf("thread 2: going to ctime()\n");
                        printf("\nthread 2: DeQueued at: %s\n", ctime(&t2));
                }
                printf("\nthread 2: going to print struct's buffer\n");
                printf("thread 2: Message received at MQ rcv is: %s \n", msg->buffer);
        }
}

code for udp server simulator

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <netdb.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/msg.h>
#include<pthread.h>

#define PORT 5000
#define BUFSIZE 2048

pthread_t sndtid;
pthread_t rcvtid;

typedef struct msgst  {
        long int mtype;
        char buffer[BUFSIZE];
    }msgst;

void* SndMq(void*) {

        time_t t1;
        time(&t1);
        struct sockaddr_in myaddr;  /* our address */
        struct sockaddr_in remaddr; /* remote address */
        socklen_t addrlen = sizeof(remaddr);    /* length of addresses */
        int recvlen;                /* # bytes received */
        int fd;                     /* our socket */
        int rc, on=1;
        char buf[BUFSIZE];  /* receive buffer */
        size_t buflen;
        /* create a UDP socket: socket, setsockopt, bind */
        if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
                perror("cannot create socket\n");
                return 0;
        }
        if((rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on))) < 0) {
                perror("thread 1: UDP Server setsockopt() - ERROR");
                close(fd);
                exit(-1);
        }
        /* bind the socket to any valid IP address and a specific port */
        memset((char *)&myaddr, 0, sizeof(myaddr));
        myaddr.sin_family = AF_INET;
        myaddr.sin_addr.s_addr = htonl(INADDR_ANY);
        //myaddr.sin_port = htons(SERVICE_PORT);
        myaddr.sin_port = htons(PORT);
        if (    bind(fd, (struct sockaddr *)&myaddr, sizeof(myaddr) ) < 0) {
                perror("bind failed");
                return 0;
        }

        printf("\nthread 1: Server waiting on port %d\n", PORT);

        //MsgQ implementation:
        int mqid = msgget((key_t)12345, 0666 | IPC_CREAT);
        if(mqid == -1) {
                printf("thread 1: msgget() failed, errno: %d\n",errno );
        }
        else
            printf("thread 1: created MQ id: %d\n", mqid);

        /* now loop, receiving data and printing what we received */
        for (;;) {

                //rcv from udp client
                recvlen = recvfrom(fd, buf, BUFSIZE, 0, (struct sockaddr *)&remaddr, &addrlen);
                printf("\nthread 1: received %d bytes\n", recvlen);
                if (recvlen > 0) {
                        buf[recvlen] = 0;
                        printf("thread 1: received message from UDP client: %s\n", buf);
                }

                //send received msg via MsgQ
                msgst *msgptr = new msgst;
                msgptr->mtype = 123;
                strcpy(msgptr->buffer, buf);
                //msgptr->buffer=buf;
                //memcpy(msgptr->buffer, buf, BUFSIZE);
                buflen = strlen(msgptr->buffer);

                if (msgsnd(mqid, (void *)&msgptr, buflen, IPC_NOWAIT) == -1) {
                        printf("thread 1: msgsnd failed\n");
                        exit(EXIT_FAILURE);
                }
                printf("thread 1: added Req to MsgQ at: %s\n", ctime(&t1));
        }/* never exits */
    }
void* RcvMq(void*) {

        time_t t2;
        time(&t2);

        msgst *msg = new msgst;

        int mqid = msgget((key_t)12345, 0666 | IPC_CREAT);
        if (mqid == -1) {
                printf("thread 2: msgget failed with error: %d\n", errno);
                exit(EXIT_FAILURE);
        }
        else
            printf("\nthread 2: got MQ id: %d\n", mqid);

        long receive_m_type = 123;

        printf("thread 2: going to enter while(1) loop\n");
        while(1) {
                printf("\nthread 2: inside while(1) loop, going to msgrcv()\n");
                if ( msgrcv(mqid, (void *)&msg, BUFSIZE, receive_m_type, 0) < 0 ) {
                        //printf("\nthread 2: msgrcv failed with error: [%s]\n", strerror(errno));
                        printf("\nthread 2: msgrcv failed with error\n");
                        exit(EXIT_FAILURE);
                }
                else {
                        printf("thread 2: going to ctime()\n");
                        printf("\nthread 2: DeQueued at: %s\n", ctime(&t2));
                }
                printf("\nthread 2: going to print struct's buffer\n");
                printf("thread 2: Message received at MQ rcv is: %s \n", msg->buffer);
        }
}
int main() {

        int rtn=0;

        rtn = pthread_create(&(sndtid), NULL, &SndMq, NULL);
        if (rtn != 0)
                printf("\ncan't create thread SndMq, error:[%s]", strerror(rtn));
        else
                printf("\n\ncreated SndMq thread\n");

        rtn = pthread_create(&(rcvtid), NULL, &RcvMq, NULL);
        if (rtn != 0)
                printf("\ncan't create thread RcvMq :[%s]", strerror(rtn));
        else
                printf("created RcvMq thread\n");

        printf("main(): All threads spawned successfully, now waiting for threads to end...\n");
        pthread_join(sndtid, NULL);
        pthread_join(rcvtid, NULL);
        printf("\nThreads ended, main() exiting now...\n");

        return 0;
}

output:

created SndMq thread
created RcvMq thread
main(): All threads spawned successfully, now waiting for threads to end...

thread 2: got MQ id: 6422528
thread 2: going to enter while(1) loop

thread 2: inside while(1) loop, going to msgrcv()

thread 1: Server waiting on port 5000
thread 1: created MQ id: 6422528

thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>1</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr  5 18:16:45 2014


thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>2</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr  5 18:16:45 2014


thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>3</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr  5 18:16:45 2014


thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>4</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr  5 18:16:45 2014


thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>5</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr  5 18:16:45 2014


thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>6</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr  5 18:16:45 2014


thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>7</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr  5 18:16:45 2014


thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>8</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr  5 18:16:45 2014


thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>9</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr  5 18:16:45 2014


thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>10</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr  5 18:16:45 2014
Was it helpful?

Solution

(1) Your primary problem is a C error. You are passing the address of a ptr (a ptr to ptr) rather than the pointer.

msgst *msgptr = new msgst;
//...

if (msgsnd(mqid, (void *)&msgptr, buflen, IPC_NOWAIT) == -1) 

Should be

if (msgsnd(mqid, (void *) msgptr, buflen, IPC_NOWAIT) == -1)

As a result you are writing whatever random junk is at &msgptr and since you are writing msgtype 123 and then trying to read msgtype 123 later (but wrote junk) you will never (or very rarely!) find that message type.

You make the same mistake on the message receive side:

msgst *msg = new msgst;
//.......

if ( msgrcv(mqid, (void *)&msg, BUFSIZE, receive_m_type, 0) < 0 ) 

(2) You probably didn't get to it yet but you have a memory leak because you new the buffer but never delete it. You don't really need dynamic memory allocation here anyway. It would be easier to just have the buffer allocated on the stack because it just going to be copied by msgsnd anyway and then you are done with it. This would have also eliminated the problem you ran into with (1).

(3) You are going to run into problems with IPC_NOWAIT on the msgsnd. It will be very easy to fill the queue on the producer side before they get processed by the consumer. At the very least you should be checking for EAGAIN and taking whatever action is appropriate when the write fails.

OTHER TIPS

Probably not related to the issue asked about.

However in SndMq() if recvfrom() receives BUFSIZE bytes then the code writes out of buf's bounds here:

    buf[recvlen] = 0;

Update:

To fix this changes this line

recvlen = recvfrom(fd, buf, BUFSIZE, 0, (struct sockaddr *)&remaddr, &addrlen);

to become

recvlen = recvfrom(fd, buf, BUFSIZE - 1, 0, (struct sockaddr *)&remaddr, &addrlen);
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top