Question

I have created some async methods that add records to a DataTable, and I bulk copy the DataTable once the async methods are done. I have since found that DataTable is not thread-safe, which is the source of my problem: I was noticing one or two records not actually getting inserted. I have something like this:

    private async void yea()
    {
        DataTable t = new DataTable();      
        //Fill the data table with its columns
        IEnumerable<Task<string>> results = items.Select(q => AsyncFunction(q.id, t));
        Task<string[]> allTasks = Task.WhenAll(results);
        string[] allResults = await allTasks;

        using (SqlConnection conn = new SqlConnection(_connString))
        {
            conn.Open();
            using (SqlBulkCopy bc = new SqlBulkCopy(conn))
            {
                bc.BatchSize = 1000;
                bc.DestinationTableName = tableName;
                bc.WriteToServer(t);
            }
        }
    }

    public async Task<string> AsyncFunction(int id, DataTable t)
    {
        //await another Async function
        DataRow dr = t.NewRow();
        dr["ID"] = id;
        //Many More columns
        t.Rows.Add(dr);
        return "success";
    }

As I said, my problem is that the bulk copy often misses a few records. How can I bulk copy all of the records without losing any to non-thread-safe calls?


Solution 2

Use one DataTable per task; when you go to upload, you can concatenate them all together again. Your code is not complete (for example, where does Result come from and how is it used?), but here are the basics of how you can do it. You will need to adjust this to your code.

DataTable t = new DataTable();
//Fill the data table with its columns
IEnumerable<Task<NewResult>> results = items.Select(q => AsyncFunction(q.id, t));

Task<NewResult[]> allTasks = Task.WhenAll(results); //Could also be inlined as: await Task.WhenAll(results)

NewResult[] allResults = await allTasks;


using (SqlConnection conn = new SqlConnection(_connString))
{
    conn.Open();
    using (SqlBulkCopy bc = new SqlBulkCopy(conn))
    {
        bc.BatchSize = 1000;
        bc.DestinationTableName = tableName;

        //Joins all of the data rows from all of the generated tables into a single array.
        DataRow[] allRows = allResults.SelectMany(a => a.LocalDataTable.AsEnumerable()).ToArray();

        bc.WriteToServer(allRows);
    }
}


public async Task<NewResult> AsyncFunction(int id, DataTable template)
{
    DataTable localDataTable = template.Clone(); //DataTable is thread safe for read operations like .Clone()        

    //Do some stuff
    DataRow dr = localDataTable.NewRow(); //Use the task's own table, not the shared template
    dr["ID"] = id;
    //Many More columns
    localDataTable.Rows.Add(dr);

    //Be sure you have an "await" somewhere in that removed section or else this code will not be multi-threaded.

    return new NewResult("success", localDataTable);    
}

You could also call .CreateDataReader() on each DataTable and then write a wrapper that concatenates multiple IDataReaders together, so you don't need to allocate the extra DataRow[]. Alternatively, just call SqlBulkCopy.WriteToServer multiple times, once for each DataTable.
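The one-table-per-task idea can be tried without a database. The following is only a minimal sketch under assumptions: the class name PerTaskTables, the single "ID" column, and the Task.Delay stand-in for the real awaited work are all hypothetical, and it combines the per-task tables with DataTable.Merge instead of the DataRow[] concatenation shown above. The merged table could then be passed to a single WriteToServer call.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;

public static class PerTaskTables
{
    // Hypothetical schema standing in for the question's real columns.
    public static DataTable CreateTemplate()
    {
        var t = new DataTable("Items");
        t.Columns.Add("ID", typeof(int));
        return t;
    }

    // Each call fills its own clone of the template, so no table is written by two tasks.
    public static async Task<DataTable> BuildTableAsync(int id, DataTable template)
    {
        // Runs before the first await, i.e. synchronously on the caller's thread.
        DataTable local = template.Clone();

        await Task.Delay(1).ConfigureAwait(false); // stand-in for the real awaited work

        DataRow row = local.NewRow();
        row["ID"] = id;
        local.Rows.Add(row);
        return local;
    }

    // Waits for every task, then combines the per-task tables on a single thread.
    public static async Task<DataTable> BuildMergedAsync(IEnumerable<int> ids)
    {
        DataTable template = CreateTemplate();
        DataTable[] parts = await Task.WhenAll(ids.Select(id => BuildTableAsync(id, template)));

        DataTable merged = template.Clone();
        foreach (DataTable part in parts)
        {
            merged.Merge(part); // no primary key is defined, so rows are appended
        }
        return merged;
    }
}
```

Calling PerTaskTables.BuildMergedAsync(Enumerable.Range(1, 50)) should yield a table with one row per id, since every write happens either on a task-private table or after all tasks have finished.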

OTHER TIPS

Don't mutate the DataTable within the async method. Have the async method compute the row, return it, and then add all of the rows once you have the results:

foreach(var row in await Task.WhenAll(items.Select(q => AsyncFunction(q.id, t))))
    t.Rows.Add(row);
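
A self-contained variant of this tip might look like the sketch below. It rests on assumptions: ComputeRowAsync, the class name RowsAfterAwait, and the single "ID" column are hypothetical, and the worker returns plain column values (object[]) instead of a DataRow so that it never touches the table at all, which sidesteps the question of whether NewRow is safe to call concurrently.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;

public static class RowsAfterAwait
{
    // Hypothetical async worker: returns column values only and never touches the DataTable.
    public static async Task<object[]> ComputeRowAsync(int id)
    {
        await Task.Delay(1).ConfigureAwait(false); // stand-in for the real awaited work
        return new object[] { id };
    }

    public static async Task<DataTable> BuildTableAsync(IEnumerable<int> ids)
    {
        var t = new DataTable("Items");
        t.Columns.Add("ID", typeof(int));

        // Task.WhenAll returns the results in the same order as the input tasks.
        object[][] rows = await Task.WhenAll(ids.Select(id => ComputeRowAsync(id)));

        // All mutation of the table happens here, sequentially, after every task has completed.
        foreach (object[] values in rows)
        {
            t.Rows.Add(values);
        }
        return t;
    }
}
```

The resulting table can be handed to SqlBulkCopy.WriteToServer exactly as in the question, because it is only ever written from one place.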
Licensed under: CC-BY-SA with attribution