Tthreadqueue non in grado di più consumatori?
-
27-10-2019 - |
Domanda
Cercando di utilizzare TThreadqueue (generics.collections) in un singolo produttore schema di consumo multipli. (Delphi-xe). L'idea è di spingere gli oggetti in una coda e lasciare che diversi fili dei lavoratori drenano la coda.
Tuttavia, non funziona come previsto. Quando due o più thread di lavoratori chiamano Popitem, le violazioni dell'accesso vengono lanciate dal Tthreadqueue.
Se la chiamata a Popitem viene serializzata con una sezione critica, tutto va bene.
Sicuramente il Tthreadqueue dovrebbe essere in grado di gestire più consumatori, quindi mi manca qualcosa o è un bug puro in Tthreadqueue?
Ecco un semplice esempio per produrre l'errore.
program TestThreadedQueue;
{$APPTYPE CONSOLE}
uses
// FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
Windows,
Messages,
Classes,
SysUtils,
SyncObjs,
Generics.Collections;
type TThreadTaskMsg =
class(TObject)
private
threadID : integer;
threadMsg : string;
public
Constructor Create( ID : integer; const msg : string);
end;
type TThreadReader =
class(TThread)
private
fPopQueue : TThreadedQueue<TObject>;
fSync : TCriticalSection;
fMsg : TThreadTaskMsg;
fException : Exception;
procedure DoSync;
procedure DoHandleException;
public
Constructor Create( popQueue : TThreadedQueue<TObject>;
sync : TCriticalSection);
procedure Execute; override;
end;
Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
sync : TCriticalSection);
begin
fPopQueue:= popQueue;
fMsg:= nil;
fSync:= sync;
Self.FreeOnTerminate:= FALSE;
fException:= nil;
Inherited Create( FALSE);
end;
procedure TThreadReader.DoSync ;
begin
WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;
procedure TThreadReader.DoHandleException;
begin
WriteLn('Exception ->' + fException.Message);
end;
procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
NameThreadForDebugging('QueuePop worker');
while not Terminated do
begin
try
{- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
Sleep(20);
{- Serializing calls to PopItem works }
if Assigned(fSync) then fSync.Enter;
try
signal:= fPopQueue.PopItem( TObject(fMsg));
finally
if Assigned(fSync) then fSync.Release;
end;
if (signal = wrSignaled) then
begin
try
if Assigned(fMsg) then
begin
fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
fMsg.Free; // We are just dumping the message in this test
//Synchronize( Self.DoSync);
//PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
end;
except
on E:Exception do begin
end;
end;
end;
except
FException:= Exception(ExceptObject);
try
if not (FException is EAbort) then
begin
{Synchronize(} DoHandleException; //);
end;
finally
FException:= nil;
end;
end;
end;
end;
Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
Inherited Create;
threadID:= ID;
threadMsg:= msg;
end;
var
fSync : TCriticalSection;
fThreadQueue : TThreadedQueue<TObject>;
fReaderArr : array[1..4] of TThreadReader;
i : integer;
begin
try
IsMultiThread:= TRUE;
fSync:= TCriticalSection.Create;
fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
try
{- Calling without fSync throws exceptions when two or more threads calls PopItem
at the same time }
WriteLn('Creating worker threads ...');
for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
{- Calling with fSync works ! }
//for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
WriteLn('Init done. Pushing items ...');
for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
ReadLn;
finally
for i:= 1 to 4 do fReaderArr[i].Free;
fThreadQueue.Free;
fSync.Free;
end;
except
on E: Exception do
begin
Writeln(E.ClassName, ': ', E.Message);
ReadLn;
end;
end;
end.
Aggiornare : L'errore in Tmonitor che ha causato lo sradamento di Tthreadqueue è fissato in Delphi XE2.
Aggiornamento 2 : Il test di cui sopra ha stressato la coda nello stato vuoto. Darian Miller ha scoperto che sottolineare la coda a pieno stato, poteva ancora riprodurre l'errore in XE2. L'errore è ancora una volta nel tmonitor. Vedi la sua risposta di seguito per ulteriori informazioni. E anche un collegamento al QC101114.
Aggiornamento 3 : Con Delphi-Xe2 Update 4 c'era una soluzione annunciata per TMonitor
Ciò curerebbe i problemi TThreadedQueue
. I miei test finora non sono in grado di riprodurre alcun errore TThreadedQueue
più. Testatori singoli testati/thread di consumo multipli quando la coda è vuota e piena. Ha anche testato più produttori/più consumatori. Ho variato i thread del lettore e i thread di scrittori da 1 a 100 senza problemi. Ma conoscendo la storia, sfido gli altri a rompersi TMonitor
.
Soluzione
Bene, è difficile essere sicuri senza molti test, ma sembra certamente che questo sia un bug, sia a Tthreadqueue che a Tmonitor. In entrambi i casi è nella RTL e non nel tuo codice. Dovresti archiviare questo come un rapporto QC e utilizzare il tuo esempio sopra come codice "Come riprodurre".
Altri suggerimenti
Ti consiglio di usare Omnithreadlibrary http://www.thedelphigeek.com/search/label/omnithreadlibrary Quando si lavora con thread, parallelismo, ecc. Primoz ha fatto un ottimo lavoro e sul sito troverai molta documentazione utile.
Il tuo esempio sembra funzionare bene sotto XE2, ma se riempiamo la coda non riesce con AV su un Pushitem. (Testato in XE2 Update1)
Per riprodurre, basta aumentare la creazione di attività da 100 a 1100 (la profondità della coda è stata impostata a 1024)
for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
Questo muore per me ogni volta su Windows 7. Inizialmente ho provato una spinta continua per stressarlo, e non è riuscito al loop 30 ... poi al loop 16 ... quindi a 65 quindi a intervalli diversi ma ha costantemente fallito ad alcuni punto.
iLoop := 0;
while iLoop < 1000 do
begin
Inc(iLoop);
WriteLn('Loop: ' + IntToStr(iLoop));
for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
end;
Ho cercato la lezione di Tthreadqueue ma non sembra averlo nel mio D2009. Non mi ucciderò esattamente per questo - il supporto del thread di Delphi è sempre stato err .. errm ... "non ottimale" e sospetto che Tthreadqueue non sia diverso :)
Perché utilizzare generici per oggetti PC (produttore / consumatore)? Un semplice discendente TobjectQueue andrà bene - lo userà da decenni - funziona bene con più produttori/consumatori:
unit MinimalSemaphorePCqueue;
{ Absolutely minimal P-C queue based on TobjectQueue and a semaphore.
The semaphore count reflects the queue count
'push' will always succeed unless memory runs out, then you're stuft anyway.
'pop' has a timeout parameter as well as the address of where any received
object is to be put.
'pop' returns immediately with 'true' if there is an object on the queue
available for it.
'pop' blocks the caller if the queue is empty and the timeout is not 0.
'pop' returns false if the timeout is exceeded before an object is available
from the queue.
'pop' returns true if an object is available from the queue before the timeout
is exceeded.
If multiple threads have called 'pop' and are blocked because the queue is
empty, a single 'push' will make only one of the waiting threads ready.
Methods to push/pop from the queue
A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call.
When the handle is signaled, the 'peek' method will retrieve the queued object.
}
interface
uses
Windows, Messages, SysUtils, Classes,syncObjs,contnrs;
type
pObject=^Tobject;
TsemaphoreMailbox=class(TobjectQueue)
private
countSema:Thandle;
protected
access:TcriticalSection;
public
property semaHandle:Thandle read countSema;
constructor create; virtual;
procedure push(aObject:Tobject); virtual;
function pop(pResObject:pObject;timeout:DWORD):boolean; virtual;
function peek(pResObject:pObject):boolean; virtual;
destructor destroy; override;
end;
implementation
{ TsemaphoreMailbox }
constructor TsemaphoreMailbox.create;
begin
{$IFDEF D2009}
inherited Create;
{$ELSE}
inherited create;
{$ENDIF}
access:=TcriticalSection.create;
countSema:=createSemaphore(nil,0,maxInt,nil);
end;
destructor TsemaphoreMailbox.destroy;
begin
access.free;
closeHandle(countSema);
inherited;
end;
function TsemaphoreMailbox.pop(pResObject: pObject;
timeout: DWORD): boolean;
// dequeues an object, if one is available on the queue. If the queue is empty,
// the caller is blocked until either an object is pushed on or the timeout
// period expires
begin // wait for a unit from the semaphore
result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout));
if result then // if a unit was supplied before the timeout,
begin
access.acquire;
try
pResObject^:=inherited pop; // get an object from the queue
finally
access.release;
end;
end;
end;
procedure TsemaphoreMailbox.push(aObject: Tobject);
// pushes an object onto the queue. If threads are waiting in a 'pop' call,
// one of them is made ready.
begin
access.acquire;
try
inherited push(aObject); // shove the object onto the queue
finally
access.release;
end;
releaseSemaphore(countSema,1,nil); // release one unit to semaphore
end;
function TsemaphoreMailbox.peek(pResObject: pObject): boolean;
begin
access.acquire;
try
result:=(count>0);
if result then pResObject^:=inherited pop; // get an object from the queue
finally
access.release;
end;
end;
end.
Non credo che Tthreadqueue dovrebbe supportare più consumatori. È un FIFO, secondo il file di aiuto. Ho l'impressione che ci sia un thread che spinge e un altro (solo uno!).