El procesamiento de un archivo plano en trozos utilizando varios subprocesos mediante productor/consumidor patrón y SqlBulkCopy en SQL Server DB

StackOverflow https://stackoverflow.com/questions/2066503

Pregunta

Espero que tengan paciencia conmigo.Yo quería proporcionar tanta información como pueda.El principal problema es cómo crear una estructura (como una pila) que será utilizado por varios subprocesos que se abrirá un valor y se usa para el proceso de un gran archivo plano y, posiblemente, hacer bicicleta de nuevo y de nuevo hasta que el archivo se procesa.Si un archivo tiene 100.000 registros que pueden ser procesados por 5 hilos utilizando 2.000 fila trozos a continuación, cada subproceso se obtenga un 10 trozos de proceso.

Mi objetivo es mover los datos en un archivo plano (con Cabecera...Subheader...Detalles, Detalles, Detalles, ...Detalle, SubFooter, Subheader...Detalles, Detalles, Detalles, ...Detalle, SubFooter, Subheader...Detalles, Detalles, Detalles, ...Detalle, SubFooter, Pie de página estructura) en OLTP DB que tiene el modo de recuperación Simple (posible) en 3 tablas:1er representa Subheader de la clave única presente en Subheader fila, 2ª una tabla intermedia SubheaderGroup, en representación de la agrupación de filas de detalle en trozos de 2000 registros (necesita tener Subheader la Identidad de PK como FK y 3 de representación de las filas de Detalle con FK apunta a Subheader PK.

Estoy haciendo manual de transacciones de gestión, ya que puede tener decenas de miles de filas de Detalle y estoy usando un campo especial que se establece en 0 en las tablas de destino durante la carga y, a continuación, en el final del archivo de procesamiento estoy haciendo una transaccional upate cambiar este valor a 1, que puede ser una señal de otra aplicación en la que la carga ha terminado.

Quiero cortar este archivo plano en varios trozos iguales (mismo número de filas) que pueden ser procesadas con múltiples hilos importados y el uso de SqlBulkCopy utilizando IDataReader que se crea a partir de la tabla de Destino de metadatos).

Quiero usar productor/consumidor patrón (como se explica en el enlace de más abajo - pdf análisis y el ejemplo de código) para el uso de SqlBulkCopy con SqlBulkCopyOptions.TableLock opción.http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx Este patrón permite la creación de múltiples productores y el número equivalente de los consumidores necesitan para suscribirse a los productores para consumir la fila.

En TestSqlBulkCopy proyecto, DataProducer.cs archivo no es un método que simula la producción de miles de registros.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

Este método se ejecutará en el contexto de un nuevo hilo.Quiero que este nuevo hilo para leer sólo un único trozo de plano original del archivo y otro hilo strart el procesamiento del siguiente fragmento.Los consumidores, a continuación, mover los datos (que se bombea a ellos) para SQL Server DB utilizando SqlBulkCopy ADO.NET clase.

Así que la pregunta aquí es sobre el programa principal dictando lo que lineFrom a lineTo deben ser procesados por cada hilo y creo que debería suceder durante la creación de subprocesos.La segunda solución es, probablemente, para los hilos para compartir un poco de estructura y uso algo único para ellos (como el número de hilo o el número de secuencia) para la búsqueda de una estructura compartida (posiblemente una pila de pop y un valor (bloqueo de una pila, mientras que hacerlo) y, a continuación, siguiente hilo, a continuación, recoger el valor siguiente.El programa principal recogida en el archivo plano y determinar el tamaño de los trozos y se creó la pila.

Así que alguien puede proporcionar algunos fragmentos de código, pseudo bacalao sobre cómo múltiples subprocesos podría procesar un archivo y obtener sólo una única porción de ese archivo?

Gracias, Rad

¿Fue útil?

Solución

Qué ha funcionado bien para mí es utilizar una cola para contener sin procesar de trabajo y un diccionario para mantener un seguimiento del trabajo en vuelo:

  1. Crear una clase trabajadora que lleva a la el nombre de archivo, línea de salida, y el número de línea y tiene un método de actualización que ¿la base de datos inserta.Pasar de un método de devolución de llamada que el el trabajador utiliza para la señal cuando su hecho.
  2. Carga de una Cola con instancias del trabajador clase, uno para cada parte.
  3. Generar un despachador de hilo que dequeues un trabajador de la instancia, lanza su actualización método, y añade que el trabajador instancia en un Diccionario, cerrado por su hilo ManagedThreadId.Hacer esto hasta su máxima permitida hilo el conde se alcanza, como lo señaló el De diccionario.El recuento.El despachador espera hasta que un hilo de acabados y, a continuación, inicia otro.Hay varias maneras para que espere.
  4. Como cada subproceso termina, su devolución de llamada quita su ManagedThreadId de la De diccionario.Si el hilo se cierra debido a un error (como tiempo de espera de conexión), el de devolución de llamada puede volver a insertar el trabajador en la Cola.Este es un buen lugar para actualizar su interfaz de usuario.
  5. Su interfaz de usuario puede mostrar hilos activos, el total de progreso, y el tiempo de cada fragmento.Puede permitir al usuario ajustar el número de hilos activos, procesamiento en pausa, muestran los errores, o parada antes.
  6. Cuando la Cola y en el Diccionario están vacías, ya está hecho.

Código de demostración como una aplicación de consola:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top