Domanda

I have to send a small amount of data (~1k bytes) from thread A to all threads B-1 through B-n.

My current implementation is rather complex:

Use GHashTable to map queues to thread ids. Put all threads B-x into a waiting state via GCond and g_cond_wait(_until). Push a pointer to the data all threads should receive into each queue from thread A, broadcast the update via g_cond_broadcast (they all use the same GCond instance). If a thread decides to finish (i.e. remote DC), first remove the Queue from the GHashTable, clear the queue and destroy contents. There are some more details I left out (like race conditions, intermediate ref/unref around the wait block).

Is this a sane approach? How can I improve it? It does not "feel" efficient at all.

Just for reference, attached some draft code:

typedef struct {
    //TODO verify this is not too stupid
    // if we use that mutex too often, all parallel foo is pointless
    GMutex mutex;
    GHashTable *hashmap; //full of queues
    gint refs;
    GDestroyNotify fx_ref;
    GDestroyNotify fx_unref;
} Foo;


/**
 * foo_new:
 * @fx_ref: called to take a reference on each enqueued payload (may be NULL)
 * @fx_unref: called to drop a reference on a payload (may be NULL)
 *
 * Allocates and initializes a new broadcast hub with an initial refcount
 * of 1. Caller owns the returned object; release it with foo_unref().
 */
Foo *
foo_new (GDestroyNotify fx_ref, GDestroyNotify fx_unref)
{
    Foo *foo;

    /* g_new0 aborts on OOM, so no NULL check is needed */
    foo = g_new0 (Foo, 1);
    g_mutex_init (&foo->mutex);
    g_cond_init (&foo->cond);
    /* keys are GINT_TO_POINTER-packed thread ids -> direct hash/equal;
     * queue lifetime is managed manually (see foo_unregister_thread),
     * so no value-destroy function here */
    foo->hashmap = g_hash_table_new_full (g_direct_hash, g_direct_equal,
                                          NULL, NULL);
    foo->refs = 1;
    foo->fx_ref = fx_ref;     // assumed to increase the payload refcount atomically
    foo->fx_unref = fx_unref; // assumed to decrease it atomically
    return foo;
}

/**
 * foo_register_thread:
 * @obj: the broadcast hub
 * @threadid: unique id of the consumer thread (> 0)
 *
 * Creates a queue for @threadid and makes it visible to foo_enqueue().
 * Must be called before the thread waits via foo_thread_wait_until_ready().
 */
void
foo_register_thread (Foo *obj, gint threadid)
{
    AQueue *aq;

    g_return_if_fail (obj);

    foo_lock (obj);
    aq = a_queue_new ((GDestroyNotify) i_do_unref);
    /* draft used undeclared `id`; gint keys must be packed with GINT_TO_POINTER */
    g_hash_table_insert (obj->hashmap, GINT_TO_POINTER (threadid), aq);
    foo_unlock (obj);
}

/**
 * foo_unregister_thread:
 * @obj: the broadcast hub
 * @threadid: id of the thread being removed (e.g. on remote disconnect)
 *
 * Removes the thread's queue from the table and wakes all waiters so the
 * departing thread falls out of foo_thread_wait_until_ready().
 */
void
foo_unregister_thread (Foo *obj, gint threadid)
{
    AQueue *aq;

    g_return_if_fail (obj);

    foo_lock (obj);
    /* draft bug: `aq` was never looked up before being unreffed */
    aq = g_hash_table_lookup (obj->hashmap, GINT_TO_POINTER (threadid));
    if (aq) {
        /* remove _before_ broadcasting, so a woken waiter sees the queue
         * is gone and quits its foo_thread_wait_until_ready call */
        g_hash_table_remove (obj->hashmap, GINT_TO_POINTER (threadid));
        g_cond_broadcast (&obj->cond);
        /* a waiter holds its own ref on the queue (see
         * foo_thread_wait_until_ready), so dropping ours under the lock is
         * safe — the draft's unlock/relock "sneak in" dance is unnecessary */
        a_queue_unref (aq);
    }
    foo_unlock (obj);
}

/**
 * foo_enqueue:
 * @obj: the broadcast hub
 * @data: payload pointer to deliver to every registered thread
 *
 * Pushes @data onto every registered queue (taking one reference per
 * queue via fx_ref) and wakes all waiting consumer threads.
 */
void
foo_enqueue (Foo *obj, gpointer data)
{
    GHashTableIter iter;
    gpointer key, value;

    g_return_if_fail (obj);

    /* wave after wave, no wave intermixing: the lock serializes broadcasts */
    foo_lock (obj);

    /* draft bugs fixed here: iter must be passed by address, the table
     * member is `hashmap` (not `ht`), and the object is `obj` (not `foo`) */
    g_hash_table_iter_init (&iter, obj->hashmap);
    while (g_hash_table_iter_next (&iter, &key, &value)) {
        /* each queue gets its own reference to the payload */
        if (obj->fx_ref)
            obj->fx_ref (data);
        g_queue_push_tail ((GQueue *) value, data);
    }
    g_cond_broadcast (&obj->cond);

    foo_unlock (obj);
}


/**
 * foo_thread_pop:
 * @obj: the broadcast hub
 * @id: the calling thread's id (> 0)
 *
 * Non-blocking pop: returns the head of the thread's queue, or %NULL if
 * the queue is empty or the thread is not registered. The caller takes
 * over the payload reference taken in foo_enqueue().
 */
