Question

I have a small thread pool example for learn use:

It takes work structs which represent a simple calculation (+, -, *, /) of two values. Now the worker threads will pull these work structs out of the queue and execute them.

So much to the theory. The problem now is that when I want to use the helper function submit_work() somehow the queue resolving in the worker thread does not work (the data pulled out is invalid and when I use QUEUE_REMOVE it does an segmentation fault).

How can it be that by just using an helper function the whole concept does not work anymore?

pool.c

I mean pushing data manually works fine...

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include "../queue/queue.h"

#define MAX_THREADS 1

pthread_t threads[MAX_THREADS];

pthread_cond_t cond;   

pthread_mutex_t mutex;

/**
 * QUEUE is a "void * arr[2]".
 * The Queue itself is similar to the circularly linked list in linux
 */
QUEUE queue;

struct work_s {
    int a;
    int b;
    int type;
    QUEUE node;
};

void * worker();
void submit_work(int a, int b, int type);

int main() {
    QUEUE_INIT(&queue);

    pthread_cond_init(&cond, NULL);
    pthread_mutex_init(&mutex, NULL);

    struct work_s work[2];

    /* 5 + 7 */
    work[0].a = 5;
    work[0].b = 7;
    work[0].type = 1;

    /* 3 x 3 */
    work[1].a = 3;
    work[1].b = 3;
    work[1].type = 3;

    /* initialize there queue nodes */
    QUEUE_INIT(&work[0].node);
    QUEUE_INIT(&work[1].node);

    /* insert both tasks into the work queue */
    QUEUE_INSERT_TAIL(&queue, &work[0].node);
    QUEUE_INSERT_TAIL(&queue, &work[1].node);

    /* this does actually the same as above but causes a segmentation fault. */
    submit_work(5, 6, 3);

    for (int i = 0; i < MAX_THREADS; i++)
        pthread_create(&threads[i], NULL, worker, NULL);
    for (int i = 0; i < MAX_THREADS; i++)
        pthread_join(threads[i], NULL);
    for (int i = 0; i < MAX_THREADS; i++)
        pthread_detach(threads[i]);

    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);

    return 0;
}

void submit_work(int a, int b, int type) {
    struct work_s work;

    work.a = a;
    work.b = b;
    work.type = type;

    pthread_mutex_lock(&mutex);

    QUEUE_INIT(&work.node);
    QUEUE_INSERT_TAIL(&queue, &work.node);

    pthread_mutex_unlock(&mutex);

    pthread_cond_signal(&cond);
}

void * worker() { 
    /* a pointer to a queue node */
    QUEUE * q;

    int result;

    struct work_s * work;

    /* infinite loop */
    for (;;) {
        while (QUEUE_EMPTY(&queue)) {
            pthread_cond_wait(&cond, &mutex);
        }

        pthread_mutex_lock(&mutex);

        q = QUEUE_HEAD(&queue);

        /* HERE THE SEGMENTSTION FAULT OCCURS when using submit_work */
        QUEUE_REMOVE(q);

        pthread_mutex_unlock(&mutex);

        /* set the work pointer to the work struct we have pulled from queue */
        work = QUEUE_DATA(q, struct work_s, node);

        /* PRINTS INCORRECT DATA on submit_work() */
        printf("received work type %d with a %d and b %d \n", work->a, work->b, work->type);

        if (work->type == 0) {
            break;
        }

        switch (work->type) {
            case 1:
                result = work->a + work->b;
                printf("%d + %d = %d\n", work->a, work->b, result);
                break;
            case 2:
                result = work->a - work->b;
                printf("%d - %d = %d\n", work->a, work->b, result);
                break;
            case 3:
                result = work->a * work->b;
                printf("%d * %d = %d\n", work->a, work->b, result);
                break;
            case 4:
                result = work->a / work->b;
                printf("%d / %d = %d\n", work->a, work->b, result);
                break;
        }
    }


    pthread_exit(NULL);
}

queue.h

/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
 *
 * Permission to use, copy, modify, and/or distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#ifndef QUEUE_H_
#define QUEUE_H_

typedef void *QUEUE[2];

