Question

I have a number of threads which retrieve data from the list of servers. The list of servers are downloaded from the server resolver every 5 mins. My threads for processing data should use only the server that has minimal response time. The response time of each server can be significantly different from request to request. So in time frame between updating a list of servers I should verify a response time from each server.

My initial approach was create two additional threads: The first to update a server list, the second to verify a response time from each server and sort a list of servers according to the response time of them.

I tried to use BlockingCollection<T> which was designed to connect producers and consumers, but in my task I have two concurrent consumers and also BlockingCollection<T> doesn't have native ability to insert items for creating a prioritized list of servers.

ConcurrentStack<T> or ConcurrentQueue<T> also cannot be used as is because they are non-blocking like as BlockingCollection<T> and they require additional mechanisms of blocking threads which require items from queues.

Please help me solve this task.

Was it helpful?

Solution

With the advent of .NET 6, a new class PriorityQueue<TElement, TPriority> has become available. This is not a thread-safe collection, but nevertheless it can be used quite easily as the backing storage of a IProducerConsumerCollection<T> implementation, which in turn can become the underlying data store of a BlockingCollection<T> class. Below is such a implementation, containing the minimal logic required in order to do the job:

public class ProducerConsumerPriorityQueue<TElement, TPriority>
    : IProducerConsumerCollection<(TElement, TPriority)>
{
    private readonly PriorityQueue<TElement, (TPriority, long)> _queue;
    private long _index = 0;

    public ProducerConsumerPriorityQueue(IComparer<TPriority> comparer = default)
    {
        comparer ??= Comparer<TPriority>.Default;
        _queue = new(Comparer<(TPriority, long)>.Create((x, y) =>
        {
            int result = comparer.Compare(x.Item1, y.Item1);
            if (result == 0) result = x.Item2.CompareTo(y.Item2);
            return result;
        }));
    }

    public int Count { get { lock (_queue) return _queue.Count; } }

    public bool TryAdd((TElement, TPriority) item)
    {
        lock (_queue) _queue.Enqueue(item.Item1, (item.Item2, ++_index));
        return true;
    }

    public bool TryTake(out (TElement, TPriority) item)
    {
        lock (_queue)
        {
            if (_queue.TryDequeue(out var element, out var priority))
            {
                item = (element, priority.Item1); return true;
            }
            item = default; return false;
        }
    }

    public bool IsSynchronized => false;

    public object SyncRoot => throw new NotSupportedException();

    public (TElement, TPriority)[] ToArray()
        => throw new NotSupportedException();

    public void CopyTo((TElement, TPriority)[] array, int index)
        => throw new NotSupportedException();

    public void CopyTo(Array array, int index)
        => throw new NotSupportedException();

    public IEnumerator<(TElement, TPriority)> GetEnumerator()
        => throw new NotSupportedException();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

Only the members Count, TryAdd and TryTake are implemented, but they are enough. They are implemented in a thread-safe way, because it is required by the documentation of the BlockingCollection<T> class:

IProducerConsumerCollection<T> represents a collection that allows for thread-safe adding and removal of data.

Usage example:

var collection = new BlockingCollection<(string Server, int Priority)>(
    new ProducerConsumerPriorityQueue<string, int>());

collection.Add(("Server-A", 20));
collection.Add(("Server-B", 10));
collection.Add(("Server-C", 20));
collection.CompleteAdding();

foreach (var (server, priority) in collection.GetConsumingEnumerable())
{
    Console.WriteLine($"Server: {server}, Priority: {priority}");
}

Output:

Server: Server-B, Priority: 10
Server: Server-A, Priority: 20
Server: Server-C, Priority: 20

Online demo.

The insertion order for items with equal priority is preserved.

OTHER TIPS

Consider using two or more BlockingCollections and use the TakeFromAny to listen to all of the queues. The method (although I didn't see it mentioned in the documentation) prefers to take elements from the first queue in the array of queues it is listening to.

Unfortunately this is not built in and you might need to implement your own. To help you get started there is already an implementation of this in msdn samples which can be used with BlockingCollection.

http://blogs.msdn.com/pfxteam/archive/2010/04/04/9990342.aspx

For an efficient implementation you need to base it on heap data structure. Here is a nice article though the class does not implement IProducerConsumerCollection:

http://www.codeproject.com/Articles/126751/Priority-queue-in-C-with-the-help-of-heap-data-str

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top