I have a migration job and I need to validate the target data when done. To notify the admin of the success/failure of validations, I use a counter to compare the number of rows from table Foo in Database1 to the number of rows from table Foo in Database2.
Each row from Database2 is validated against the corresponding row in Database1. To speed up the process, I use a Parallel.ForEach loop.
My initial problem was that the count was always different from what I expected. I later found that the += and -= operations are not thread-safe (not atomic). To correct the problem, I updated the code to use Interlocked.Increment on the counter variable. This code prints a count that is closer to the actual count, but it still differs on each execution and does not match the result I expect:
Private countObjects As Integer

Private Sub MyMainFunction()
    Dim objects As List(Of MyObject)
    'Query with Dapper, irrelevant to the problem.
    Using connection As New System.Data.SqlClient.SqlConnection("aConnectionString")
        objects = connection.Query(Of MyObject)("SELECT * FROM Foo").ToList() 'Returns around 81000 rows.
    End Using
    Parallel.ForEach(objects, Sub(u) MyParallelFunction(u))
    Console.WriteLine(String.Format("Count : {0}", countObjects)) 'Prints "Count : 80035" or another incorrect count, which seems to differ on each execution of MyMainFunction.
End Sub

Private Sub MyParallelFunction(obj As MyObject)
    Interlocked.Increment(countObjects) 'Breakpoint Hit Count is at around 81300 or another incorrect number when done.
    'Continues executing unrelated code using obj...
End Sub
After some experiments with other ways of making the increment thread-safe, I found that wrapping the increment in a SyncLock on a dummy reference object gives the expected result:
Private countObjects As Integer
Private locker As SomeType

Private Sub MyMainFunction()
    locker = New SomeType()
    Dim objects As List(Of MyObject)
    'Query with Dapper, irrelevant to the problem.
    Using connection As New System.Data.SqlClient.SqlConnection("aConnectionString")
        objects = connection.Query(Of MyObject)("SELECT * FROM Foo").ToList() 'Returns around 81000 rows.
    End Using
    Parallel.ForEach(objects, Sub(u) MyParallelFunction(u))
    Console.WriteLine(String.Format("Count : {0}", countObjects)) 'Prints "Count : 81000".
End Sub

Private Sub MyParallelFunction(obj As MyObject)
    SyncLock locker
        countObjects += 1 'Breakpoint Hit Count is 81000 when done.
    End SyncLock
    'Continues executing unrelated code using obj...
End Sub
Why doesn't the first code snippet work as expected? The most confusing thing is the Breakpoint Hit Count giving unexpected results.
Is my understanding of Interlocked.Increment or of atomic operations flawed? I would prefer not to use SyncLock on a dummy object, and I hope there's a way to do it cleanly.
Update:
- I run the example in Debug mode on Any CPU.
- I use ThreadPool.SetMaxThreads(60, 60) higher up in the call stack, because I'm querying an Access database at some point. Could this cause a problem?
- Could the call to Increment mess with the Parallel.ForEach loop, forcing it to exit before all tasks are done?
Update 2 (Methodology):
- My tests are executed with code as close as possible to what is displayed here, with the exception of object types and query string.
- The query always gives the same number of results, and I always verify objects.Count on a breakpoint before continuing to Parallel.ForEach.
- The only code that changes between executions is Interlocked.Increment being replaced by SyncLock locker and countObjects += 1.
Update 3
I created an SSCCE by copying my code into a new console app and replacing external classes and code.
This is the Main method of the console app:
Sub Main()
    Dim oClass1 As New Class1
    oClass1.MyMainFunction()
End Sub
This is the definition of Class1:
Imports System.Threading

Public Class Class1
    Public Class Dummy
        Public Sub New()
        End Sub
    End Class

    Public Class MyObject
        Public Property Id As Integer
        Public Sub New(p_Id As Integer)
            Id = p_Id
        End Sub
    End Class

    Public Property countObjects As Integer
    Private locker As Dummy

    Public Sub MyMainFunction()
        locker = New Dummy()
        Dim objects As New List(Of MyObject)
        For i As Integer = 1 To 81000
            objects.Add(New MyObject(i))
        Next
        Parallel.ForEach(objects, Sub(u As MyObject)
                                      MyParallelFunction(u)
                                  End Sub)
        Console.WriteLine(String.Format("Count : {0}", countObjects)) 'Interlocked prints an incorrect count, different in each execution. SyncLock prints the correct count.
        Console.ReadLine()
    End Sub

    'Interlocked
    Private Sub MyParallelFunction(ByVal obj As MyObject)
        Interlocked.Increment(countObjects)
    End Sub

    'SyncLock
    'Private Sub MyParallelFunction(ByVal obj As MyObject)
    '    SyncLock locker
    '        countObjects += 1
    '    End SyncLock
    'End Sub
End Class
I still note the same behavior when switching MyParallelFunction from Interlocked.Increment to SyncLock.
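Interlocked.Increment on a property will always be broken. Effectively, the VB compiler rewrites the call so that the property value is copied into a temporary variable, the temporary is passed to Increment, and the result is then assigned back to the property, thus breaking any threading guarantees provided by Increment. Change it to be a field. VB will rewrite any property passed as a ByRef parameter into code that resembles this copy-in/copy-out sequence.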
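To make the difference concrete, here is a minimal sketch that contrasts a field with a hand-written version of the copy-in/copy-out sequence VB uses when a property is passed ByRef. The names Counters, FieldCount, PropertyCount, CountWithField and CountWithPropertyCopy are made up for this illustration, and the three-statement body is an approximation of the compiler's rewrite, not its literal output.

```vb
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module InterlockedPropertyDemo

    ' Hypothetical holder: one counter as a field, one as an auto-property.
    Private Class Counters
        Public FieldCount As Integer
        Public Property PropertyCount As Integer
    End Class

    ' The field itself is passed ByRef, so Interlocked.Increment
    ' operates on the shared memory location and no update is lost.
    Public Function CountWithField(iterations As Integer) As Integer
        Dim c As New Counters()
        Parallel.For(0, iterations,
                     Sub(i As Integer)
                         Interlocked.Increment(c.FieldCount)
                     End Sub)
        Return c.FieldCount
    End Function

    ' Approximation of what happens when a property is passed ByRef:
    ' the atomic increment only ever touches a thread-local temporary.
    Public Function CountWithPropertyCopy(iterations As Integer) As Integer
        Dim c As New Counters()
        Parallel.For(0, iterations,
                     Sub(i As Integer)
                         Dim temp As Integer = c.PropertyCount ' copy in (plain read)
                         Interlocked.Increment(temp)           ' atomic, but on the local copy
                         c.PropertyCount = temp                ' copy out (plain write, can overwrite newer values)
                     End Sub)
        Return c.PropertyCount
    End Function

    Sub Main()
        ' The field version should report exactly the number of iterations.
        Console.WriteLine("Field    : {0}", CountWithField(81000))
        ' The property version can lose updates, so its result may be lower and vary per run.
        Console.WriteLine("Property : {0}", CountWithPropertyCopy(81000))
    End Sub

End Module
```

Applied to the SSCCE in the question, the corresponding change would be declaring countObjects as a field (Public countObjects As Integer, or Private) instead of Public Property countObjects As Integer. If other code needs to read the count through a property, a read-only property that returns the backing field keeps the increment on the field itself.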