Question

After doing some research, I'm resorting to any feedback regarding how to effectively remove two items off a Concurrent collection. My situation involves incoming messages over UDP which are currently being placed into a BlockingCollection. Once there are two Users in the collection, I need to safely Take two users and process them. I've seen several different techniques including some ideas listed below. My current implementation is below but I'm thinking there's a cleaner way to do this while ensuring that Users are processed in groups of two. That's the only restriction in this scenario.

Current Implementation:

    private int userQueueCount = 0;
    public BlockingCollection<User> UserQueue = new BlockingCollection<User>();

    public void JoinQueue(User u)
    {
           UserQueue.Add(u);
           Interlocked.Increment(ref userQueueCount);

           if (userQueueCount > 1)
           {
               IEnumerable<User> users = UserQueue.Take(2);
               if(users.Count==2) {
                 Interlocked.Decrement(ref userQueueCount);
                 Interlocked.Decrement(ref userQueueCount);
                 ... do some work with users but if only one 
                 is removed I'll run into problems
               }

           }
    }

What I would like to do is something like this but I cannot currently test this in a production situation to ensure integrity.

 Parallel.ForEach(UserQueue.Take(2), (u) => { ... });

Or better yet:

    public void JoinQueue(User u)
    {
           UserQueue.Add(u);
           // if needed? increment
           Interlocked.Increment(ref userQueueCount);
           UserQueue.CompleteAdding();
    }

Then implement this somewhere:

        Task.Factory.StartNew(() =>
        {
            while (userQueueCount > 1) OR (UserQueue.Count > 1) If it's safe?
            {
                IEnumerable<User> users = UserQueue.Take(2);
                ... do stuff
            }

        });

The problem with this is that i'm not sure I can guarantee that between the condition (Count > 1) and the Take(2) that i'm ensuring the UserQueue has at least two items to process? Incoming UDP messages are processed in parallel so I need a way to safely pull items off of the Blocking/Concurrent Collection in pairs of two.

Is there a better/safer way to do this?

Revised Comments: The intented goal of this question is really just to achieve a stable/thread safe method of processing items off of a Concurrent Collection in .Net 4.0. It doesn't have to be pretty, it just has to be stable in the task of processing items in unordered pairs of twos in a parallel environment.

Was it helpful?

Solution

Here is what I'd do in rough Code:

ConcurrentQueuequeue = new ConcurrentQueue(); //can use a BlockingCollection too (as it's just a blocking ConcurrentQueue by default anyway)

public void OnUserStartedGame(User joiningUser)
{
   User waitingUser;
   if (this.gameQueue.TryDequeue(out waitingUser)) //if there's someone waiting, we'll get him
      this.MatchUsers(waitingUser, joiningUser);
   else
      this.QueueUser(joiningUser); //it doesn't matter if there's already someone in the queue by now because, well, we are using a queue and it will sort itself out.
}

private void QueueUser(User user)
{
   this.gameQueue.Enqueue(user);
}

private void MatchUsers(User first, User second)
{
   //not sure what you do here
}

The basic idea being that if someone's wants to start a game and there's someone in your queue, you match them and start a game - if there's no-one, add them to the queue. At best you'll only have one user in the queue at a time, but if not, well, that's not too bad either because as other users start games, the waiting ones will gradually removed and no new ones added until the queue is empty again.

OTHER TIPS

If I could not put pairs of users into the collection for some reason, I would use ConcurrentQueue and try to TryDequeue 2 items at a time, if I can get only one - put it back. Wait as necessary.

I think the easiest solution here is to use locking: you will have one lock for all consumers (producers won't use any locks), which will make sure you always take the users in the correct order:

User firstUser;
User secondUser;

lock (consumerLock)
{
    firstUser = userQueue.Take();
    secondUser = userQueue.Take();
}

Process(firstUser, secondUser);

Another option, would be to have two queues: one for single users and one for pairs of users and have a process that transfers them from the first queue to the second one.

If you don't mind having wasting another thread, you can do this with two BlockingCollections:

while (true)
{
    var firstUser = incomingUsers.Take();
    var secondUser = incomingUsers.Take();

    userPairs.Add(Tuple.Create(firstUser, secondUser));
}

You don't have to worry about locking here, because the queue for single users will have only one consumer, and the consumers of pairs can now use simple Take() safely.

If you do care about wasting a thread and can use TPL Dataflow, you can use BatchBlock<T>, which combines incoming items into batches of n items, where n is configured at the time of creation of the block, so you can set it to 2.

May this can helpd

public static IList<T> TakeMulti<T>(this BlockingCollection<T> me, int count = 100) where T : class
{
    T last = null;
    if (me.Count == 0)
    {
        last = me.Take(); // blocking when queue is empty
    }

    var result = new List<T>(count);

    if (last != null)
    {
        result.Add(last);
    }

    //if you want to take more item on this time.
    //if (me.Count < count / 2)
    //{
    //    Thread.Sleep(1000);
    //}

    while (me.Count > 0 && result.Count <= count)
    {
        result.Add(me.Take());
    }

    return result;
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top