Question

I have a migration job and I need to validate the target data when done. To notify the admin of the success/failure of validations, I use a counter to compare the number of rows from table Foo in Database1 to the number of rows from table Foo in Database2.

Each row from Database2 is validated against the corresponding row in Database1. To speed up the process, I use a Parallel.ForEach loop.

My initial problem was that the count was always different from what I expected. I later found that the += and -= operations are not thread-safe (not atomic). To correct the problem, I updated the code to use Interlocked.Increment on the counter variable. This code prints a count that is closer to the actual count, but it still differs on each execution and does not give the result I expect:

Private countObjects As Integer

Private Sub MyMainFunction()
    Dim objects As List(Of MyObject)

    'Query with Dapper, irrelevant to the problem.
    Using connection As New System.Data.SqlClient.SqlConnection("aConnectionString")
        objects = connection.Query("SELECT * FROM Foo") 'Returns around 81000 rows.
    End Using

    Parallel.ForEach(objects, Sub(u) MyParallelFunction(u))

    Console.WriteLine(String.Format("Count : {0}", countObjects)) 'Prints "Count : 80035" or another incorrect count, which seems to differ on each execution of MyMainFunction.
End Sub

Private Sub MyParallelFunction(obj As MyObject)
    Interlocked.Increment(countObjects) 'Breakpoint Hit Count is at around 81300 or another incorrect number when done.

    'Continues executing unrelated code using obj...
End Sub

After experimenting with other ways of making the increment thread-safe, I found that wrapping the increment in a SyncLock on a dummy reference object gives the expected result:

Private countObjects As Integer
Private locker As SomeType

Private Sub MyMainFunction()
    locker = New SomeType()
    Dim objects As List(Of MyObject)

    'Query with Dapper, irrelevant to the problem.
    Using connection As New System.Data.SqlClient.SqlConnection("aConnectionString")
        objects = connection.Query("SELECT * FROM Foo") 'Returns around 81000 rows.
    End Using

    Parallel.ForEach(objects, Sub(u) MyParallelFunction(u))

    Console.WriteLine(String.Format("Count : {0}", countObjects)) 'Prints "Count : 81000".
End Sub

Private Sub MyParallelFunction(obj As MyObject)
    SyncLock locker
        countObjects += 1 'Breakpoint Hit Count is 81000 when done.
    End SyncLock

    'Continues executing unrelated code using obj...
End Sub

Why doesn't the first code snippet work as expected? The most confusing thing is the Breakpoint Hit Count giving unexpected results.

Is my understanding of Interlocked.Increment or of atomic operations flawed? I would prefer not to use SyncLock on a dummy object, and I hope there's a way to do it cleanly.

Update:

  • I run the example in Debug mode on Any CPU.
  • I call ThreadPool.SetMaxThreads(60, 60) higher up the call stack, because I'm querying an Access database at some point. Could this cause a problem?
  • Could the call to Increment mess with the Parallel.ForEach loop, forcing it to exit before all tasks are done?

Update 2 (Methodology):

  • My tests are executed with code as close as possible to what is displayed here, with the exception of object types and query string.
  • The query always gives the same number of results, and I always verify objects.Count on a breakpoint before continuing to Parallel.ForEach.
  • The only code that changes in between the executions is Interlocked.Increment replaced by SyncLock locker and countObjects += 1.

Update 3

I created an SSCCE by copying my code into a new console app and replacing external classes and code.

This is the Main method of the console app:

Sub Main()
    Dim oClass1 As New Class1
    oClass1.MyMainFunction()
End Sub

This is the definition of Class1:

Imports System.Threading

Public Class Class1

    Public Class Dummy
        Public Sub New()
        End Sub
    End Class

    Public Class MyObject
        Public Property Id As Integer

        Public Sub New(p_Id As Integer)
            Id = p_Id
        End Sub
    End Class

    Public Property countObjects As Integer
    Private locker As Dummy

    Public Sub MyMainFunction()
        locker = New Dummy()
        Dim objects As New List(Of MyObject)

        For i As Integer = 1 To 81000
            objects.Add(New MyObject(i))
        Next

        Parallel.ForEach(objects, Sub(u As MyObject)
                                      MyParallelFunction(u)
                                  End Sub)

        Console.WriteLine(String.Format("Count : {0}", countObjects)) 'Interlocked.Increment prints an incorrect count, different in each execution. SyncLock prints the correct count.
        Console.ReadLine()
    End Sub

    'Interlocked
    Private Sub MyParallelFunction(ByVal obj As MyObject)
        Interlocked.Increment(countObjects)
    End Sub

    'SyncLock
    'Private Sub MyParallelFunction(ByVal obj As MyObject)
    '    SyncLock locker
    '        countObjects += 1
    '    End SyncLock
    'End Sub

End Class

I still observe the same behavior when switching MyParallelFunction from Interlocked.Increment to SyncLock.


Solution

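Calling Interlocked.Increment on a property will always be broken. In your SSCCE, countObjects is declared as a Property, and a property cannot be passed ByRef directly, so the VB compiler effectively rewrites the call as: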

Value = <value from Property>
Interlocked.Increment(Value)
<Property> = Value

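The increment itself is atomic, but it acts on a temporary copy. The read from the property and the write back to it are not part of that atomic operation, so concurrent updates get lost and the threading guarantees provided by Increment no longer apply. Change countObjects to a field. VB rewrites any property passed as a ByRef argument into code that resembles the above.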
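As a minimal sketch of that fix, assuming the same 81000-item loop as the SSCCE: the counter becomes a private field that is passed to Interlocked.Increment directly. The names _countObjects and Program, and the use of a list of integers instead of MyObject, are illustrative choices and not part of the original code.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Threading
Imports System.Threading.Tasks

Module Program
    ' Hypothetical backing field. A field can be passed ByRef as-is,
    ' so Interlocked.Increment operates on the real storage location.
    Private _countObjects As Integer

    ' Optional read-only view of the counter for callers that only need the value.
    Public ReadOnly Property CountObjects As Integer
        Get
            Return _countObjects
        End Get
    End Property

    Sub Main()
        ' Stand-in data, mirroring the 81000 items of the SSCCE.
        Dim objects As New List(Of Integer)
        For i As Integer = 1 To 81000
            objects.Add(i)
        Next

        ' Increment the field itself, never the property.
        Parallel.ForEach(objects, Sub(u) Interlocked.Increment(_countObjects))

        Console.WriteLine(String.Format("Count : {0}", CountObjects))
    End Sub
End Module
```

With the field in place, no temporary copy is involved, so every increment lands on the shared counter and the SyncLock on a dummy object is no longer needed.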

Licensed under: CC-BY-SA with attribution