Frage

Ich möchte einen multithreaded UDP Server in C / Linux entwickeln. Der Service läuft auf einem einzigen Port x, so gibt es nur die Möglichkeit, ein einzelnes UDP-Sockets, um es zu binden. Zu arbeiten, um bei hohen Belastungen, habe ich n Fäden (statisch definiert), etwa 1 Thread pro CPU. Arbeit an den Thread zugestellt werden kann epoll_wait verwenden, so Gewinde auf Anfrage mit ‚EPOLLET aufgeweckt bekommen | EPOLLONESHOT‘. Ich habe ein Codebeispiel angebracht:

static int epfd;
static sig_atomic_t sigint = 0;

...

/* Thread routine with epoll_wait */
static void *process_clients(void *pevents)
{
    int rc, i, sock, nfds;
    struct epoll_event ep, *events = (struct epoll_event *) pevents;

    while (!sigint) {
        nfds = epoll_wait(epfd, events, MAX_EVENT_NUM, 500);

        for (i = 0; i < nfds; ++i) {
           if (events[i].data.fd < 0)
                continue;

           sock = events[i].data.fd;

           if((events[i].events & EPOLLIN) == EPOLLIN) {
               printf("Event dispatch!\n");
               handle_request(sock); // do a recvfrom
           } else
               whine("Unknown poll event!\n");

           memset(&ep, 0, sizeof(ep));
           ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
           ep.data.fd = sock;

           rc = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ep);
           if(rc < 0)
               error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");
       }
    }

    pthread_exit(NULL);
}

int main(int argc, char **argv)
{
    int rc, i, cpu, sock, opts;
    struct sockaddr_in sin;
    struct epoll_event ep, *events;
    char *local_addr = "192.168.1.108";
    void *status;
    pthread_t *threads = NULL;
    cpu_set_t cpuset;

    threads = xzmalloc(sizeof(*threads) * MAX_THRD_NUM);
    events = xzmalloc(sizeof(*events) * MAX_EVENT_NUM);

    sock = socket(PF_INET, SOCK_DGRAM, 0);
    if (sock < 0)
        error_and_die(EXIT_FAILURE, "Cannot create socket!\n");

    /* Non-blocking */
    opts = fcntl(sock, F_GETFL);
    if(opts < 0)
        error_and_die(EXIT_FAILURE, "Cannot fetch sock opts!\n");
    opts |= O_NONBLOCK;
    rc = fcntl(sock, F_SETFL, opts);
    if(rc < 0)
        error_and_die(EXIT_FAILURE, "Cannot set sock opts!\n");

    /* Initial epoll setup */
    epfd = epoll_create(MAX_EVENT_NUM);
    if(epfd < 0)
        error_and_die(EXIT_FAILURE, "Error fetching an epoll descriptor!\n");

    memset(&ep, 0, sizeof(ep));
    ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    ep.data.fd = sock;

    rc = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ep);
    if(rc < 0)
        error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");

    /* Socket binding */
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = inet_addr(local_addr);
    sin.sin_port = htons(port_xy);

    rc = bind(sock, (struct sockaddr *) &sin, sizeof(sin));
    if (rc < 0)
        error_and_die(EXIT_FAILURE, "Problem binding to port! "
                      "Already in use?\n");

    register_signal(SIGINT, &signal_handler);

    /* Thread initialization */
    for (i = 0, cpu = 0; i < MAX_THRD_NUM; ++i) {
        rc = pthread_create(&threads[i], NULL, process_clients, events);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");

        CPU_ZERO(&cpuset);
        CPU_SET(cpu, &cpuset);

        rc = pthread_setaffinity_np(threads[i], sizeof(cpuset), &cpuset);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");

        cpu = (cpu + 1) % NR_CPUS_ON;
    }

    printf("up and running!\n");

    /* Thread joining */
    for (i = 0; i < MAX_THRD_NUM; ++i) {
        rc = pthread_join(threads[i], &status);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Error on thread exit!\n");
    }

    close(sock);
    xfree(threads);
    xfree(events);

    printf("shut down!\n");

    return 0;
}

Ist dies der richtige Weg, um dieses Szenario mit epoll der Handhabung? Sollte die Funktion _handle_request_ so schnell wie möglich zurück, weil für diese Zeit die Eventqueue für den Socket blockiert ist?!

Danke für die Antworten!

War es hilfreich?

Lösung

Wie Sie nur ein einziges UDP-Sockets verwenden, gibt es keinen Punkt epoll mit -. Nur stattdessen eine Blockierung Recvfrom verwenden

Nun, je nach Protokoll Sie behandeln müssen - wenn Sie jedes UDP-Paket einzeln verarbeiten können - man kann tatsächlich Recvfrom gleichzeitig von mehreren Threads (in einem Thread-Pool) nennen. Das Betriebssystem wird darauf achten, dass genau ein Thread das UDP-Paket empfangen wird. Dieser Thread kann dann tun, was es in handle_request tun muss.

Wenn Sie jedoch die UDP-Pakete in einer bestimmten Reihenfolge verarbeiten müssen, werden Sie wahrscheinlich haben nicht so viele Möglichkeiten, Ihr Programm parallalise ...

Andere Tipps

Nein, das wird nicht so funktionieren, Sie wollen. Haben Worker-Threads Ereignisse ankommen durch eine epoll Schnittstelle verarbeiten, benötigen Sie eine andere Architektur.

Beispiel Design (es gibt mehrere Möglichkeiten, dies zu tun) Verwendung:. SysV / POSIX Semaphore

  • Haben Sie den Master-Thread laichen n subthreads und eine Semaphore, dann blockieren epolling Ihre Steckdosen (oder was auch immer).

  • Lassen Sie jeden subthread Block auf Down-ing die Semaphore.

  • Wenn der Master-Thread deblockiert, es speichert die Ereignisse in einiger globalen Struktur und ups die Semaphore einmal pro Ereignis.

  • Die subthreads entsperren, verarbeitet die Ereignisse, Block wieder, wenn die Semaphore auf 0 zurück.

Sie können ein Rohr unter allen Threads gemeinsam verwenden, um die von der Semaphore sehr ähnliche Funktionalität zu erreichen. Dies würde Sie auf select() Block anstelle des Semaphore, mit dem Sie die Fäden können auf ein anderes Ereignis wecken (Timeouts, andere Rohre usw.)

Sie können auch diese Steuerung umkehren, und haben die Master-Thread aufwachen, wenn seine Arbeiter Aufgaben verlangen. Ich denke, dass der obige Ansatz ist besser für Ihren Fall, though.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top