Question

I have a service that continuously reads from a queue, and I want it to finish handling the last item it has received before the application shuts down. I did some research one to two months ago and found a way to do it, but it did not work: when the service is stopped, it takes 99% of the CPU and never quits. So I tried putting a lock around the import function and another in the close function, which sets import to false and then continues. The same thing happened, so I added a variable that is set while the import function is running, and made the close function loop while that variable is true. Needless to say, that did not work either.

public void StopImportSms()
{
    EventLogger.Write("Waiting for lock to false", EventLogEntryType.Information);
    _import = false;
    while (_lock)
    {

    }
    EventLogger.Write("Import is disabled", EventLogEntryType.Information);
}


private void ImportSms()
{
    while (_import)
    {
        _lock = true;
        var messages = ReadMessages(consumer);

        if (!messages.Any())
        {
            _lock = false;
            continue;
        }
        //Db insert messages
        //Send ack to queue
        _lock = false;
        Thread.Sleep(5);
    }
}


public void StartImportSms()
{
    Task.Factory.StartNew(ImportSms);
}

Solution

This problem is best addressed with events rather than flags, because a thread can wait on an event without using CPU time, whereas your current while loops spin and consume CPU the whole time they wait.

I assume that the second code snippet runs in a separate thread. You don't show how it is started, so I'll represent that thread by _importThread (this Thread object will need to be accessible from the StopImportSms() method). You'll also need to declare a ManualResetEvent field:

ManualResetEvent _stopEvent = new ManualResetEvent(false);

Then your import loop becomes:

while (!_stopEvent.WaitOne(0))
{
    var messages = ReadMessages(consumer);

    // ... further processing here?
}

And StopImportSms() changes to:

public void StopImportSms()
{
    EventLogger.Write("Waiting for import to stop...", EventLogEntryType.Information);
    // Instruct the import thread to stop
    _stopEvent.Set();
    // Wait for the thread to complete
    _importThread.Join();
    EventLogger.Write("Import is disabled", EventLogEntryType.Information);
}
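
Putting the pieces above together, a minimal self-contained sketch might look like the following. The class name SmsImporter and the Iterations counter are hypothetical and exist only to make the sketch runnable; the real ReadMessages, database insert, and ack calls are replaced by a placeholder. It also assumes the 5 ms pause from the question can be folded into WaitOne(5), so the pause ends early when a stop is requested.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the class that owns the import loop.
class SmsImporter
{
    private readonly ManualResetEvent _stopEvent = new ManualResetEvent(false);
    private Thread _importThread;

    // Hypothetical counter, present only so the sketch has observable output.
    public int Iterations { get; private set; }

    public void StartImportSms()
    {
        _importThread = new Thread(ImportSms);
        _importThread.Start();
    }

    private void ImportSms()
    {
        // WaitOne(5) blocks for up to 5 ms without spinning and returns
        // true as soon as the stop event has been set.
        while (!_stopEvent.WaitOne(5))
        {
            // Placeholder for ReadMessages, the DB insert and the queue ack.
            Iterations++;
        }
    }

    public void StopImportSms()
    {
        // Instruct the import thread to stop
        _stopEvent.Set();
        // Wait for the thread to finish its current pass and exit
        _importThread.Join();
    }
}
```

Because the event is only checked between passes, a batch that is already being processed is finished before the thread exits, and Join() blocks without consuming CPU in the meantime.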

Edit:

Since you're using tasks for your import method, you might want to try the following instead:

Declare a CancellationTokenSource field in the class:

CancellationTokenSource _tokenSource = new CancellationTokenSource();

When creating your import task use the following (you will need to add a CancellationToken parameter to the method that implements the message import loop):

var token = _tokenSource.Token;
_importTask = Task.Factory.StartNew(() => ImportMethod(/* other parameters? */ token), token);

Then the method implementing the import task changes to:

private void ImportMethod(/* other parameters? */ CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        var messages = ReadMessages(consumer);

        // ... further processing here?
    }
}

And finally, your StopImportSms() method becomes:

public void StopImportSms()
{
    EventLogger.Write("Waiting for import to stop...", EventLogEntryType.Information);
    // Instruct the import task to stop
    _tokenSource.Cancel();
    // Wait for the import task to complete
    _importTask.Wait(/* optionally, add a timeout in milliseconds here */);
    EventLogger.Write("Import is disabled", EventLogEntryType.Information);
}

It may be worth noting that since this doesn't use the CancellationToken.ThrowIfCancellationRequested() method, the task will indicate that it's run to completion (i.e. after the _importTask.Wait() returns, _importTask.Status will be TaskStatus.RanToCompletion, rather than TaskStatus.Canceled).
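
The difference in final status can be checked with a small sketch such as the one below. All names in it (CancellationStatusDemo, PollingLoop, ThrowingLoop, RunAndCancel) are hypothetical, and Thread.Sleep(5) stands in for one pass of the real import work.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class CancellationStatusDemo
{
    // Checks the flag and returns normally once cancellation is requested.
    public static void PollingLoop(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            Thread.Sleep(5); // placeholder for one import pass
        }
    }

    // Throws OperationCanceledException once cancellation is requested.
    public static void ThrowingLoop(CancellationToken token)
    {
        while (true)
        {
            token.ThrowIfCancellationRequested();
            Thread.Sleep(5); // placeholder for one import pass
        }
    }

    // Starts the loop as a task, cancels it, and reports the final status.
    public static TaskStatus RunAndCancel(Action<CancellationToken> loop)
    {
        using (var source = new CancellationTokenSource())
        using (var started = new ManualResetEventSlim(false))
        {
            var token = source.Token;
            var task = Task.Factory.StartNew(() =>
            {
                started.Set();
                loop(token);
            }, token);

            started.Wait();   // make sure the task body is actually running
            source.Cancel();
            try
            {
                task.Wait();
            }
            catch (AggregateException)
            {
                // Wait() surfaces the cancellation wrapped in an AggregateException.
            }
            return task.Status;
        }
    }
}
```

Under these assumptions, RunAndCancel(CancellationStatusDemo.PollingLoop) should report TaskStatus.RanToCompletion, while RunAndCancel(CancellationStatusDemo.ThrowingLoop) should report TaskStatus.Canceled. In the throwing case the caller of Wait() has to be prepared to catch the AggregateException.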

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow