I have to send a small amount of data (~1k bytes) from thread A to all threads B-1 to B-n.
My current implementation is rather complex: Use a GHashTable to map queues to thread ids. Put all threads B-x into a waiting state via GCond and g_cond_wait(_until). Push a pointer to the data all threads should receive into each queue from thread A, then broadcast the update via g_cond_broadcast (they all use the same GCond instance).
If a thread decides to finish (i.e. remote DC), first remove the queue from the GHashTable, then clear the queue and destroy its contents. There are some more details I left out (like race conditions, intermediate ref/unref around the wait block).
Is this a sane approach? How can I improve this? It does not "feel" efficient at all.
Just for reference, here is some draft code:
typedef struct {
//TODO verify this is not too stupid
// if we use that mutex too often, all parallel foo is pointless
GMutex mutex;
GHashTable *hashmap; //full of queues
gint refs;
GDestroyNotify fx_ref;
GDestroyNotify fx_unref;
} Foo;
Foo *
foo_new (GDestroyNotify fx_ref, GDestroyNotify fx_unref)
{
Foo *foo;
foo = g_new0 (Foo, 1);
g_assert (foo);
g_mutex_init (&(foo->mutex));
foo->hashmap = g_hash_table_new_full ();
foo->refs = 1;
foo->fx_ref = fx_ref; //just asume this increases the refcount atomically
foo->fx_unref = fx_unref; //"" decreases ""
return foo;
}
/**
 * foo_register_thread:
 * @obj: the shared instance
 * @threadid: id of the consumer thread to register
 *
 * Creates a fresh queue for @threadid and stores it in the thread table
 * while holding the instance lock.
 */
void
foo_register_thread (Foo *obj, gint threadid)
{
  AQueue *aq;

  g_return_if_fail (obj);

  foo_lock (obj);
  aq = a_queue_new ((GDestroyNotify)i_do_unref);
  // the table stores pointer keys, so the integer id has to be packed into one
  g_hash_table_insert (obj->hashmap, GINT_TO_POINTER (threadid), aq);
  foo_unlock (obj);
}
/**
 * foo_unregister_thread:
 * @obj: the shared instance
 * @threadid: id of the consumer thread to remove
 *
 * Detaches the queue of @threadid from the thread table, wakes all waiters
 * so the owning thread can notice the removal, and then drops the table's
 * reference on the queue. Does nothing if @threadid is not registered.
 */
void
foo_unregister_thread (Foo *obj, gint threadid)
{
  gpointer key = GINT_TO_POINTER (threadid);
  AQueue *aq;

  g_return_if_fail (obj);

  foo_lock (obj);
  aq = g_hash_table_lookup (obj->hashmap, key);
  if (aq == NULL) {
    foo_unlock (obj);
    return;
  }
  // g_hash_table_steal never runs the table's destroy callbacks, so the
  // single a_queue_unref below is the only release of this queue no matter
  // how the table was created.
  g_hash_table_steal (obj->hashmap, key);
  // broadcast _after_ removing the queue from the hashtable,
  // so the thread wakes up and quits its foo_thread_wait_until_ready call
  g_cond_broadcast (&obj->cond);
  foo_unlock (obj);
  // allow somebody to sneak in
  foo_lock (obj);
  a_queue_unref (aq);
  foo_unlock (obj);
}
/**
 * foo_enqueue:
 * @obj: the shared instance
 * @data: item to hand to every registered thread
 *
 * Pushes @data onto the tail of every queue in the thread table and then
 * wakes all waiting threads. If a ref callback was supplied it is invoked
 * once per queue, so each queue accounts for its own reference on @data.
 */
void
foo_enqueue (Foo *obj, gpointer data)
{
  GHashTableIter iter;
  gpointer value;

  g_return_if_fail (obj);

  //wave after wave, not wave intermixing
  g_mutex_lock (&obj->mutex);
  g_hash_table_iter_init (&iter, obj->hashmap);
  // the key is not needed here; g_hash_table_iter_next accepts NULL for it
  while (g_hash_table_iter_next (&iter, NULL, &value)) {
    if (obj->fx_ref)
      obj->fx_ref (data);
    // NOTE(review): cast mirrors the other accessors in this file, which
    // treat an AQueue* as a GQueue* - confirm against the AQueue definition
    g_queue_push_tail ((GQueue*)value, data);
  }
  g_cond_broadcast (&obj->cond);
  g_mutex_unlock (&obj->mutex);
}
/**
 * foo_thread_pop:
 * @obj: the shared instance
 * @id: id of the calling thread; must be greater than 0
 *
 * Non-blocking pop from the queue registered for @id.
 *
 * Returns: the item at the head of the queue, or NULL if the queue is empty,
 * @id is not registered, or an argument check fails.
 */
gpointer
foo_thread_pop (Foo *obj, gint id)
{
  AQueue *aq;
  gpointer data = NULL;

  g_return_val_if_fail (obj, NULL);
  g_return_val_if_fail (id>0, NULL);

  foo_lock (obj);
  // the table stores pointer keys, so the integer id has to be packed into one
  aq = g_hash_table_lookup (obj->hashmap, GINT_TO_POINTER (id));
  if (aq) {
    data = g_queue_pop_head ((GQueue*)aq);
  }
  foo_unlock (obj);

  return data;
}
/**
 * foo_thread_wait_until_ready:
 * @obj: the shared instance
 * @id: id of the calling thread
 *
 * Wait until the queue gets removed or until data is ready to be read.
 *
 * Returns: the item popped from the head of the queue, or NULL if @id is not
 * registered or its queue was removed from the table while waiting.
 */
gpointer
foo_thread_wait_until_ready (Foo *obj, gint id)
{
  gpointer key = GINT_TO_POINTER (id);
  gpointer data;
  AQueue *aq;

  g_return_val_if_fail (obj, NULL);

  foo_lock (obj);
  aq = (AQueue*)g_hash_table_lookup (obj->hashmap, key);
  if (!aq) {
    // release the lock (and the reference foo_lock took) on the early exit
    foo_unlock (obj);
    return NULL;
  }
  // just in case stuff gets cleaned up in the meantime
  a_queue_ref (aq);
  while (g_queue_peek_head ((GQueue*)aq)==NULL) {
    // no deadline is involved, so use the untimed wait; it atomically
    // releases the mutex while sleeping and re-acquires it before returning
    g_cond_wait (&obj->cond, &obj->mutex);
    // make sure queue still exists, if not this means this thread is dying
    if (g_hash_table_lookup (obj->hashmap, key) != (gpointer)aq)
      break;
  }
  // g_queue_pop_head yields NULL on an empty queue, which covers the
  // "queue was removed" exit from the loop above
  data = g_queue_pop_head ((GQueue*)aq);
  a_queue_unref (aq);
  foo_unlock (obj);

  return data;
}
/**
 * foo_destroy:
 * @obj: instance to tear down
 *
 * Releases everything owned by @obj: the thread table, the mutex, the
 * condition variable and finally the instance memory itself. @obj must not
 * be used afterwards. Called by foo_unref() when the last reference is gone.
 */
void
foo_destroy (Foo *obj)
{
  g_return_if_fail (obj);

  // drops all remaining entries, running whatever destroy callbacks the
  // table was created with
  if (obj->hashmap) {
    g_hash_table_destroy (obj->hashmap);
    obj->hashmap = NULL;
  }

  g_mutex_clear (&obj->mutex);
  g_cond_clear (&obj->cond);

  // NOTE(review): assumes the instance came from foo_new (g_new0) - confirm
  // no caller embeds Foo in another structure or on the stack
  g_free (obj);
}
/**
 * foo_unref:
 * @obj: instance to release
 *
 * Atomically decrements the reference count of @obj and hands it to
 * foo_destroy() when the count reaches zero.
 */
void
foo_unref (Foo *obj)
{
  gboolean was_last;

  g_return_if_fail (obj);

  was_last = g_atomic_int_dec_and_test (&obj->refs);
  if (!was_last)
    return;

  foo_destroy (obj);
}
/**
 * foo_ref:
 * @obj: instance to retain
 *
 * Atomically increments the reference count of @obj.
 */
void
foo_ref (Foo *obj)
{
  gint *counter;

  g_return_if_fail (obj);

  counter = &obj->refs;
  g_atomic_int_inc (counter);
}
/**
 * foo_lock:
 * @obj: instance to lock
 *
 * Takes a reference on @obj and then acquires its mutex, so the instance
 * stays referenced for as long as the lock is held. Pair with foo_unlock().
 */
void
foo_lock (Foo *obj)
{
  g_return_if_fail (obj);

  // reference first, then block on the mutex
  foo_ref (obj);
  g_mutex_lock (&obj->mutex);
}
/**
 * foo_unlock:
 * @obj: instance to unlock
 *
 * Releases the mutex of @obj and then drops the reference taken by
 * foo_lock(). If that was the last reference the instance is destroyed.
 */
void
foo_unlock (Foo *obj)
{
  g_return_if_fail (obj);

  g_mutex_unlock (&obj->mutex);

  // the mutex must be released before the count can reach zero, because
  // foo_destroy clears it
  if (g_atomic_int_dec_and_test (&obj->refs))
    foo_destroy (obj);
}