gpointer
foo_thread_pop (Foo *obj, gint id)
{
    AQueue *aq;
    gpointer data = NULL;

    g_return_val_if_fail (obj, NULL);
    g_return_val_if_fail (id > 0, NULL);

    foo_lock (obj);
    /* draft bug: gint key must be packed with GINT_TO_POINTER */
    aq = g_hash_table_lookup (obj->hashmap, GINT_TO_POINTER (id));
    if (aq)
        data = g_queue_pop_head ((GQueue *) aq);
    foo_unlock (obj);

    return data;
}


/**
 * wait until the queue gets removed or until data is ready to be read
 */
/**
 * foo_thread_wait_until_ready:
 * @obj: the broadcast hub
 * @id: the calling thread's id
 *
 * Blocks until data is available on the thread's queue, or until the
 * queue is removed (which signals the thread to shut down). Returns the
 * popped payload, or %NULL if the thread is unregistered / shutting down.
 */
gpointer
foo_thread_wait_until_ready (Foo *obj, gint id)
{
    gpointer data = NULL;
    AQueue *aq;

    g_return_val_if_fail (obj, NULL);

    foo_lock (obj);
    aq = (AQueue *) g_hash_table_lookup (obj->hashmap, GINT_TO_POINTER (id));
    if (!aq) {
        /* draft bug: returned here while still holding the lock (deadlock) */
        foo_unlock (obj);
        return NULL;
    }

    /* keep the queue alive in case it is unregistered while we wait */
    a_queue_ref (aq);

    while (g_queue_peek_head ((GQueue *) aq) == NULL) {
        /* draft called g_cond_wait_until without an end time; a plain
         * g_cond_wait matches the intended "wait until woken" behavior */
        g_cond_wait (&obj->cond, &obj->mutex);
        /* if the queue is no longer in the table, this thread is dying */
        if (g_hash_table_lookup (obj->hashmap, GINT_TO_POINTER (id)) != (gpointer) aq)
            break;
    }

    /* NULL if we broke out because the queue was removed while empty */
    data = g_queue_pop_head ((GQueue *) aq);

    a_queue_unref (aq);
    foo_unlock (obj);

    return data;
}


/**
 * foo_destroy:
 * @obj: the broadcast hub (refcount must have reached zero)
 *
 * Frees all resources owned by @obj, including @obj itself. Called by
 * foo_unref() when the last reference is dropped — do not call directly
 * while other threads may still use the object.
 */
void
foo_destroy (Foo *obj)
{
    g_return_if_fail (obj);
    /* draft leaked the hash table and the struct itself.
     * NOTE(review): any queues still in the table at this point are
     * leaked too — callers are expected to unregister all threads first. */
    g_hash_table_destroy (obj->hashmap);
    g_mutex_clear (&obj->mutex);
    g_cond_clear (&obj->cond);
    g_free (obj);
}

/**
 * foo_unref:
 * @obj: the broadcast hub
 *
 * Atomically drops one reference; destroys @obj when the count hits zero.
 */
void
foo_unref (Foo *obj)
{
    gboolean last;

    g_return_if_fail (obj);

    last = g_atomic_int_dec_and_test (&obj->refs);
    if (last)
        foo_destroy (obj);
}

/**
 * foo_ref:
 * @obj: the broadcast hub
 *
 * Atomically takes one additional reference on @obj.
 */
void
foo_ref (Foo *obj)
{
    g_return_if_fail (obj != NULL);

    g_atomic_int_inc (&obj->refs);
}

/**
 * foo_lock:
 * @obj: the broadcast hub
 *
 * Takes a reference (so @obj cannot be destroyed while locked) and then
 * acquires the mutex. Must be paired with foo_unlock(), which releases
 * the mutex and drops the reference.
 */
void
foo_lock (Foo *obj)
{
    g_return_if_fail (obj);
    /* draft duplicated the atomic increment inline; go through foo_ref()
     * so the refcounting logic lives in exactly one place */
    foo_ref (obj);
    g_mutex_lock (&obj->mutex);
}

/**
 * foo_unlock:
 * @obj: the broadcast hub
 *
 * Releases the mutex acquired by foo_lock() and drops the reference that
 * foo_lock() took.
 */
void
foo_unlock (Foo *obj)
{
    g_return_if_fail (obj != NULL);

    /* unlock first, then drop the foo_lock() reference */
    g_mutex_unlock (&(obj->mutex));
    foo_unref (obj);
}
È stato utile?

Soluzione

I did something similar using fork. Here is a high-level rundown of the algorithm:

Hypothesis: no worker needs to send memory back to the parent, and the requests can be processed in any order.

  • before multithreading (forking), create an array of pointers to struct (the struct provides all the necessary info for treating a queued request). Allocate a shared, mutex-controlled integer with 0 as an initial value.
  • fork for as many times as you want. (I used the number of logical CPUs +1 since the parent is mostly idle).

Then each forked process proceeds as follows:

  • if the shared integer is not locked, lock it, increment it, and unlock it. Then process the array element at the index the integer held *before* the increment.
  • if locked, try again later.
  • if the value is larger than the global number of requests, exit.

The parent simply waits that all threads are done.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top