I need to terminate my running thread in a way that doesn’t cause an error or delay. The problem lies in the function “lMsg := lMsgQueue.Get(FQueueGetTimeout);”, where it will wait for the defined time (usually 5000 ms). Thus, if I need to call an external terminate, my application will be stuck waiting for the termination.
What would be the best way to terminate it in the middle of the process?
{ TConsumerThread }
constructor TConsumerThread.Create;
begin
FreeOnTerminate := True;
InitializeVars;
inherited Create(True);
end;
procedure TConsumerThread.Execute;
var
lMsgQueue: TAMQPMessageQueue;
lMsg: TAMQPMessage;
lStartTime: TDateTime;
begin
lMsgQueue := TAMQPMessageQueue.Create;
FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
try
try
FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer');
lStartTime := Now;
repeat
try
try
if not(FConnectionAMQP.IsOpen) then
BEGIN
FConnectionAMQP.Connect;
FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer';
END;
except
on E: Exception do
Break;
end;
lMsg := lMsgQueue.Get(FQueueGetTimeout);
if (lMsg = nil) and not(Terminated) then
begin
if Assigned(FChannelAMQPThread) then
begin
FConnectionAMQP.CloseChannel(FChannelAMQPThread);
FChannelAMQPThread := nil;
end;
FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer');
end;
if not(Terminated) then
begin
try
if not(FConnectionAMQP.IsOpen) then
FConnectionAMQP.Connect;
except
on E: Exception do
Break;
end;
end;
if not(Terminated) then
begin
if ValidateFilter(lMsg) then
begin
FCorrelationID := lMsg.Header.PropertyList.CorrelationID.Value;
FReceivedMessage := lMsg.Body.asString[TEncoding.ASCII];
lMsg.Ack;
lMsg.Free;
Terminate;
end
else
begin
lMsg.Reject;
lMsg.Free;
if not(FTimeout = INFINITE) then
begin
if (MilliSecondsBetween(Now, lStartTime) >= (Int64(FTimeout))) then
begin
FReceivedMessage := '';
Terminate;
end;
end;
end;
end
else
begin
Terminate;
end;
except
on E: Exception do
begin
if Assigned(lMsg) then
begin
lMsg.Free;
lMsg := nil;
end;
end;
end;
until (Terminated);
except
on E: Exception do
begin
FReceivedMessage := '';
if not(Terminated) then
Terminate;
end;
end;
finally
lMsgQueue.Free;
end;
end;
procedure TConsumerThread.TerminatedSet;
begin
inherited;
if Assigned(FChannelAMQPThread) then
begin
try
if FConnectionAMQP.IsOpen then
FConnectionAMQP.CloseChannel(FChannelAMQPThread);
except
on E: Exception do
end;
FChannelAMQPThread := nil;
end;
end;
function TConsumerThread.ValidateFilter(pMsg: TAMQPMessage): Boolean;
begin
Result := False;
case FMsgFilter of
fmsgNone:
Result := True;
fmsgMessageID:
Result := (pMsg.Header.PropertyList.MessageID.Value = FFilterValue);
fmsgCorrelationID:
Result := (pMsg.Header.PropertyList.CorrelationID.Value = FFilterValue);
end;
end;
procedure TConsumerThread.InitializeVars;
begin
FConnectionAMQP := nil;
FChannelAMQPThread := nil;
FQueue := '';
FTimeout := INFINITE;
FQueueGetTimeout := 5000;
FQueuePrefetchSize := 0;
FQueuePrefetchCount := 10;
FMsgFilter := fmsgNone;
FFilterValue := '';
FReceivedMessage := '';
end;
To check the returned message or exception, I am using a function in OnTerminate.
Also, making it “FreeOnTerminate” in this case is the best alternative?
I start it suspended because I set the properties (initialized in InitializeVars) before the start.
This code is from the “Get” function, I didn’t write it, but I can edit it if necessary.
{$I AMQP.Options.inc}
unit AMQP.Classes;
interface
Uses
SysUtils, Classes, SyncObjs, Generics.Collections,
AMQP.Frame, AMQP.Message, AMQP.Method, AMQP.Types
{$IfDef fpc}
, AMQP.SyncObjs
{$EndIf}
;
Type
AMQPException = Class(Exception);
AMQPTimeout = class(AMQPException);
TAMQPServerProperties = Class
Strict Private
FCapabilities : TStringList;
FMechanisms : TStringList;
FLocales : TStringList;
FClusterName : String;
FCopyright : String;
FInformation : String;
FPlatform : String;
FProduct : String;
FVersion : String;
FKnownHosts : String;
FVersionMajor : Integer;
FVersionMinor : Integer;
FChannelMax : Integer;
FFrameMax : Integer;
FHeartbeat : Integer;
Public
Property Capabilities : TStringList read FCapabilities;
Property Mechanisms : TStringList read FMechanisms;
Property Locales : TStringList read FLocales;
Property ClusterName : String read FClusterName;
Property Copyright : String read FCopyright;
Property Information : String read FInformation;
Property &Platform : String read FPlatform;
Property Product : String read FProduct;
Property Version : String read FVersion;
Property KnownHosts : String read FKnownHosts;
Property ProtocolVersionMajor : Integer read FVersionMajor;
Property ProtocolVersionMinor : Integer read FVersionMinor;
Property ChannelMax : Integer read FChannelMax;
Property FrameMax : Integer read FFrameMax;
Property Heartbeat : Integer read FHeartbeat;
Procedure ReadConnectionStart( AConnectionStart: TAMQPMethod );
Procedure ReadConnectionTune( AConnectionTune: TAMQPMethod );
Procedure ReadConnectionOpenOK( AConnectionOpenOK: TAMQPMethod );
Constructor Create;
Destructor Destroy; Override;
End;
TBlockingQueue<T> = Class
Strict Protected
FGuard : {$IFDEF FPC}TRTLCriticalSection{$ELSE}TCriticalSection{$ENDIF};
FCondition : TConditionVariableCS;
FQueue : TQueue<T>;
Public
Function Count: Integer; Virtual;
Function Get(ATimeOut: LongWord): T; Virtual;
Procedure Put( Item: T ); Virtual;
Constructor Create; Virtual;
Destructor Destroy; Override;
End;
TAMQPQueue = TBlockingQueue<TAMQPFrame>;
TAMQPMessageQueue = TBlockingQueue<TAMQPMessage>;
implementation
{ TAMQPServerProperties }
constructor TAMQPServerProperties.Create;
begin
FCapabilities := TStringList.Create;
FMechanisms := TStringList.Create;
FLocales := TStringList.Create;
FMechanisms.StrictDelimiter := True;
FMechanisms.Delimiter := ' ';
FLocales.StrictDelimiter := True;
FLocales.Delimiter := ' ';
FClusterName := '';
FCopyright := '';
FInformation := '';
FPlatform := '';
FProduct := '';
FVersion := '';
FKnownHosts := '';
FVersionMajor := 0;
FVersionMinor := 0;
FChannelMax := 0;
FFrameMax := 0;
FHeartbeat := 0;
end;
Procedure TAMQPServerProperties.ReadConnectionStart( AConnectionStart: TAMQPMethod );
var
ServerProperties: TFieldTable;
ServerCapabilities: TFieldTable;
Pair: TFieldValuePair;
begin
FMechanisms.DelimitedText := AConnectionStart.Field['mechanisms'].AsLongString.Value;
FLocales.DelimitedText := AConnectionStart.Field['locales'].AsLongString.Value;
ServerProperties := AConnectionStart.Field['server-properties'].AsFieldTable;
FVersionMajor := AConnectionStart.Field['version-major'].AsShortShortUInt.Value;
FVersionMinor := AConnectionStart.Field['version-minor'].AsShortShortUInt.Value;
FClusterName := ServerProperties.Field['cluster_name'].AsShortString.Value;
FCopyright := ServerProperties.Field['copyright'].AsShortString.Value;
FInformation := ServerProperties.Field['information'].AsShortString.Value;
FPlatform := ServerProperties.Field['platform'].AsShortString.Value;
FProduct := ServerProperties.Field['product'].AsShortString.Value;
FVersion := ServerProperties.Field['version'].AsShortString.Value;
ServerCapabilities := ServerProperties.Field['capabilities'].AsFieldTable;
for Pair in ServerCapabilities do
FCapabilities.Values[ Pair.Name.Value ] := Pair.Value.AsString('');
end;
Procedure TAMQPServerProperties.ReadConnectionTune( AConnectionTune: TAMQPMethod );
begin
FChannelMax := AConnectionTune.Field['channel-max'].AsShortUInt.Value;
FFrameMax := AConnectionTune.Field['frame-max'].AsLongUInt.Value;
FHeartbeat := AConnectionTune.Field['heartbeat'].AsShortUInt.Value;
end;
Procedure TAMQPServerProperties.ReadConnectionOpenOK( AConnectionOpenOK: TAMQPMethod );
begin
FKnownHosts := AConnectionOpenOK.Field['known-hosts'].AsShortString.Value;
end;
destructor TAMQPServerProperties.Destroy;
begin
FCapabilities.Free;
FMechanisms.Free;
FLocales.Free;
inherited;
end;
{ TBlockingQueue<T> }
function TBlockingQueue<T>.Count: Integer;
begin
{$IFDEF FPC}
EnterCriticalSection(FGuard);
{$ELSE}
FGuard.Acquire;
{$ENDIF}
try
Result := FQueue.Count;
finally
{$IFDEF FPC}
LeaveCriticalSection(FGuard);
{$ELSE}
FGuard.Release;
{$ENDIF}
end;
end;
constructor TBlockingQueue<T>.Create;
begin
inherited;
{$IFDEF FPC}
InitCriticalSection(FGuard);
{$ELSE}
FGuard := TCriticalSection.Create;
{$ENDIF}
FCondition := TConditionVariableCS.Create;
FQueue := TQueue<T>.Create;
end;
destructor TBlockingQueue<T>.Destroy;
begin
FQueue.Free;
FQueue := nil;
FCondition.Free;
FCondition := nil;
{$IFDEF FPC}
DoneCriticalSection(FGuard);
{$ELSE}
FGuard.Free;
FGuard := nil;
{$ENDIF}
inherited;
end;
function TBlockingQueue<T>.Get(ATimeOut: LongWord): T;
begin
{$IFDEF FPC}
EnterCriticalSection(FGuard);
{$ELSE}
FGuard.Acquire;
{$ENDIF}
try
while FQueue.Count = 0 do
begin
{$IFDEF FPC}
if FCondition.WaitForRTL(FGuard, ATimeOut) = wrTimeout then
{$Else}
if FCondition.WaitFor(FGuard, ATimeOut) = wrTimeout then
{$EndIf}
raise AMQPTimeout.Create('Timeout!');
end;
Result := FQueue.Dequeue
finally
{$IFDEF FPC}
LeaveCriticalSection(FGuard);
{$ELSE}
FGuard.Release;
{$ENDIF}
end;
end;
procedure TBlockingQueue<T>.Put(Item: T);
begin
{$IFDEF FPC}
EnterCriticalSection(FGuard);
{$ELSE}
FGuard.Acquire;
{$ENDIF}
try
FQueue.Enqueue( Item );
FCondition.ReleaseAll;
finally
{$IFDEF FPC}
LeaveCriticalSection(FGuard);
{$ELSE}
FGuard.Release;
{$ENDIF}
end;
end;
end.