L'elaborazione di un file flat in blocchi che utilizzano thread multipli utilizzando produttore / modello del consumatore e SqlBulkCopy in SQL Server DB

StackOverflow https://stackoverflow.com/questions/2066503

Domanda

Spero che pazienza con me. Ho voluto fornire quante più informazioni possibile. Il problema principale è come creare una struttura (come una pila) che verrà utilizzato da più thread che pop un valore e usarlo per elaborare una lima grande piana e possibilmente fare ripetutamente bicicletta finché l'intero file viene elaborato. Se un file ha 100.000 record che possono essere elaborati da 5 fili utilizzando 2.000 blocchi di riga quindi ogni thread otterrà 10 blocchi di processo.

Il mio obiettivo è quello di spostare i dati in un file flat (con Header ... Subheader ... Dettaglio, Dettaglio, Dettaglio, ... Dettaglio, Subfooter, Subheader ... Dettaglio, Dettaglio, Dettaglio, ... Dettaglio , Subfooter, Subheader ... Dettaglio, Dettaglio, Dettaglio, ... Dettaglio, Subfooter, struttura piè di pagina) in OLTP DB che ha modalità di recupero per semplice (possibile completa) in 3 tabelle: 1 ° che rappresenta attualmente chiave univoca di Subheader in fila Subheader, secondo un intermedio tavolo SubheaderGroup, che rappresenta il raggruppamento di righe di dettaglio in blocchi di 2000 record (deve avere di Subheader Identità PK come FK e 3 ° che rappresenta le righe dettaglio con FK che punta a Subheader PK.

io sto facendo la gestione delle transazioni manuale dato che posso avere decine di migliaia di righe di dettaglio e sto usando un campo speciale che è impostato su 0 in tabelle di destinazione durante il carico e poi alla fine della elaborazione dei file che sto facendo un upate transazionale modifica di questo valore a 1, che può segnalare altra applicazione che il carico finito.

Voglio tagliare questo file flat in più parti uguali (stesso numero di righe) che possono essere trattati con più thread e importati utilizzando SqlBulkCopy utilizzando IDataReader che viene creato dai metadati tabella di destinazione).

Voglio usare produttore / modello di consumo (come spiegato nel link qui sotto - l'analisi pdf e codice di esempio) per utilizzare l'opzione SqlBulkCopy SqlBulkCopyOptions.TableLock con. http://sqlblog.com/blogs/ alberto_ferrari / archive / 2009/11/30 / SqlBulkCopy-prestazioni-analysis.aspx Questo modello consente la creazione di più produttori e il numero equivalente di consumatori hanno bisogno di iscriversi ai produttori di consumare la riga.

Nel progetto TestSqlBulkCopy, il file DataProducer.cs c'è un metodo che simula la produzione di migliaia di record.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

Questo metodo viene eseguito nel contesto di un nuovo filo. Voglio che questo nuovo thread per leggere solo un pezzo unico di file flat originale e un altro thread sarà strart elaborare il prossimo pezzo. I consumatori sarebbero quindi spostare i dati (che viene pompato a loro) a SQL Server database utilizzando la classe ADO.NET SqlBulkCopy.

Quindi la domanda qui è di circa dettare programma principale quello lineFrom per lineTo dovrebbe essere elaborato da ogni thread e penso che dovrebbe accadere durante la creazione thread. Seconda soluzione è probabilmente per i thread di condividere qualche struttura e utilizzare qualcosa di unico per loro (come il numero di thread o numero di sequenza) di ricercare una struttura condivisa (possibilmente uno stack e pop un valore (bloccaggio uno stack mentre lo fa) e poi il prossimo thread poi pick-up il valore successivo. il programma principale sceglierà nel file flat e determinare la dimensione dei pezzi e ha creato la pila.

Quindi, qualcuno può fornire alcuni frammenti di codice, pseudo cod su come le discussioni più sarebbe elaborare un file e ottenere solo una parte unica di quel file?

Grazie, Rad

È stato utile?

Soluzione

ciò che ha funzionato bene per me è quello di utilizzare una coda per tenere il lavoro non trasformati e un dizionario per tenere traccia del lavoro in volo:

  1. Creare una classe lavoratrice che prende il il nome del file, linea di partenza, e numero di linee ed ha un metodo di aggiornamento che fa gli inserti di database. Passare un metodo di callback che il lavoratore utilizza per segnalare quando il suo fare.
  2. Caricare una coda con le istanze del lavoratore di classe, uno per ogni blocco.
  3. genera un thread dispatcher che Ritiri dalla coda di un esempio operaio, lancia il suo aggiornamento metodo e aggiunge l'istanza lavoratore in un dizionario, calettato da ManagedThreadId del suo filo. Fai questo fino a quando il filo massimo consentito conteggio è raggiunto, come rilevato dalla Dictionary.Count. il dispatcher attende che un thread termina e poi lancia un'altra. Ci sono diversi modi per esso aspettare.
  4. Come ogni thread termina, la sua callback rimuove la sua ManagedThreadId dalla Dizionario. Se il filo si chiude causa di un errore (ad esempio timeout di connessione), allora la richiamata può reinserire il lavoratore nella coda. Questo è un buon posto per aggiornare l'interfaccia utente.
  5. l'interfaccia utente può mostrare thread attivi, il progresso totale, e il tempo per ogni pezzo. Può permettere all'utente di regolare il numero di thread attivi, elaborazione pausa, mostra errori o interrompere presto.
  6. Quando la coda e Dizionario sono vuote, il gioco è fatto.

Codice Demo come una console app:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top