Domanda

Cercando di utilizzare TThreadqueue (generics.collections) in un singolo produttore schema di consumo multipli. (Delphi-xe). L'idea è di spingere gli oggetti in una coda e lasciare che diversi fili dei lavoratori drenano la coda.

Tuttavia, non funziona come previsto. Quando due o più thread di lavoratori chiamano Popitem, le violazioni dell'accesso vengono lanciate dal Tthreadqueue.

Se la chiamata a Popitem viene serializzata con una sezione critica, tutto va bene.

Sicuramente il Tthreadqueue dovrebbe essere in grado di gestire più consumatori, quindi mi manca qualcosa o è un bug puro in Tthreadqueue?

Ecco un semplice esempio per produrre l'errore.

program TestThreadedQueue;

{$APPTYPE CONSOLE}

uses
//  FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
  Windows,
  Messages,
  Classes,
  SysUtils,
  SyncObjs,
  Generics.Collections;

type TThreadTaskMsg =
       class(TObject)
         private
           threadID  : integer;
           threadMsg : string;
         public
           Constructor Create( ID : integer; const msg : string);
       end;

type TThreadReader =
       class(TThread)
         private
           fPopQueue   : TThreadedQueue<TObject>;
           fSync       : TCriticalSection;
           fMsg        : TThreadTaskMsg;
           fException  : Exception;
           procedure DoSync;
           procedure DoHandleException;
         public
           Constructor Create( popQueue : TThreadedQueue<TObject>;
                               sync     : TCriticalSection);
           procedure Execute; override;
       end;

Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
                                  sync     : TCriticalSection);
begin
  fPopQueue:=            popQueue;
  fMsg:=                 nil;
  fSync:=                sync;
  Self.FreeOnTerminate:= FALSE;
  fException:=           nil;

  Inherited Create( FALSE);
end;

procedure TThreadReader.DoSync ;
begin
  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;

procedure TThreadReader.DoHandleException;
begin
  WriteLn('Exception ->' + fException.Message);
end;

procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
  NameThreadForDebugging('QueuePop worker');
  while not Terminated do
  begin
    try
      {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
      Sleep(20);
      {- Serializing calls to PopItem works }
      if Assigned(fSync) then fSync.Enter;
      try
        signal:= fPopQueue.PopItem( TObject(fMsg));
      finally
        if Assigned(fSync) then fSync.Release;
      end;
      if (signal = wrSignaled) then
      begin
        try
          if Assigned(fMsg) then
          begin
            fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
            fMsg.Free; // We are just dumping the message in this test
            //Synchronize( Self.DoSync);
            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
          end;
        except
          on E:Exception do begin
          end;
        end;
      end;
      except
       FException:= Exception(ExceptObject);
      try
        if not (FException is EAbort) then
        begin
          {Synchronize(} DoHandleException; //);
        end;
      finally
        FException:= nil;
      end;
   end;
  end;
end;

Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
  Inherited Create;

  threadID:= ID;
  threadMsg:= msg;
end;

var
    fSync : TCriticalSection;
    fThreadQueue : TThreadedQueue<TObject>;
    fReaderArr : array[1..4] of TThreadReader;
    i : integer;

begin
  try
    IsMultiThread:= TRUE;

    fSync:=        TCriticalSection.Create;
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
    try
      {- Calling without fSync throws exceptions when two or more threads calls PopItem
         at the same time }
      WriteLn('Creating worker threads ...');
      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
      {- Calling with fSync works ! }
      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
       WriteLn('Init done. Pushing items ...');

      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

      ReadLn;

    finally
      for i:= 1 to 4 do fReaderArr[i].Free;
      fThreadQueue.Free;
      fSync.Free;
    end;

  except
    on E: Exception do
      begin
        Writeln(E.ClassName, ': ', E.Message);
        ReadLn;
      end;
  end;
end.

Aggiornare : L'errore in Tmonitor che ha causato lo sradamento di Tthreadqueue è fissato in Delphi XE2.

Aggiornamento 2 : Il test di cui sopra ha stressato la coda nello stato vuoto. Darian Miller ha scoperto che sottolineare la coda a pieno stato, poteva ancora riprodurre l'errore in XE2. L'errore è ancora una volta nel tmonitor. Vedi la sua risposta di seguito per ulteriori informazioni. E anche un collegamento al QC101114.

Aggiornamento 3 : Con Delphi-Xe2 Update 4 c'era una soluzione annunciata per TMonitor Ciò curerebbe i problemi TThreadedQueue. I miei test finora non sono in grado di riprodurre alcun errore TThreadedQueue più. Testatori singoli testati/thread di consumo multipli quando la coda è vuota e piena. Ha anche testato più produttori/più consumatori. Ho variato i thread del lettore e i thread di scrittori da 1 a 100 senza problemi. Ma conoscendo la storia, sfido gli altri a rompersi TMonitor.

È stato utile?

Soluzione

Bene, è difficile essere sicuri senza molti test, ma sembra certamente che questo sia un bug, sia a Tthreadqueue che a Tmonitor. In entrambi i casi è nella RTL e non nel tuo codice. Dovresti archiviare questo come un rapporto QC e utilizzare il tuo esempio sopra come codice "Come riprodurre".

Altri suggerimenti

Ti consiglio di usare Omnithreadlibrary http://www.thedelphigeek.com/search/label/omnithreadlibrary Quando si lavora con thread, parallelismo, ecc. Primoz ha fatto un ottimo lavoro e sul sito troverai molta documentazione utile.

Il tuo esempio sembra funzionare bene sotto XE2, ma se riempiamo la coda non riesce con AV su un Pushitem. (Testato in XE2 Update1)

Per riprodurre, basta aumentare la creazione di attività da 100 a 1100 (la profondità della coda è stata impostata a 1024)

for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

Questo muore per me ogni volta su Windows 7. Inizialmente ho provato una spinta continua per stressarlo, e non è riuscito al loop 30 ... poi al loop 16 ... quindi a 65 quindi a intervalli diversi ma ha costantemente fallito ad alcuni punto.

  iLoop := 0;
  while iLoop < 1000 do
  begin
    Inc(iLoop);
    WriteLn('Loop: ' + IntToStr(iLoop));  
    for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
  end;

Ho cercato la lezione di Tthreadqueue ma non sembra averlo nel mio D2009. Non mi ucciderò esattamente per questo - il supporto del thread di Delphi è sempre stato err .. errm ... "non ottimale" e sospetto che Tthreadqueue non sia diverso :)

Perché utilizzare generici per oggetti PC (produttore / consumatore)? Un semplice discendente TobjectQueue andrà bene - lo userà da decenni - funziona bene con più produttori/consumatori:

unit MinimalSemaphorePCqueue;

{ Absolutely minimal P-C queue based on TobjectQueue and a semaphore.

The semaphore count reflects the queue count
'push' will always succeed unless memory runs out, then you're stuft anyway.
'pop' has a timeout parameter as well as the address of where any received
object is to be put.
'pop' returns immediately with 'true' if there is an object on the queue
available for it.
'pop' blocks the caller if the queue is empty and the timeout is not 0.
'pop' returns false if the timeout is exceeded before an object is available
from the queue.
'pop' returns true if an object is available from the queue before the timeout
is exceeded.
If multiple threads have called 'pop' and are blocked because the queue is
empty, a single 'push' will make only one of the waiting threads ready.


Methods to push/pop from the queue
A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call.
When the handle is signaled, the 'peek' method will retrieve the queued object.
}
interface

uses
  Windows, Messages, SysUtils, Classes,syncObjs,contnrs;


type

pObject=^Tobject;


TsemaphoreMailbox=class(TobjectQueue)
private
  countSema:Thandle;
protected
  access:TcriticalSection;
public
  property semaHandle:Thandle read countSema;
  constructor create; virtual;
  procedure push(aObject:Tobject); virtual;
  function pop(pResObject:pObject;timeout:DWORD):boolean;  virtual;
  function peek(pResObject:pObject):boolean;  virtual;
  destructor destroy; override;
end;


implementation

{ TsemaphoreMailbox }

constructor TsemaphoreMailbox.create;
begin
{$IFDEF D2009}
   inherited Create;
{$ELSE}
  inherited create;
{$ENDIF}
  access:=TcriticalSection.create;
  countSema:=createSemaphore(nil,0,maxInt,nil);
end;

destructor TsemaphoreMailbox.destroy;
begin
  access.free;
  closeHandle(countSema);
  inherited;
end;

function TsemaphoreMailbox.pop(pResObject: pObject;
  timeout: DWORD): boolean;
// dequeues an object, if one is available on the queue.  If the queue is empty,
// the caller is blocked until either an object is pushed on or the timeout
// period expires
begin // wait for a unit from the semaphore
  result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout));
  if result then // if a unit was supplied before the timeout,
  begin
    access.acquire;
    try
      pResObject^:=inherited pop; // get an object from the queue
    finally
      access.release;
    end;
  end;
end;

procedure TsemaphoreMailbox.push(aObject: Tobject);
// pushes an object onto the queue.  If threads are waiting in a 'pop' call,
// one of them is made ready.
begin
  access.acquire;
  try
    inherited push(aObject); // shove the object onto the queue
  finally
    access.release;
  end;
  releaseSemaphore(countSema,1,nil); // release one unit to semaphore
end;

function TsemaphoreMailbox.peek(pResObject: pObject): boolean;
begin
  access.acquire;
  try
    result:=(count>0);
    if result then pResObject^:=inherited pop; // get an object from the queue
  finally
    access.release;
  end;
end;
end.

Non credo che Tthreadqueue dovrebbe supportare più consumatori. È un FIFO, secondo il file di aiuto. Ho l'impressione che ci sia un thread che spinge e un altro (solo uno!).

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top