Question

I like the simplicity of the Parallel.For and Parallel.ForEach methods in the TPL. I was wondering whether there is a way to use something similar, or even the slightly more advanced Tasks, with a data reader.

Below is a typical usage of SqlDataReader. Is it possible, and if so how, to replace the while loop below with something from the TPL? Because the reader can't provide a fixed number of iterations, Parallel.For is not an option, which I gather leaves Tasks. I was hoping someone has tackled this already and worked out some do's and don'ts with ADO.NET.

using (SqlConnection conn = new SqlConnection("myConnString"))
using (SqlCommand comm = new SqlCommand("myQuery", conn))
{
    conn.Open();

    SqlDataReader reader = comm.ExecuteReader();

    if (reader.HasRows)
    {
        while (reader.Read())
        {
            // Do something with Reader
        }
    }
}

Solution

You're almost there. Wrap the code you posted in a function with this signature:

IEnumerable<IDataRecord> MyQuery()

and then replace your // Do something with Reader code with this:

yield return reader;

Now you have something that works in a single thread. Unfortunately, as you read through the query results it returns a reference to the same object each time, and the object just mutates itself on each iteration. This means that if you try to run it in parallel you'll get some really odd results, because a read on one thread mutates the object that other threads are still using. You need code that takes a copy of each record before sending it to your parallel loop.
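
As a minimal sketch of what "taking a copy" could look like, the helper below copies each row's values into a fresh array before yielding it, so no two consumers share the reader's mutable state. The names RecordSnapshots and Snapshot are hypothetical and not part of ADO.NET; only IDataReader.Read, FieldCount and GetValues are real API.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

static class RecordSnapshots
{
    // Hypothetical helper: yields an independent object[] per row
    // instead of the (mutating) reader itself.
    public static IEnumerable<object[]> Snapshot(IDataReader reader)
    {
        while (reader.Read())
        {
            // A new array per row, so later Read() calls cannot change it.
            var values = new object[reader.FieldCount];
            reader.GetValues(values);
            yield return values;
        }
    }
}
```

Each yielded array is detached from the reader, although the reader itself is still advanced by one thread at a time as the sequence is enumerated.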

At this point, though, what I like to do is skip the extra copy of the record and go straight to a strongly-typed class. More than that, I like to use a generic method to do it:

IEnumerable<T> GetData<T>(Func<IDataRecord, T> factory, string sql, Action<SqlParameterCollection> addParameters)
{
    using (var cn = new SqlConnection("My connection string"))
    using (var cmd = new SqlCommand(sql, cn))
    {
        addParameters(cmd.Parameters);

        cn.Open();
        using (var rdr = cmd.ExecuteReader())
        {
            while (rdr.Read())
            {
                yield return factory(rdr);
            }
        }
    }
}

Assuming your factory methods create a copy as expected, this code should be safe to use in a Parallel.ForEach loop. Calling the method would look something like this (assuming an Employee class with a static factory method named "Create"):

var UnderPaid = GetData<Employee>(Employee.Create, 
       "SELECT * FROM Employee WHERE AnnualSalary <= @MinSalary", 
       p => {
           p.Add("@MinSalary", SqlDbType.Int).Value = 50000;
       });
Parallel.ForEach(UnderPaid, e => e.GiveRaise());
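
The Employee class itself is not shown above, so here is one possible shape for it, purely as an illustration. The column names (Id, Name, AnnualSalary), the 5% raise, and the lack of NULL handling are all assumptions; the point is only that Create copies every value out of the record into a new object.

```csharp
using System;
using System.Data;

public sealed class Employee
{
    public int Id { get; private set; }
    public string Name { get; private set; }
    public decimal AnnualSalary { get; private set; }

    // Hypothetical factory: copies values out of the current record so the
    // resulting object no longer depends on the reader's position.
    // Column names are assumed; DBNull values are not handled here.
    public static Employee Create(IDataRecord record)
    {
        return new Employee
        {
            Id = Convert.ToInt32(record["Id"]),
            Name = Convert.ToString(record["Name"]),
            AnnualSalary = Convert.ToDecimal(record["AnnualSalary"])
        };
    }

    // Hypothetical behaviour: a flat 5% raise on this instance only.
    public void GiveRaise()
    {
        AnnualSalary *= 1.05m;
    }
}
```

Because each Employee owns its own data, GiveRaise touches no shared state when called from a parallel loop.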

Important Update:
I'm not as confident in this code as I once was. A separate thread could still mutate the reader while another thread is in the process of making its copy. I could put a lock around that, but I'm also concerned that another thread could advance the reader after the original thread has called Read() but before it begins to make the copy. Therefore, the critical section here consists of the entire while loop... and at this point, you're back to single-threaded again. I expect there is a way to modify this code to work as expected for multi-threaded scenarios, but it will need more study.
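
One conservative way to sidestep the concern, offered here as a sketch rather than as a tested fix for the code above, is to read every row on a single thread first and only then hand the buffered copies to Parallel.ForEach. The helper name ForEachBuffered is hypothetical. It trades memory for safety, because the whole result set is held in a list before any parallel work starts.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class BufferedParallel
{
    // Hypothetical helper: drains the source on the calling thread,
    // then processes the buffered items in parallel.
    public static void ForEachBuffered<T>(IEnumerable<T> source, Action<T> body)
    {
        // ToList() enumerates the source sequentially, so a reader behind it
        // is only ever touched by this one thread.
        List<T> buffered = source.ToList();

        // The reader is finished by now; only the copies are shared out.
        Parallel.ForEach(buffered, body);
    }
}
```

With the GetData method above, the call would be along the lines of BufferedParallel.ForEachBuffered(UnderPaid, e => e.GiveRaise()). This only pays off when the per-row work is expensive compared with reading the rows.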

OTHER TIPS

You're going to have difficulty replacing that while loop directly. SqlDataReader is not a thread-safe class, so you cannot use it directly from multiple threads.

That being said, you could potentially process the data you read using the TPL. There are a few options here. The easiest might be to write an iterator method that reads from the reader and returns an IEnumerable<T> of a class or struct containing your data. You could then use PLINQ or a Parallel.ForEach statement to process your data in parallel:

public IEnumerable<MyDataClass> ReadData()
{
    using (SqlConnection conn = new SqlConnection("myConnString"))
    using (SqlCommand comm = new SqlCommand("myQuery", conn))
    {
        conn.Open();

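        // Dispose the reader when enumeration finishes or is abandoned.
        using (SqlDataReader reader = comm.ExecuteReader())
        {
            // Read() returns false right away for an empty result set,
            // so a separate HasRows check is not needed.
            while (reader.Read())
            {
                // Copy the values out of the reader before yielding.
                yield return new MyDataClass(... data from reader ...);
            }
        }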
    }
}

Once you have that method, you can process this directly, via PLINQ or TPL:

Parallel.ForEach(this.ReadData(), data =>
{
    // Use the data here...
});

Or:

this.ReadData().AsParallel().ForAll(data => 
{
    // Use the data here...
});
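
If the per-row work produces a value rather than a side effect, PLINQ can also combine the results for you. The sketch below uses a hypothetical ReadNumbers method as a stand-in for ReadData(), so that it runs without a database; the class and method names are assumptions for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class PlinqExample
{
    // Hypothetical stand-in for ReadData(): lazily yields plain values,
    // one at a time, the way the iterator over the reader would.
    public static IEnumerable<int> ReadNumbers()
    {
        for (int i = 1; i <= 100; i++)
        {
            yield return i;
        }
    }

    // The projection runs in parallel and PLINQ merges the partial sums,
    // so no manual locking is needed for the aggregation.
    public static long SumOfSquares(IEnumerable<int> source)
    {
        return source.AsParallel().Select(n => (long)n * n).Sum();
    }
}
```
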
Licensed under: CC-BY-SA with attribution