Question

I must be doing something wrong somewhere, because I am getting duplicate items in my ConcurrentBag. Here is the chain of events:

    var listings = new ConcurrentBag<JSonListing>();
    Parallel.ForEach(Types, ParallelOptions, (type, state) =>
    {
        ...
        status = ProcessType(listings, status, type, state);
        ...
    });

    private GeocodeResults ProcessType(ConcurrentBag<JSonListing> listings, GeocodeResults status, XElement type, ParallelLoopState state)
    {
        ...
        AddListingsToList(results, type, listings);
        ...
    }

    private void AddListingsToList(dynamic results, XElement type, ConcurrentBag<JSonListing> listings)
    {
        var typeMustMatch = type.Attribute("TypeMustMatch").Value;
        var typeID = Convert.ToInt32(type.Attribute("ID").Value);

        foreach (var result in results.results)
        {
            var foundListing = listings.SingleOrDefault(x => x.ID == result.id);
            if (foundListing != null)
            {
                var typeIDs = foundListing.TypeIDs.Split("|".ToCharArray(), StringSplitOptions.RemoveEmptyEntries).ToList();
                if (!typeIDs.Contains(typeID.ToString()))
                {
                    foundListing.TypeIDs += "|" + typeID;
                }
            }
            else
            {
                var listing = new JSonListing { ID = result.id, ReferenceNumber = result.reference, TypeIDs = typeID.ToString(), TypeMustMatch = typeMustMatch };
                listings.Add(listing);
            }
        }
    }

AddListingsToList should guarantee that if an element with the same ID already exists, no duplicate is added; instead, a property on the existing element is updated. The error I get is:

System.InvalidOperationException: Sequence contains more than one matching element
at System.Linq.Enumerable.SingleOrDefault[TSource](IEnumerable`1 source, Func`2 predicate)
at LocalSearch.Processor.CityProcessor.AddListingsToList(Object results, XElement type, ConcurrentBag`1 listings) in d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:line 310
at CallSite.Target(Closure , CallSite , CityProcessor , Object , XElement , ConcurrentBag`1 )
at LocalSearch.Processor.CityProcessor.ProcessType(ConcurrentBag`1 listings, GeocodeResults status, XElement type, ParallelLoopState state) in d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:line 249
at LocalSearch.Processor.CityProcessor.<>c__DisplayClass4.b__0(XElement type, ParallelLoopState state) in d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:line 137


Solution

ConcurrentBag guarantees that each operation on it is thread-safe when considered on its own. It does not guarantee that multiple operations in succession will be treated as an atomic group.

As a result, your code has a race condition: you check whether the bag already contains some item X, but two threads can run that test concurrently, both decide the item isn't there, and both proceed to add it. End result: two copies of the item end up in the bag.

It looks like your use case would be better served by a ConcurrentDictionary, leveraging the TryAdd method, which is atomic. Alternatively, you could put a lock around the whole check-and-add sequence so that it executes atomically, but then you don't really need a concurrent collection and can use a plain List instead.
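
A minimal sketch of the ConcurrentDictionary approach, assuming listing IDs are ints and keeping the JSonListing shape from the question. GetOrAdd is used here rather than TryAdd because the caller also needs to update an existing listing; the appended TypeIDs string is still a read-modify-write, so it gets its own small lock. Method and variable names are illustrative, not the actual project code.

    // Requires: using System.Collections.Concurrent; using System.Linq;
    private void AddListingsToList(dynamic results, XElement type, ConcurrentDictionary<int, JSonListing> listings)
    {
        var typeMustMatch = type.Attribute("TypeMustMatch").Value;
        var typeID = Convert.ToInt32(type.Attribute("ID").Value);

        foreach (var result in results.results)
        {
            int id = result.id; // assumes the listing ID is (convertible to) an int

            // GetOrAdd either returns the existing listing for this ID or atomically
            // inserts the one built by the factory -- never two entries for the same key.
            var listing = listings.GetOrAdd(id, _ => new JSonListing
            {
                ID = id,
                ReferenceNumber = result.reference,
                TypeIDs = typeID.ToString(),
                TypeMustMatch = typeMustMatch
            });

            // Appending to TypeIDs mutates a shared object, so guard it separately.
            lock (listing)
            {
                var typeIDs = listing.TypeIDs.Split("|".ToCharArray(), StringSplitOptions.RemoveEmptyEntries);
                if (!typeIDs.Contains(typeID.ToString()))
                {
                    listing.TypeIDs += "|" + typeID;
                }
            }
        }
    }

The Parallel.ForEach call would then pass a ConcurrentDictionary<int, JSonListing> instead of the bag; listings.Values gives the final collection once the loop completes.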

Other tips

This doesn't mean that ConcurrentBag isn't thread-safe; it simply means that your code isn't.

You are checking for a value in the ConcurrentBag and then adding a new item if the previous check found nothing.

However, because there are no locks in your code, two threads could simultaneously perform the following:

    THREAD 1        THREAD 2
    =-=-=-=-=-=-=-=-=-=-=-=-
    Check Exists
                    Check Exists
    Add New
                    Add New

You need to lock around your check-and-add routine, as in the sketch below.
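
For completeness, a minimal sketch of that locking approach, keeping the method shape from the question but switching to a plain List<JSonListing> plus a shared lock object (the _listingsLock field name is illustrative, and listing IDs are again assumed to be ints):

    private readonly object _listingsLock = new object();

    private void AddListingsToList(dynamic results, XElement type, List<JSonListing> listings)
    {
        var typeMustMatch = type.Attribute("TypeMustMatch").Value;
        var typeID = Convert.ToInt32(type.Attribute("ID").Value);

        foreach (var result in results.results)
        {
            int id = result.id;

            // The whole check-then-add/update sequence runs under one lock,
            // so no other thread can add the same ID between the check and the add.
            lock (_listingsLock)
            {
                var foundListing = listings.SingleOrDefault(x => x.ID == id);
                if (foundListing != null)
                {
                    var typeIDs = foundListing.TypeIDs.Split("|".ToCharArray(), StringSplitOptions.RemoveEmptyEntries);
                    if (!typeIDs.Contains(typeID.ToString()))
                    {
                        foundListing.TypeIDs += "|" + typeID;
                    }
                }
                else
                {
                    listings.Add(new JSonListing { ID = id, ReferenceNumber = result.reference, TypeIDs = typeID.ToString(), TypeMustMatch = typeMustMatch });
                }
            }
        }
    }

Every thread that touches the list must take the same lock, which is why the lock object is a field shared by all of them.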

Licensed under: CC-BY-SA with attribution
Not affiliated with Stack Overflow