I am simulating a UDP server that receives multiple request messages from a UDP client. I need to spawn two threads in this UDP server sample code. Thread 1 will receive all the UDP requests as they arrive and send them to thread 2 via a message queue for processing.
Thread 1 (SndMq) is able to receive the request messages via UDP and to put them in the message queue using a structure. The problem is that thread 2 (RcvMq) is not able to read even a single message from the message queue. Please suggest a solution.
Error part (thread 2 function):
/*
 * Thread 2 entry point: blocks on the System V message queue with key 12345
 * and prints every message of type 123 that arrives.
 *
 * The argument is unused. The function only returns by terminating the
 * process via exit() when msgget() or msgrcv() fails.
 */
void* RcvMq(void*) {
    /*
     * msgrcv() must be handed the address of the message structure itself.
     * Passing the address of a pointer to it makes the kernel overwrite the
     * pointer (and whatever follows it on the stack) instead of the message.
     */
    msgst msg;
    const long receive_m_type = 123;

    int mqid = msgget((key_t)12345, 0666 | IPC_CREAT);
    if (mqid == -1) {
        printf("thread 2: msgget failed with error: %d\n", errno);
        exit(EXIT_FAILURE);
    }
    printf("\nthread 2: got MQ id: %d\n", mqid);

    printf("thread 2: going to enter while(1) loop\n");
    while (1) {
        printf("\nthread 2: inside while(1) loop, going to msgrcv()\n");

        /* The size argument counts payload bytes only, not the mtype field. */
        ssize_t got = msgrcv(mqid, &msg, sizeof(msg.buffer), receive_m_type, 0);
        if (got < 0) {
            printf("\nthread 2: msgrcv failed with error: [%s]\n", strerror(errno));
            exit(EXIT_FAILURE);
        }

        /* Take the timestamp now, so it reflects when this message was dequeued. */
        printf("thread 2: going to ctime()\n");
        time_t t2 = time(NULL);
        char stamp[26]; /* ctime_r() needs at least 26 bytes */
        if (ctime_r(&t2, stamp) == NULL) {
            snprintf(stamp, sizeof(stamp), "unknown time\n");
        }
        printf("\nthread 2: DeQueued at: %s\n", stamp);

        /*
         * msgrcv() does not NUL-terminate the payload, so bound the print
         * by the number of bytes actually received.
         */
        printf("\nthread 2: going to print struct's buffer\n");
        printf("thread 2: Message received at MQ rcv is: %.*s \n", (int)got, msg.buffer);
    }
}
Code for the UDP server simulator:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <netdb.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/msg.h>
#include<pthread.h>
/* UDP port the server socket is bound to. */
#define PORT 5000
/* Size in bytes of the message buffers (UDP receive buffer and msgst payload). */
#define BUFSIZE 2048
/* Thread handles filled in by pthread_create() in main() and later joined there. */
pthread_t sndtid;
pthread_t rcvtid;
/*
 * Message passed between the two threads through the System V message queue.
 * The leading long is the message type that msgsnd()/msgrcv() operate on;
 * the bytes after it are the payload.
 */
typedef struct msgst {
    long mtype;             /* message type; the threads use the value 123 */
    char buffer[BUFSIZE];   /* payload: text of the UDP request */
} msgst;
void* SndMq(void*) {
time_t t1;
time(&t1);
struct sockaddr_in myaddr; /* our address */
struct sockaddr_in remaddr; /* remote address */
socklen_t addrlen = sizeof(remaddr); /* length of addresses */
int recvlen; /* # bytes received */
int fd; /* our socket */
int rc, on=1;
char buf[BUFSIZE]; /* receive buffer */
size_t buflen;
/* create a UDP socket: socket, setsockopt, bind */
if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("cannot create socket\n");
return 0;
}
if((rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on))) < 0) {
perror("thread 1: UDP Server setsockopt() - ERROR");
close(fd);
exit(-1);
}
/* bind the socket to any valid IP address and a specific port */
memset((char *)&myaddr, 0, sizeof(myaddr));
myaddr.sin_family = AF_INET;
myaddr.sin_addr.s_addr = htonl(INADDR_ANY);
//myaddr.sin_port = htons(SERVICE_PORT);
myaddr.sin_port = htons(PORT);
if ( bind(fd, (struct sockaddr *)&myaddr, sizeof(myaddr) ) < 0) {
perror("bind failed");
return 0;
}
printf("\nthread 1: Server waiting on port %d\n", PORT);
//MsgQ implementation:
int mqid = msgget((key_t)12345, 0666 | IPC_CREAT);
if(mqid == -1) {
printf("thread 1: msgget() failed, errno: %d\n",errno );
}
else
printf("thread 1: created MQ id: %d\n", mqid);
/* now loop, receiving data and printing what we received */
for (;;) {
//rcv from udp client
recvlen = recvfrom(fd, buf, BUFSIZE, 0, (struct sockaddr *)&remaddr, &addrlen);
printf("\nthread 1: received %d bytes\n", recvlen);
if (recvlen > 0) {
buf[recvlen] = 0;
printf("thread 1: received message from UDP client: %s\n", buf);
}
//send received msg via MsgQ
msgst *msgptr = new msgst;
msgptr->mtype = 123;
strcpy(msgptr->buffer, buf);
//msgptr->buffer=buf;
//memcpy(msgptr->buffer, buf, BUFSIZE);
buflen = strlen(msgptr->buffer);
if (msgsnd(mqid, (void *)&msgptr, buflen, IPC_NOWAIT) == -1) {
printf("thread 1: msgsnd failed\n");
exit(EXIT_FAILURE);
}
printf("thread 1: added Req to MsgQ at: %s\n", ctime(&t1));
}/* never exits */
}
/*
 * Thread 2 entry point: blocks on the System V message queue with key 12345
 * and prints every message of type 123 that arrives.
 *
 * The argument is unused. The function only returns by terminating the
 * process via exit() when msgget() or msgrcv() fails.
 */
