TThreadedQueue pas en mesure de consommateurs multiples?
-
27-10-2019 - |
Question
Essayer d'utiliser la TThreadedQueue (Generics.Collections) dans un seul producteur multiples système de consommation. (Delphi-XE). L'idée est de pousser des objets dans une file d'attente et laisser plusieurs threads de travail vider la file d'attente.
Il ne fonctionne pas comme prévu, cependant. Lorsque deux ou plusieurs threads de travail appellent PopItem, les violations d'accès sont éjectés du TThreadedQueue.
Si l'appel à PopItem est sérialisé avec une section critique, tout ira bien.
Certes, le TThreadedQueue devrait être en mesure de gérer plusieurs consommateurs, de sorte que je manque quelque chose ou est-ce un bug pur TThreadedQueue?
Voici un exemple simple pour produire l'erreur.
program TestThreadedQueue;
{$APPTYPE CONSOLE}
uses
// FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
Windows,
Messages,
Classes,
SysUtils,
SyncObjs,
Generics.Collections;
type TThreadTaskMsg =
class(TObject)
private
threadID : integer;
threadMsg : string;
public
Constructor Create( ID : integer; const msg : string);
end;
type TThreadReader =
class(TThread)
private
fPopQueue : TThreadedQueue<TObject>;
fSync : TCriticalSection;
fMsg : TThreadTaskMsg;
fException : Exception;
procedure DoSync;
procedure DoHandleException;
public
Constructor Create( popQueue : TThreadedQueue<TObject>;
sync : TCriticalSection);
procedure Execute; override;
end;
Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
sync : TCriticalSection);
begin
fPopQueue:= popQueue;
fMsg:= nil;
fSync:= sync;
Self.FreeOnTerminate:= FALSE;
fException:= nil;
Inherited Create( FALSE);
end;
procedure TThreadReader.DoSync ;
begin
WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;
procedure TThreadReader.DoHandleException;
begin
WriteLn('Exception ->' + fException.Message);
end;
procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
NameThreadForDebugging('QueuePop worker');
while not Terminated do
begin
try
{- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
Sleep(20);
{- Serializing calls to PopItem works }
if Assigned(fSync) then fSync.Enter;
try
signal:= fPopQueue.PopItem( TObject(fMsg));
finally
if Assigned(fSync) then fSync.Release;
end;
if (signal = wrSignaled) then
begin
try
if Assigned(fMsg) then
begin
fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
fMsg.Free; // We are just dumping the message in this test
//Synchronize( Self.DoSync);
//PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
end;
except
on E:Exception do begin
end;
end;
end;
except
FException:= Exception(ExceptObject);
try
if not (FException is EAbort) then
begin
{Synchronize(} DoHandleException; //);
end;
finally
FException:= nil;
end;
end;
end;
end;
Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
Inherited Create;
threadID:= ID;
threadMsg:= msg;
end;
var
fSync : TCriticalSection;
fThreadQueue : TThreadedQueue<TObject>;
fReaderArr : array[1..4] of TThreadReader;
i : integer;
begin
try
IsMultiThread:= TRUE;
fSync:= TCriticalSection.Create;
fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
try
{- Calling without fSync throws exceptions when two or more threads calls PopItem
at the same time }
WriteLn('Creating worker threads ...');
for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
{- Calling with fSync works ! }
//for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
WriteLn('Init done. Pushing items ...');
for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
ReadLn;
finally
for i:= 1 to 4 do fReaderArr[i].Free;
fThreadQueue.Free;
fSync.Free;
end;
except
on E: Exception do
begin
Writeln(E.ClassName, ': ', E.Message);
ReadLn;
end;
end;
end.
Mise à jour :. L'erreur qui a causé TMonitor TThreadedQueue à l'accident est fixé dans Delphi XE2
Update 2 : Le test ci-dessus a souligné la file d'attente dans l'état vide. Darian Miller a constaté que soulignant la file d'attente à l'état complet, pourrait encore reproduire l'erreur dans XE2. L'erreur est encore une fois dans le TMonitor. Voir sa réponse ci-dessous pour plus d'informations. Et aussi un lien vers le QC101114.
Mise à jour 3 :
Avec la mise à jour Delphi XE2-4 il y avait une solution pour TMonitor
annoncé qui guérirait les problèmes TThreadedQueue
. Mes tests jusqu'à présent ne sont pas en mesure de reproduire des erreurs dans TThreadedQueue
plus.
Testé seul producteur / plusieurs threads de consommation lorsque la file d'attente est vide et plein.
Également testé plusieurs producteurs / consommateurs multiples. Je MODIFIÉ les fils de lecteur et les fils de l'écrivain de 1 à 100, sans pépin. Mais connaissant l'histoire, j'ose les autres à briser TMonitor
.
La solution
Eh bien, il est difficile d'être sûr sans beaucoup de tests, mais il semble certainement ce genre est un bug, que ce soit dans TThreadedQueue ou TMonitor. De toute façon, il est dans le RTL et non votre code. Vous devez déposer comme un rapport QC et utilisez votre exemple ci-dessus que le code « comment reproduire ».
Autres conseils
Je vous recommande d'utiliser OmniThreadLibrary http://www.thedelphigeek.com/search/label / OmniThreadLibrary lorsque vous travaillez avec des fils, le parallélisme, etc. Primoz a fait un très bon travail, et sur le site, vous trouverez beaucoup de documentation utile.
Votre exemple semble fonctionner correctement sous XE2, mais si nous remplissons votre file d'attente, il échoue avec AV sur un PushItem. (Testé sous XE2 Update1)
Pour reproduire, simplement augmenter la création de votre tâche 100-1100 (profondeur de votre file d'attente a été fixée à 1024)
for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
Ce meurt pour moi à chaque fois sur Windows 7. J'ai essayé d'abord une poussée continue à tester de stress, et il a échoué à boucle 30 ... puis à boucle 16 ... puis à 65 donc à des intervalles différents mais toujours a échoué à un moment donné.
iLoop := 0;
while iLoop < 1000 do
begin
Inc(iLoop);
WriteLn('Loop: ' + IntToStr(iLoop));
for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
end;
J'ai regardé pour la classe TThreadedQueue mais ne semblent pas avoir dans mon D2009. Je vais pas exactement me tuer sur ce - support de fil Delphi a toujours été err .. ... ERRM « non optimale » et je soupçonne que TThreadedQueue est pas différent:)
Pourquoi utiliser des médicaments génériques pour les objets P-C (producteurs / consommateurs)? Un descendant de TObjectQueue de simples fera très bien - utilise ce depuis des décennies - fonctionne très bien avec plusieurs producteurs / consommateurs:
unit MinimalSemaphorePCqueue;
{ Absolutely minimal P-C queue based on TobjectQueue and a semaphore.
The semaphore count reflects the queue count
'push' will always succeed unless memory runs out, then you're stuft anyway.
'pop' has a timeout parameter as well as the address of where any received
object is to be put.
'pop' returns immediately with 'true' if there is an object on the queue
available for it.
'pop' blocks the caller if the queue is empty and the timeout is not 0.
'pop' returns false if the timeout is exceeded before an object is available
from the queue.
'pop' returns true if an object is available from the queue before the timeout
is exceeded.
If multiple threads have called 'pop' and are blocked because the queue is
empty, a single 'push' will make only one of the waiting threads ready.
Methods to push/pop from the queue
A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call.
When the handle is signaled, the 'peek' method will retrieve the queued object.
}
interface
uses
Windows, Messages, SysUtils, Classes,syncObjs,contnrs;
type
pObject=^Tobject;
TsemaphoreMailbox=class(TobjectQueue)
private
countSema:Thandle;
protected
access:TcriticalSection;
public
property semaHandle:Thandle read countSema;
constructor create; virtual;
procedure push(aObject:Tobject); virtual;
function pop(pResObject:pObject;timeout:DWORD):boolean; virtual;
function peek(pResObject:pObject):boolean; virtual;
destructor destroy; override;
end;
implementation
{ TsemaphoreMailbox }
constructor TsemaphoreMailbox.create;
begin
{$IFDEF D2009}
inherited Create;
{$ELSE}
inherited create;
{$ENDIF}
access:=TcriticalSection.create;
countSema:=createSemaphore(nil,0,maxInt,nil);
end;
destructor TsemaphoreMailbox.destroy;
begin
access.free;
closeHandle(countSema);
inherited;
end;
function TsemaphoreMailbox.pop(pResObject: pObject;
timeout: DWORD): boolean;
// dequeues an object, if one is available on the queue. If the queue is empty,
// the caller is blocked until either an object is pushed on or the timeout
// period expires
begin // wait for a unit from the semaphore
result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout));
if result then // if a unit was supplied before the timeout,
begin
access.acquire;
try
pResObject^:=inherited pop; // get an object from the queue
finally
access.release;
end;
end;
end;
procedure TsemaphoreMailbox.push(aObject: Tobject);
// pushes an object onto the queue. If threads are waiting in a 'pop' call,
// one of them is made ready.
begin
access.acquire;
try
inherited push(aObject); // shove the object onto the queue
finally
access.release;
end;
releaseSemaphore(countSema,1,nil); // release one unit to semaphore
end;
function TsemaphoreMailbox.peek(pResObject: pObject): boolean;
begin
access.acquire;
try
result:=(count>0);
if result then pResObject^:=inherited pop; // get an object from the queue
finally
access.release;
end;
end;
end.
Je ne pense pas TThreadedQueue est censé aider les consommateurs multiples. Il est un FIFO, selon le fichier d'aide. Je suis sous l'impression qu'il ya un fil de poussée et un autre (un seul!) Popping.