Question

I have a process where my main thread reads a file and splits it into parts. Those parts then require further processing. I would like to use any available threads so that the downstream processing uses as much CPU (as many cores) as possible. I don't want the main thread to build up an excessive backlog, so it needs to wait to add to the queue until another thread is available.

I see many articles like "VB.NET 4.0: Looking to execute multiple threads, but wait until all threads are completed before resuming", but they wait for all threads to complete, whereas I just need any thread to be available.

Is this something I can tackle with the Task Parallel Library, or should I be manually creating threads and monitoring a threadpool?

Using Reader As New StreamReader(FileName)
    Do
        CurrentBlockSize = Reader.ReadBlock(CurrentBuffer, 0, BufferSize)

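        ' Append only the characters actually read; the final block may be shorter than BufferSize
        RunningBuffer &= New String(CurrentBuffer, 0, CurrentBlockSize)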

        If RunningBuffer.Contains(RowDelimiter) Then
            LineParts = RunningBuffer.Split(RowDelimiter)

            For I As Integer = 0 To LineParts.Count - 1
                If I < LineParts.Count - 1 Then

                    ' Make a synchronous call that blocks until
                    ' another thread is available to process the line
                    AddLineToTheProcessingQueue(LineParts(I))

                Else
                    RunningBuffer = LineParts(I)
                End If
            Next
        End If

    Loop While CurrentBlockSize = BufferSize
End Using

Solution

Paste this code into a new Console Application.

Imports System.Threading

Module Module1

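    ' 6 is an arbitrary choice; for CPU-bound work, Environment.ProcessorCount is a reasonable starting point.
    ' Worker threads run queued work items; completion port threads service asynchronous I/O callbacks.
    ' Note: SetMaxThreads returns False and changes nothing if either value is below the processor count.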
    Const MaxWorkerThreads As Integer = 6
    Const MaxCompletionPortThreads As Integer = 6

    Sub Main()

        ThreadPool.SetMaxThreads(MaxWorkerThreads, MaxCompletionPortThreads)

        Dim availableWorkerThreads As Integer
        Dim availableCompletionPortThreads As Integer

        For i As Integer = 0 To 100

            ' GetAvailableThreads returns results via output parameters
            ThreadPool.GetAvailableThreads(availableWorkerThreads, availableCompletionPortThreads)

            Dim tries As Integer = 0

            Do While (availableWorkerThreads = 0)
                ' this loop does not execute if there are available threads
                ' you may want to add a fail-safe to check "tries" in case the child threads get stuck
                tries += 1

                Console.WriteLine(String.Format("waiting to start item {0}, attempt {1}, available threads: {2}, {3}", i, tries, availableWorkerThreads, availableCompletionPortThreads))

                ' failure to call Sleep will make your program unresponsive
                Thread.Sleep(1000)

                ' call GetAvailableThreads again for the next test at the top of the loop
                ThreadPool.GetAvailableThreads(availableWorkerThreads, availableCompletionPortThreads)
            Loop

            ' this is how you pass parameters to a thread created through QueueUserWorkItem
            Dim parameters As Object() = {i}
            ThreadPool.QueueUserWorkItem(AddressOf DoWork, parameters)
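            ' Pause so the pool can pick up the item before the next availability check.
            ' (The MSDN sample sleeps only because pool threads are background threads,
            ' so the process ends as soon as Main returns, even if items are still running.)
            Thread.Sleep(500)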

        Next

    End Sub

    ' WaitCallback passes the state as Object, so cast it back to the array queued in Main
    Sub DoWork(state As Object)
        Dim parameters = DirectCast(state, Object())
        Dim itemNumber = CInt(parameters(0))
        Dim sleepLength = itemNumber * 1000
        Console.WriteLine(String.Format("Item: {0} - sleeping for {1} milliseconds.", itemNumber, sleepLength))
        Thread.Sleep(sleepLength)
        Console.WriteLine(String.Format("Item: {0} - done sleeping.", itemNumber))
    End Sub

End Module

OTHER TIPS

I'm not sure exactly why you want to do this, but you can achieve something very similar by using a BlockingCollection or a TPL Dataflow block with BoundedCapacity set.

For example, if you set the capacity to 1 and all of your consumers are busy, one item can wait in the queue, but you won't be able to add a second until one of the consumers finishes its current work and removes the waiting item from the queue. Both versions give you a way to wait until there is room for another item: BlockingCollection.Add blocks the calling thread, and a dataflow block's SendAsync returns a task that completes once the item is accepted.
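
Here is a rough sketch of the BlockingCollection version, in case it helps. It is not tested against the asker's file format: ProcessLine and the "line N" strings are hypothetical placeholders for the real per-line work and the file-reading loop, and using one consumer per core is only an assumption that suits CPU-bound processing.

```vbnet
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module BoundedQueueSketch

    Sub Main()
        ' Assumption: one consumer per core suits CPU-bound line processing
        Dim consumerCount As Integer = Environment.ProcessorCount

        ' With a capacity of 1, Add blocks while one item is already waiting in the queue
        Using queue As New BlockingCollection(Of String)(1)

            Dim consumers(consumerCount - 1) As Task
            For c As Integer = 0 To consumerCount - 1
                consumers(c) = Task.Factory.StartNew(
                    Sub()
                        ' GetConsumingEnumerable blocks until an item arrives and
                        ' ends once CompleteAdding has been called and the queue is empty
                        For Each line As String In queue.GetConsumingEnumerable()
                            ProcessLine(line)
                        Next
                    End Sub,
                    TaskCreationOptions.LongRunning)
            Next

            ' Producer: stands in for the file-reading loop from the question
            For i As Integer = 1 To 20
                queue.Add("line " & i)
            Next

            ' Tell the consumers that no more items are coming, then wait for them to drain the queue
            queue.CompleteAdding()
            Task.WaitAll(consumers)
        End Using
    End Sub

    ' Hypothetical placeholder for the real per-line work
    Sub ProcessLine(line As String)
        Thread.Sleep(100)
        Console.WriteLine("Processed {0} on thread {1}", line, Thread.CurrentThread.ManagedThreadId)
    End Sub

End Module
```

In the question's loop, the call to AddLineToTheProcessingQueue would become queue.Add. Because Task.WaitAll runs before Main returns, no polling or Sleep calls are needed to keep the process alive.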

Licensed under: CC-BY-SA with attribution