처리 평 파일에서 덩어리를 사용하여 여러 쓰레드를 사용하여 생산자/소비자 패턴과 현재 권장되는 버전은 여기에서 찾으로 SQL DB

StackOverflow https://stackoverflow.com/questions/2066503

문제

나는 당신이 나와 함께.고 싶어 많은 정보를 제공 할 수 있습니다.주요 문제를 만드는 방법 구조(스럼)에 의해 사용되는 여러 쓰레드는 것이 값을 사용하여 프로세스 중 하나 큰 평면 파일을 가능하게 할 순환시할 때까지 전체 파일을 처리됩니다.파일이 있는 경우에는 100.000 기록 처리할 수 있는 5 개의 스레드를 사용하여 2.000 행 덩어리 다음 각 스레드 10 덩어리하는 과정입니다.

나의 목표는 데이터 이동에 편평한 파일(을 가진 헤더가...서브 헤더...세부,세부사항,세부사항,...세부사항,SubFooter,서브 헤더...세부,세부사항,세부사항,...세부사항,SubFooter, 서브 헤더...세부,세부사항,세부사항,...세부사항,SubFooter Footer 구조)로 OLTP DB 있는 복 모드 간단(가능한 전체)3 테이블:1 을 나타내는 서브 헤더의 독특한 키에 존재하는 서브 헤더 줄,2 중간 테이블 SubheaderGroup 대표하는 그룹의 세부 사항에서 행 덩어리의 2000 기록(필요가 있는 서브 헤더의 Id PK 로 FK 및 제 3 을 나타내는 세부 행 FK 을 가리키는 서브 헤더 제공합니다.

나는 하동 트랜잭션 관리할 수 있기 때문에 있는 수천만의 세부 행 와 나를 사용하여 특수 필드를 0 으로 설정 대상에 테이블을 로드하는 동안 다음의 끝에서 파일을 처리하고 있는 트랜잭션 업데이트는 이 값을 변경하는 1 할 수 있는 신호는 다른 응용 프로그램 로딩이 완료됩니다.

하고 싶어 들어온이 평면 파일을 여러 개의 동일한 조각(동일한 번호의 행)을 처리할 수 있는 여러 쓰레드와 수입을 사용하여 현재 권장되는 버전은 여기에서 찾을 사용하여 IDataReader 에서 생성되는 대상에 테이블 메타데이터).

내가 사용하려는 생산자/소비자의 패턴(로에서 설명 아래 링크-pdf 분석 및 샘플 코드)를 사용하여 현재 권장되는 버전은 여기에서 찾으로 SqlBulkCopyOptions.TableLock 옵션입니다.http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx 이는 패턴을 생성할 수 있습니 다수 생산자 및 이와 동등한 소비자의 수는 가입 할 필요가 생산자를 소비하는 행이 있습니다.

에 TestSqlBulkCopy 프로젝트 DataProducer.cs 파일이 있는 방법을 시뮬레이션하는 생산의 수천 명의 기록이다.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

이 방법은 실행의 컨텍스트에서 새로운 스레드가 있습니다.나는 원하는 이 새로운 스레드를 읽을 만한 덩어리를 원래의 평면 파일을 다른 스레드가 strart 처리 다음에 덩어리.소비자는 다음 데이터 이동(양수되는 그들)SQL 서버에 DB 를 사용하여 현재 권장되는 버전은 여기에서 찾 ADO.NET 클래스입니다.

그래서 여기에 질문에 대한 주요 프로그램으로 영향을 주고 lineFrom 을 lineTo 가 처리해야 하는 각 스레드를 생각해야 하는 동안 일어날 스레드 생성.두 번째 해결책은 아마도 스레드 공유하는 일부 구조와 사용하여 독특한 무언가가 그들에(같은 스레드 번호나 순번)조회 공유 구조물(아마도 스택과 팝의 값(잠금 스택을 하는 동안 그것은)다음 다음 스레드가 다음 픽업이 다음 값입니다.주요 프로그램을 선택할 것으로 평 파일의 크기를 결정하는 덩어리로 만들었다.

그래서 수 있는 누군가가 제공하는 코드 조각,의사는 대구에 어떻게 여러 스레드 과정을 하나의 파일만 고유의 일부는 파일입니까?

감사합니다, Rad

도움이 되었습니까?

해결책

무슨 일이 잘 나를 위해 사용하는 큐를 잡아 처리 되지 않은 작품과 사전을 추적의 작업에서-항공편:

  1. 작업자를 생성하는 클래스 름,시인,그리고 라 계산 고 있 업데이트하는 방법 는 데이터베이스를 삽입합니다.전달된 이 방법 작업자를 사용하여 신호를 수 있습니다.
  2. 부하 큐의 인스턴스와 작업자 클래스를,하나에 대한 각각의 덩어리.
  3. 산란 발송자에는 스레드를 큐에서 해제하 작업자를 들어,출시 업데이트 방법,그리고 추가 작업자 인스턴스로 사전에 키로 사용하여 해당 실의 ManagedThreadId.이 할 때까지 당신의 최대 허용 가능한 스레드 개수에 도달으로 주목에 의해 사전입니다.계산합니다.차 대기까지 스레드가 완료되 다음 시작합니다.거기에 여러 가지 방법을 기다립니다.
  4. 각 스레드가 완료되어,그것의 콜백 제거에서 ManagedThreadId 사전입니다.는 경우에 스레드를 종료 때문에 오류(예: 연결 시간 제한)다음 콜백을 삽입업자 니다.이 좋은 장소입니다 를 업데이트 UI.
  5. 귀하의 UI 를 표시할 수 있습 활성 스레드,총,진행 상황과 시간당 덩어리.할 수 있는 사용자의 수를 조정 활성 스레드,처리가 일시 중지,오류 표시,또는 중니다.
  6. 큐와 사전에 비어 있는,당신은 끝났어.

데모드로 콘솔 응용 프로그램:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}
라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top