Question

I'm stuck on a problem and am wondering if I have simply coded something incorrectly. The application polls every few seconds and grabs every record from a table whose sole purpose is to signify which records to act upon.

Please note I've left out the error handling code for space and readability.

    //Producing Thread, this is triggered every 5 seconds... UGH, I hate timers

    foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
    {
        if (!ConcurrentDictionary.Contains(Record.Key))
            ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
    }

This code works great, apart from the irritating fact that it may (and will) select the same record multiple times until that record is processed. By processed, I mean each selected record is written into its own newly created, uniquely named file. Then a stored procedure is called with that record's key to remove it from the database, at which point that particular key is removed from the ConcurrentDictionary.

    // Consuming Thread, located within another loop to allow
    // the below code to continue to cycle until instructed
    // to terminate

    while (!ConcurrentDictionary.IsEmpty)
    {
        var Record = ConcurrentDictionary.Take(1).First();
        WriteToNewFile(Record.Value);
        RemoveFromDatabase(Record.Key);
        ConcurrentDictionary.TryRemove(Record.Key);
    }

For a throughput test I added 20k+ records into the table and then turned the application loose. I was quite surprised when I noticed 22k+ files that continued to increase well into 100k+ territory.

What am I doing wrong??? Have I completely misunderstood what the concurrent dictionary is used for? Did I forget a semi-colon somewhere?

Solution

First, eliminate the call to Contains. TryAdd already checks for duplicates and returns false if the item is already present.

    foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
    {
        ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
    }

The next problem I see is that ConcurrentDictionary.Take(1).First() is not a good way to get an item from the dictionary, since it isn't atomic. I think you want to use a BlockingCollection<T> instead. It is specifically designed for implementing a producer-consumer pattern.
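
As a rough illustration only (not your exact types: I'm assuming GetRecordsFromDataBase() returns int keys and string values, and I'm reusing the helper method names from your question), the producer/consumer shape with a BlockingCollection might look something like this:

    // Minimal sketch of a BlockingCollection-based producer/consumer.
    // The KeyValuePair<int, string> payload is only a placeholder for the real record type.
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    var pending = new BlockingCollection<KeyValuePair<int, string>>();

    // Producer: hand records to the collection; it handles the synchronization.
    var producer = Task.Run(() =>
    {
        foreach (var record in GetRecordsFromDataBase())
            pending.Add(record);
        pending.CompleteAdding(); // signal that no more items are coming
    });

    // Consumer: GetConsumingEnumerable blocks until an item is available
    // and ends cleanly once CompleteAdding has been called.
    var consumer = Task.Run(() =>
    {
        foreach (var record in pending.GetConsumingEnumerable())
        {
            WriteToNewFile(record.Value);
            RemoveFromDatabase(record.Key);
        }
    });

    Task.WaitAll(producer, consumer);

In the real application the producer side would keep adding items from the timer callback and only call CompleteAdding at shutdown; this sketch just shows the shape of the pattern.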

Lastly, I think your problems don't really have to do with the dictionary, but with the database. The dictionary itself is thread-safe, but your dictionary is not atomic with the database. So suppose record A is in the database. GetRecordsFromDataBase() pulls it and adds it to the dictionary. Then it begins processing record A (I assume this happens on another thread). Meanwhile, that first loop calls GetRecordsFromDataBase() again and gets record A again. Simultaneously, record A finishes processing and is removed from the database. But it's too late! GetRecordsFromDataBase() already grabbed it! So the initial loop adds it to the dictionary again, after it has been removed.

I think you may need to take the records that are to be processed and move them into another table entirely. That way, they won't get picked up a second time. Doing this at the C# level, rather than the database level, is going to be a problem. Either that, or you don't want to be adding records to the queue while records are still being processed.
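
One possible shape for that, purely as a sketch: the ClaimPendingRecords stored procedure below is hypothetical (it would need to atomically move pending rows into a processing table and return them), and the int/string columns are placeholders. The idea is that the poller asks the database to claim rows rather than just read them, so each row is handed out only once:

    // Sketch: let the database atomically "claim" rows so the poller never
    // returns the same record twice. ClaimPendingRecords is a hypothetical
    // stored procedure that moves rows from the pending table into a
    // processing table and returns the rows it moved.
    using System.Collections.Generic;
    using System.Data;
    using System.Data.SqlClient;

    static IEnumerable<KeyValuePair<int, string>> ClaimRecords(string connectionString)
    {
        var claimed = new List<KeyValuePair<int, string>>();
        using (var connection = new SqlConnection(connectionString))
        using (var command = new SqlCommand("ClaimPendingRecords", connection))
        {
            command.CommandType = CommandType.StoredProcedure;
            connection.Open();
            using (var reader = command.ExecuteReader())
            {
                while (reader.Read())
                    claimed.Add(new KeyValuePair<int, string>(reader.GetInt32(0), reader.GetString(1)));
            }
        }
        return claimed;
    }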

OTHER TIPS

What am I doing wrong???

The foreach (add) loop tries to add every record returned from the database that is not already in the dictionary.

The while (remove) loop writes each item to a new file, then removes it from the database and finally from the dictionary.

This logic looks correct. But there is a race:

    GetRecordsFromDataBase(); // returns records 1 through 10

switch context to remove loop.

    WriteToNewFile(Record.Value);    // write record 5
    RemoveFromDatabase(Record.Key);  // remove record 5 from db
    ConcurrentDictionary.TryRemove(Record.Key); // remove record 5 from dictionary

switch back to add loop

    ConcurrentDictionary.TryAdd(Record.Key, Record.Value); // adds record 5 even though it is no longer in the DB, because it was part of the records returned by GetRecordsFromDataBase()

After the item is removed, the foreach loop adds it again. This is why your file count keeps multiplying.

    foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
    {
        if (!ConcurrentDictionary.Contains(Record.Key)) // this if is not required; TryAdd will do
            ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
    }

Try something like this. Add loop:

    foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
    {
        if (ConcurrentDictionary.TryAdd(Record.Key, false)) // only adds the record if it has not been processed
        {
            ConcurrentQueue.Enqueue(Record); // enqueue the record
        }
    }

Remove loop:

    // TryDequeue needs an out parameter; 'out var' lets the compiler infer the record type
    if (ConcurrentQueue.TryDequeue(out var record))
    {
        if (ConcurrentDictionary.TryUpdate(record.Key, true, false)) // flip the flag from false to true
        {
            WriteToNewFile(record.Value);    // write record 5
            RemoveFromDatabase(record.Key);  // remove record 5 from db
        }
    }

This will leave an entry in the dictionary for each record processed. You can remove them from the dictionary eventually, but multithreading involving a database can be tricky.
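
Put together, and assuming an int key and string value purely for illustration (reusing the helper method names from the question), the flag-dictionary-plus-queue approach might look roughly like this:

    // Rough end-to-end sketch of the approach above; key/value types are placeholders.
    using System.Collections.Concurrent;
    using System.Collections.Generic;

    var seen = new ConcurrentDictionary<int, bool>();             // false = queued, true = processed
    var queue = new ConcurrentQueue<KeyValuePair<int, string>>();

    // Add loop (runs on the timer):
    foreach (var record in GetRecordsFromDataBase())
    {
        if (seen.TryAdd(record.Key, false))          // only the first sighting is queued
            queue.Enqueue(record);
    }

    // Remove loop (runs on the worker thread):
    while (queue.TryDequeue(out var record))
    {
        if (seen.TryUpdate(record.Key, true, false)) // claim the record exactly once
        {
            WriteToNewFile(record.Value);
            RemoveFromDatabase(record.Key);
        }
    }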

Licensed under: CC-BY-SA with attribution