O processamento de um arquivo simples em pedaços usando múltiplas threads usando o modelo produtor/consumidor padrão e SqlBulkCopy em banco de dados SQL Server

StackOverflow https://stackoverflow.com/questions/2066503

Pergunta

Eu espero que você vai ter comigo.Eu queria fornecer o máximo de informações possível.O principal problema é como criar uma estrutura (como uma pilha) que vai ser utilizada por vários segmentos que irá aparecer um valor e usá-lo para processar um grande plano de arquivo e, possivelmente, andar de bicicleta novamente e novamente, até que todo o arquivo é processado.Se um ficheiro tem de 100.000 registros que podem ser processados por 5 threads usando 2.000 linha de blocos em seguida, cada thread irá obter 10 blocos para o processo.

Meu objetivo é mover os dados em um arquivo simples (com Cabeçalho...Subcabeçalho...Detalhe, Detalhe, Detalhe ...Detalhe, SubFooter, Subcabeçalho...Detalhe, Detalhe, Detalhe ...Detalhe, SubFooter, Subcabeçalho...Detalhe, Detalhe, Detalhe ...Detalhe, SubFooter, Rodapé estrutura) em OLTP DB que tem o modo de recuperação Simples (possível Completo) em 3 tabelas:1º representando Subcabeçalho de chave exclusivo presentes no Subcabeçalho de linha, 2 de uma tabela intermediária SubheaderGroup, que representam o agrupamento das linhas de detalhes em blocos de registros de 2000 (precisa ter Subcabeçalho de Identidade PK como seus FK e 3º representando as linhas de Detalhes com FK apontando para Subcabeçalho de PK.

Eu estou fazendo o manual de gerenciamento de transação, pois eu posso ter dezenas de milhares de linhas de Detalhes e eu estou usando um campo especial que é definido para 0 nas tabelas de destino durante a carga e, em seguida, no final do processamento do arquivo que eu estou fazendo um transacional atualização de alterar este valor para 1, que pode ser um sinal de outra aplicação que o carregamento terminou.

Eu quero cortar este plano arquivo em várias partes iguais (mesmo número de linhas) que podem ser processados com vários threads e importados usando SqlBulkCopy usando IDataReader que é criado a partir da tabela de Destino de metadados).

Eu quero usar o produtor/consumidor padrão (como explicado no link abaixo - pdf análise e código de exemplo) para usar SqlBulkCopy com SqlBulkCopyOptions.TableLock opção.http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx Este padrão permite a criação de vários produtores e o equivalente ao número de consumidores que precisa para se inscrever para os produtores de consumir a linha.

Em TestSqlBulkCopy projeto, DataProducer.cs ficheiro não é um método que simula a produção de milhares de registros.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

Este método será executado no contexto de um novo thread.Eu quero esse novo thread para ler apenas um único bloco de arquivo plano original e outro thread vai strart o processamento do próximo bloco.Os consumidores, em seguida, mover dados (que é bombeada para eles) para banco de dados SQL Server usando SqlBulkCopy ADO.NET de classe.

Portanto, a questão aqui é sobre o principal programa de ditar o que lineFrom para lineTo deve ser processado por cada segmento e acho que deve acontecer durante a criação da thread.Segunda solução é, provavelmente, para threads para compartilhar alguns estrutura e usar algo exclusivo para eles (como o número de threads ou número de sequência) para a pesquisa de uma estrutura compartilhada (possivelmente uma pilha de pop e um valor (bloqueio de uma pilha enquanto a fazê-lo) e, em seguida, a próxima thread irá, em seguida, recolha o valor seguinte.O programa principal irá escolher para o plano de arquivo e determinar o tamanho dos pedaços e criou a pilha.

Então, alguém pode fornecer alguns trechos de código, pseudo bacalhau em como vários threads para o processamento de um arquivo e ter apenas uma única porção desse arquivo?

Obrigado, Rad

Foi útil?

Solução

O que funcionou bem para mim é usar uma fila para manter de trabalho não processados e um dicionário para acompanhar o trabalho em voo:

  1. Criar uma classe de trabalhadores que leva a nome de ficheiro, linha de largada e a linha de contagem e tem um método de atualização que o banco de dados insere.Passar um método de retorno de chamada que o trabalhador usa para sinalizar quando o seu feito.
  2. Carregar uma Fila com instâncias do trabalhador classe, um para cada bloco.
  3. Gerar um despachante que dequeues um instância de trabalho, lança a sua atualização o método, e adiciona a instância de trabalho em um Dicionário, introduzidos por sua thread ManagedThreadId.Fazer isso até o máximo permitido thread contagem é atingido, como observou o Dicionário.Contagem.O dispatcher espera até que uma thread termina e, em seguida, inicia outro.Há várias maneiras para que ele aguarde.
  4. Como cada thread termina, seu retorno de chamada remove sua ManagedThreadId do Dicionário.Se o thread sai devido a um erro (tal como tempo limite de conexão), o de retorno de chamada pode reinserir o trabalhador na Fila.Este é um bom lugar para atualizar a INTERFACE do usuário.
  5. Sua INTERFACE de usuário pode mostrar segmentos ativos, o total de progresso, e vez por bloco.Ele pode permitir que o usuário ajuste o número de segmentos ativos, pause o processamento, mostrar os erros, ou deixar de início.
  6. Quando a Fila e o Dicionário estiver vazia, você está feito.

Demonstração de código como um aplicativo de console:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}
Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top