/* Private macros. */
#define QUEUE_NEXT(q)       ((*(q))[0])
#define QUEUE_PREV(q)       ((*(q))[1])
#define QUEUE_PREV_NEXT(q)  (QUEUE_NEXT((QUEUE *) QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q)  (QUEUE_PREV((QUEUE *) QUEUE_NEXT(q)))

/* Public macros. */
#define QUEUE_DATA(ptr, type, field)                                          \
  ((type *) ((char *) (ptr) - ((long) &((type *) 0)->field)))

#define QUEUE_FOREACH(q, h)                                                   \
  for ((q) = (*(h))[0]; (q) != (h); (q) = (*(q))[0])

#define QUEUE_EMPTY(q)                                                        \
  (QUEUE_NEXT(q) == (q))

#define QUEUE_HEAD(q)                                                         \
  (QUEUE_NEXT(q))

#define QUEUE_INIT(q)                                                         \
  do {                                                                        \
    QUEUE_NEXT(q) = (q);                                                      \
    QUEUE_PREV(q) = (q);                                                      \
  }                                                                           \
  while (0)

#define QUEUE_ADD(h, n)                                                       \
  do {                                                                        \
    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n);                                       \
    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h);                                       \
    QUEUE_PREV(h) = QUEUE_PREV(n);                                            \
    QUEUE_PREV_NEXT(h) = (h);                                                 \
  }                                                                           \
  while (0)

#define QUEUE_SPLIT(h, q, n)                                                  \
  do {                                                                        \
    QUEUE_PREV(n) = QUEUE_PREV(h);                                            \
    QUEUE_PREV_NEXT(n) = (n);                                                 \
    QUEUE_NEXT(n) = (q);                                                      \
    QUEUE_PREV(h) = QUEUE_PREV(q);                                            \
    QUEUE_PREV_NEXT(h) = (h);                                                 \
    QUEUE_PREV(q) = (n);                                                      \
  }                                                                           \
  while (0)

#define QUEUE_INSERT_HEAD(h, q)                                               \
  do {                                                                        \
    QUEUE_NEXT(q) = QUEUE_NEXT(h);                                            \
    QUEUE_PREV(q) = (h);                                                      \
    QUEUE_NEXT_PREV(q) = (q);                                                 \
    QUEUE_NEXT(h) = (q);                                                      \
  }                                                                           \
  while (0)

#define QUEUE_INSERT_TAIL(h, q)                                               \
  do {                                                                        \
    QUEUE_NEXT(q) = (h);                                                      \
    QUEUE_PREV(q) = QUEUE_PREV(h);                                            \
    QUEUE_PREV_NEXT(q) = (q);                                                 \
    QUEUE_PREV(h) = (q);                                                      \
  }                                                                           \
  while (0)

#define QUEUE_REMOVE(q)                                                       \
  do {                                                                        \
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q);                                       \
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q);                                       \
  }                                                                           \
  while (0)

#endif /* QUEUE_H_ */

If I should also copy the QUEUE.h file leave a comment. Else you find it here

Bodo

Was it helpful?

Solution

The problem is that you are inserting a local variable of submit_work() onto the queue (the queue routines do not make a copy of the object they are queueing). So once submit_work() returns, the data in the structure that was queued is no longer valid (and likely changes the link pointers to garbage).

This doesn't occur for the first two items enqueued because their lifetime is still valid while the thread functions execute since main() blocks in the pthread_join() calls.

You should be queueing items that are dynamically allocated so their lifetime extends past the lfe of the function that queues them. Of course, then you also need to have the functions that dequeue the items free the memory. Of course this means that you should also dynamically allocate the items enqueued directly by main().

OTHER TIPS

You execute this loop in worker() without holding the mutex:

while (QUEUE_EMPTY(&queue)) {
    pthread_cond_wait(&cond, &mutex);
}

pthread_cond_wait() can only be called while holding the mutex you pass to it - it'll also return with that mutex locked, so you shouldn't immediately call pthread_mutex_lock(&mutex) after that loop - it's already locked.

So that bit of code should look something like:

pthread_mutex_lock(&mutex);
while (QUEUE_EMPTY(&queue)) {
    pthread_cond_wait(&cond, &mutex);
}

// don't call pthread_mutex_lock(&mutex) here...
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top