Question

First, could someone with 1500+ "reputation" please create a tag for "ContinueWith" (and tag this question with it)? Thanks!

Sorry for the length of this post but I don't want to waste the time of anyone trying to help me because I left out relevant details. That said, it may still happen. :)

Now the details. I am working on a service that subscribes to a couple of ActiveMQ topics. Two of the topics are somewhat related: one is a "company update" and one is a "product update". The "ID" for both is the CompanyID. The company topic includes the data in the product topic. This is required because other subscribers need the product data but don't want/need to subscribe to the product topic. Since my service is multi-threaded (a requirement beyond our discretion), as the messages arrive I add a Task to process each one to a ConcurrentDictionary using AddOrUpdate, where the update parameter is simply a ContinueWith (see below). This is done to prevent simultaneous updates for the same company. Those could happen because these topics and subscribers are "durable", so if my listener service goes offline (for whatever reason) we could end up with multiple messages (company and/or product) for the same CompanyID.

Now, my actual question (finally!): after the Task (whether just one task, or the last in a chain of ContinueWith tasks) is finished, I want to remove it from the ConcurrentDictionary (obviously). How? I have thought of some ideas and gotten others from coworkers, but I don't really like any of them. I am not going to list them, because your answer might be one of the ideas I already have and don't like, and it may still end up being the best one.

I have tried to compress the code snippet to prevent you from having to scroll up and down too much, unlike my description. :)

nrtq = Not Relevant To Question

public interface IMessage
{
  long CompanyId { get; set; }
  void Process();
}
public class CompanyMessage : IMessage
{ /* implementation, nrtq */ }
public class ProductMessage : IMessage
{ /* implementation, nrtq */ }

public class Controller
{
  private static ConcurrentDictionary<long, Task> _workers = new ConcurrentDictionary<long, Task>();
  //other needed declarations, nrtq

  public Controller() { /* constructor stuff, nrtq */ }

  public void StartSubscribers()
  {
    //other code, nrtq
    _companySubscriber.OnMessageReceived += HandleCompanyMsg;
    _productSubscriber.OnMessageReceived += HandleProductMsg;
  }

  private void HandleCompanyMsg(string msg)
  {
    try {
      //other code, nrtq
      QueueItUp(new CompanyMessage(message));
    } catch (Exception ex) {
      //other code, nrtq
    }
  }

  private void HandleProductMsg(string msg)
  {
    try {
      //other code, nrtq
      QueueItUp(new ProductMessage(message));
    } catch (Exception ex) {
      //other code, nrtq
    }
  }

  private static void QueueItUp(IMessage message)
  {
    _workers.AddOrUpdate(message.CompanyId,
      x => {
        var task = new Task(message.Process);
        task.Start();
        return task;
      },
      (x, y) => y.ContinueWith((z) => message.Process())
    );
  }
}

Thanks!

Solution

I won't "Accept" this answer for a while because I am eager to see if anyone else can come up with a better solution.

A coworker came up with a solution which I tweaked a little bit. Yes, I am aware of the irony (?) of using the lock statement with a ConcurrentDictionary. I don't really have the time right now to see if there would be a better collection type to use. Basically, instead of just doing a ContinueWith() for existing tasks, we replace the task with itself plus another task tacked on the end using ContinueWith().

What difference does that make? Glad you asked! :) If we had just called ContinueWith() without storing the task it returns, the collection would still hold the first task in the chain, so worker.Value.IsCompleted would return true as soon as that first task completed. By replacing the stored task with the one returned by ContinueWith(), the collection holds only a single task, the tail of the chain, and worker.Value.IsCompleted won't return true until all tasks in the chain are complete.
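To make the point above concrete, here is a minimal sketch, not the service code itself. The class name ChainTailDemo and the gate event are made up for illustration. It shows that ContinueWith() returns a new Task whose IsCompleted stays false until the continuation has run, even though the first task in the chain has already completed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original service.
public static class ChainTailDemo
{
    // Returns (first.IsCompleted, tail.IsCompleted while blocked, tail.IsCompleted at the end).
    public static Tuple<bool, bool, bool> Run()
    {
        using (var gate = new ManualResetEventSlim(false))
        {
            // First link in the chain: finishes immediately.
            Task first = Task.Run(() => { });

            // Second link: cannot finish until the gate is opened.
            Task tail = first.ContinueWith(_ => gate.Wait());

            first.Wait();
            bool firstDone = first.IsCompleted;       // the first task is finished
            bool tailWhileBlocked = tail.IsCompleted; // the tail is still pending

            gate.Set();   // let the continuation finish
            tail.Wait();
            bool tailAtEnd = tail.IsCompleted;

            return Tuple.Create(firstDone, tailWhileBlocked, tailAtEnd);
        }
    }

    public static void Main()
    {
        var r = Run();
        Console.WriteLine("{0} {1} {2}", r.Item1, r.Item2, r.Item3); // prints "True False True"
    }
}
```

So if the dictionary kept a reference to first, the clean-up would see it as completed too early; keeping tail gives the behaviour described above.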

I admit I was a little concerned about replacing a task with itself+(new task): what if the task happened to be running while it was being replaced? Well, I tested the living daylights out of this and did not run into any problems. I believe what is happening is that, since the task runs on a thread-pool thread and the collection just holds a reference to it, the running task is unaffected. By replacing it with itself+(new task), we keep a reference to the executing chain and get the "notification" when it is complete, so that the next task can "continue" or IsCompleted returns true.
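The behaviour described above can be checked with a small sketch along these lines. It is an illustration under assumptions, not the production code: ReplaceWhileRunningDemo, the id 42, and the two events are all hypothetical. The entry is overwritten while the first task is provably mid-run, and the log shows that the first task still finishes normally and the continuation runs after it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original service.
public static class ReplaceWhileRunningDemo
{
    public static string Run()
    {
        var workers = new ConcurrentDictionary<long, Task>();
        var log = new ConcurrentQueue<string>();
        const long id = 42; // made-up CompanyId

        using (var started = new ManualResetEventSlim(false))
        using (var release = new ManualResetEventSlim(false))
        {
            var first = new Task(() =>
            {
                started.Set();     // signal that the task body is executing
                release.Wait();    // stay "running" until released
                log.Enqueue("first");
            });
            workers[id] = first;
            first.Start();

            started.Wait(); // from here on, the first task is definitely running

            // Overwrite the entry while the first task is mid-run.
            // Only the stored reference changes; the running task is untouched.
            workers[id] = workers[id].ContinueWith(_ => log.Enqueue("second"));

            release.Set();
            workers[id].Wait(); // waits for the tail, and therefore the whole chain
        }

        return string.Join(",", log);
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints "first,second"
    }
}
```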

Also, the way the "clean up" loop works, and where it is located, means that we will have "completed" tasks hanging around in the collection, but only until the next time the "clean up" runs, which is the next time a message is received. Again, I did a lot of testing to see if I could cause a memory problem due to this, but my service never used more than 20 MB of RAM, even while processing hundreds of messages per second. We would have to receive some pretty big messages and have a lot of long-running tasks for this to ever cause a problem, but it is something to keep in mind as your situation may differ.

As above, in the code below, nrtq = not relevant to question.

public interface IMessage
{
  long CompanyId { get; set; }
  void Process();
}
public class CompanyMessage : IMessage
{ /* implementation, nrtq */ }
public class ProductMessage : IMessage
{ /* implementation, nrtq */ }

public class Controller
{
  private static ConcurrentDictionary<long, Task> _workers = new ConcurrentDictionary<long, Task>();
  //other needed declarations, nrtq

  public Controller() { /* constructor stuff, nrtq */ }

  public void StartSubscribers()
  {
    //other code, nrtq
    _companySubscriber.OnMessageReceived += HandleCompanyMsg;
    _productSubscriber.OnMessageReceived += HandleProductMsg;
  }

  private void HandleCompanyMsg(string msg)
  {
    //other code, nrtq
    QueueItUp(new CompanyMessage(message));
  }

  private void HandleProductMsg(string msg)
  {
    //other code, nrtq
    QueueItUp(new ProductMessage(message));
  }

  private static void QueueItUp(IMessage message)
  {
    lock(_workers)
    {
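      // Sweep out finished chains; removing entries while enumerating
      // is safe with ConcurrentDictionary.
      foreach (var worker in _workers)
      {
        if (!worker.Value.IsCompleted) continue;
        Task task;
        _workers.TryRemove(worker.Key, out task);
      }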
      var id = message.CompanyId;
      if (_workers.ContainsKey(id))
        _workers[id] = _workers[id].ContinueWith(x => message.Process());
      else
      {
        var task = new Task(y => message.Process(), id);
        _workers.TryAdd(id, task);
        task.Start();
      }
    }
  }
}

Licensed under: CC-BY-SA with attribution