void* RcvMq(void*) {
    /*
     * msgrcv() must be handed the address of the message structure itself.
     * Passing the address of a pointer to it makes the kernel overwrite the
     * pointer (and whatever follows it on the stack) instead of the message.
     */
    msgst msg;
    const long receive_m_type = 123;

    int mqid = msgget((key_t)12345, 0666 | IPC_CREAT);
    if (mqid == -1) {
        printf("thread 2: msgget failed with error: %d\n", errno);
        exit(EXIT_FAILURE);
    }
    printf("\nthread 2: got MQ id: %d\n", mqid);

    printf("thread 2: going to enter while(1) loop\n");
    while (1) {
        printf("\nthread 2: inside while(1) loop, going to msgrcv()\n");

        /* The size argument counts payload bytes only, not the mtype field. */
        ssize_t got = msgrcv(mqid, &msg, sizeof(msg.buffer), receive_m_type, 0);
        if (got < 0) {
            printf("\nthread 2: msgrcv failed with error: [%s]\n", strerror(errno));
            exit(EXIT_FAILURE);
        }

        /* Take the timestamp now, so it reflects when this message was dequeued. */
        printf("thread 2: going to ctime()\n");
        time_t t2 = time(NULL);
        char stamp[26]; /* ctime_r() needs at least 26 bytes */
        if (ctime_r(&t2, stamp) == NULL) {
            snprintf(stamp, sizeof(stamp), "unknown time\n");
        }
        printf("\nthread 2: DeQueued at: %s\n", stamp);

        /*
         * msgrcv() does not NUL-terminate the payload, so bound the print
         * by the number of bytes actually received.
         */
        printf("\nthread 2: going to print struct's buffer\n");
        printf("thread 2: Message received at MQ rcv is: %.*s \n", (int)got, msg.buffer);
    }
}
/*
 * Program entry point: starts the UDP-receiving thread (SndMq) and the
 * queue-draining thread (RcvMq), then waits for both to finish.
 *
 * Returns 0 once both threads have ended, or EXIT_FAILURE if either thread
 * could not be created.
 */
int main() {
    int rtn = pthread_create(&sndtid, NULL, &SndMq, NULL);
    if (rtn != 0) {
        printf("\ncan't create thread SndMq, error:[%s]\n", strerror(rtn));
        return EXIT_FAILURE;
    }
    printf("\n\ncreated SndMq thread\n");

    rtn = pthread_create(&rcvtid, NULL, &RcvMq, NULL);
    if (rtn != 0) {
        printf("\ncan't create thread RcvMq :[%s]\n", strerror(rtn));
        /*
         * Returning from main() ends the whole process, including the
         * SndMq thread that is already running; joining a handle that was
         * never created would be undefined behaviour.
         */
        return EXIT_FAILURE;
    }
    printf("created RcvMq thread\n");

    /* Only reached when both pthread_create() calls succeeded. */
    printf("main(): All threads spawned successfully, now waiting for threads to end...\n");
    pthread_join(sndtid, NULL);
    pthread_join(rcvtid, NULL);

    printf("\nThreads ended, main() exiting now...\n");
    return 0;
}
output:
created SndMq thread
created RcvMq thread
main(): All threads spawned successfully, now waiting for threads to end...
thread 2: got MQ id: 6422528
thread 2: going to enter while(1) loop
thread 2: inside while(1) loop, going to msgrcv()
thread 1: Server waiting on port 5000
thread 1: created MQ id: 6422528
thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>1</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr 5 18:16:45 2014
thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>2</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr 5 18:16:45 2014
thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>3</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr 5 18:16:45 2014
thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>4</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr 5 18:16:45 2014
thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>5</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr 5 18:16:45 2014
thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>6</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr 5 18:16:45 2014
thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>7</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr 5 18:16:45 2014
thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>8</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr 5 18:16:45 2014
thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>9</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr 5 18:16:45 2014
thread 1: received 1500 bytes
thread 1: received message from UDP client: <TransactionId>10</TransactionId>
thread 1: added Req to MsgQ at: Sat Apr 5 18:16:45 2014