Question

I'd like to develop a multithreaded UDP server in C/Linux. The service is running on a single port x, thus there's only the possibility to bind a single UDP socket to it. In order to work under high loads, I have n threads (statically defined), say 1 thread per CPU. Work could be delivered to the thread using epoll_wait, so threads get woken up on demand with 'EPOLLET | EPOLLONESHOT'. I've attached a code example:

static int epfd;
static sig_atomic_t sigint = 0;

...

/* Thread routine with epoll_wait */
static void *process_clients(void *pevents)
{
    int rc, i, sock, nfds;
    struct epoll_event ep, *events = (struct epoll_event *) pevents;

    while (!sigint) {
        nfds = epoll_wait(epfd, events, MAX_EVENT_NUM, 500);

        for (i = 0; i < nfds; ++i) {
           if (events[i].data.fd < 0)
                continue;

           sock = events[i].data.fd;

           if((events[i].events & EPOLLIN) == EPOLLIN) {
               printf("Event dispatch!\n");
               handle_request(sock); // do a recvfrom
           } else
               whine("Unknown poll event!\n");

           memset(&ep, 0, sizeof(ep));
           ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
           ep.data.fd = sock;

           rc = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ep);
           if(rc < 0)
               error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");
       }
    }

    pthread_exit(NULL);
}

int main(int argc, char **argv)
{
    int rc, i, cpu, sock, opts;
    struct sockaddr_in sin;
    struct epoll_event ep, *events;
    char *local_addr = "192.168.1.108";
    void *status;
    pthread_t *threads = NULL;
    cpu_set_t cpuset;

    threads = xzmalloc(sizeof(*threads) * MAX_THRD_NUM);
    events = xzmalloc(sizeof(*events) * MAX_EVENT_NUM);

    sock = socket(PF_INET, SOCK_DGRAM, 0);
    if (sock < 0)
        error_and_die(EXIT_FAILURE, "Cannot create socket!\n");

    /* Non-blocking */
    opts = fcntl(sock, F_GETFL);
    if(opts < 0)
        error_and_die(EXIT_FAILURE, "Cannot fetch sock opts!\n");
    opts |= O_NONBLOCK;
    rc = fcntl(sock, F_SETFL, opts);
    if(rc < 0)
        error_and_die(EXIT_FAILURE, "Cannot set sock opts!\n");

    /* Initial epoll setup */
    epfd = epoll_create(MAX_EVENT_NUM);
    if(epfd < 0)
        error_and_die(EXIT_FAILURE, "Error fetching an epoll descriptor!\n");

    memset(&ep, 0, sizeof(ep));
    ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    ep.data.fd = sock;

    rc = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ep);
    if(rc < 0)
        error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");

    /* Socket binding */
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = inet_addr(local_addr);
    sin.sin_port = htons(port_xy);

    rc = bind(sock, (struct sockaddr *) &sin, sizeof(sin));
    if (rc < 0)
        error_and_die(EXIT_FAILURE, "Problem binding to port! "
                      "Already in use?\n");

    register_signal(SIGINT, &signal_handler);

    /* Thread initialization */
    for (i = 0, cpu = 0; i < MAX_THRD_NUM; ++i) {
        rc = pthread_create(&threads[i], NULL, process_clients, events);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");

        CPU_ZERO(&cpuset);
        CPU_SET(cpu, &cpuset);

        rc = pthread_setaffinity_np(threads[i], sizeof(cpuset), &cpuset);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");

        cpu = (cpu + 1) % NR_CPUS_ON;
    }

    printf("up and running!\n");

    /* Thread joining */
    for (i = 0; i < MAX_THRD_NUM; ++i) {
        rc = pthread_join(threads[i], &status);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Error on thread exit!\n");
    }

    close(sock);
    xfree(threads);
    xfree(events);

    printf("shut down!\n");

    return 0;
}

Is this the proper way of handling this scenario with epoll? Should the function _handle_request_ return as fast as possible, because for this time the eventqueue for the socket is blocked?!

Thanks for replies!

Was it helpful?

Solution

As you are only using a single UDP socket, there is no point using epoll - just use a blocking recvfrom instead.

Now, depending on the protocol you need to handle - if you can process each UDP packet individually - you can actually call recvfrom concurrently from multiple threads (in a thread pool). The OS will take care that exactly one thread will receive the UDP packet. This thread can then do whatever it needs to do in handle_request.

However, if you need to process the UDP packets in a particular order, you'll probably not have that many opportunities to parallalise your program...

OTHER TIPS

No, this will not work the way you want to. To have worker threads process events arriving through an epoll interface, you need a different architecture.

Example design (there are several ways to do this) Uses: SysV/POSIX semaphores.

  • Have the master thread spawn n subthreads and a semaphore, then block epolling your sockets (or whatever).

  • Have each subthread block on down-ing the semaphore.

  • When the master thread unblocks, it stores the events in some global structure and ups the semaphore once per event.

  • The subthreads unblock, process the events, block again when the semaphore returns to 0.

You can use a pipe shared among all threads to achieve very similar functionality to that of the semaphore. This would let you block on select() instead of the semaphore, which you can use to wake the threads up on some other event (timeouts, other pipes, etc.)

You can also reverse this control, and have the master thread wake up when its workers demand tasks. I think the above approach is better for your case, though.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top