Многопоточное взаимодействие приложения с потоком регистратора

StackOverflow https://stackoverflow.com/questions/1613722

Вопрос

И вот я снова обращаюсь к вам с вопросами о многопоточности и упражнением в моем Параллельное программирование класс.

У меня есть многопоточный сервер, реализованный с использованием .NET Модель Асинхронного программирования - с помощью GET (Скачать) и PUT (загрузка) файловые сервисы.Эта часть сделана и протестирована.

Бывает, что в описании проблемы говорится, что этот сервер должен иметь ведение журнала активность с минимальным влиянием на время отклика сервера, и она должна поддерживаться низким приоритетом Нитки - поток регистратора - создан для этого эффекта.Все ведение журнала сообщения должны передаваться Ветки которые приводят их к этому поток регистратора, используя механизм связи , который может не заприте Нитки это вызывает его (помимо необходимой блокировки для обеспечения взаимного исключения) и предполагает, что некоторые протоколирование сообщений может быть проигнорирован.

Вот мое текущее решение, пожалуйста, помогите проверить, является ли это решением заявленной проблемы:

using System;
using System.IO;
using System.Threading;

// Multi-threaded Logger
public class Logger {
    // textwriter to use as logging output
    protected readonly TextWriter _output;
    // logger thread
    protected Thread _loggerThread;
    // logger thread wait timeout
    protected int _timeOut = 500; //500ms
    // amount of log requests attended
    protected volatile int reqNr = 0;
    // logging queue
    protected readonly object[] _queue;
    protected struct LogObj {
        public DateTime _start;
        public string _msg;
        public LogObj(string msg) {
            _start = DateTime.Now;
            _msg = msg;
        }
        public LogObj(DateTime start, string msg) {
            _start = start;
            _msg = msg;
        }
        public override string ToString() {
            return String.Format("{0}: {1}", _start, _msg);
        }
    }

    public Logger(int dimension,TextWriter output) {
        /// initialize queue with parameterized dimension
        this._queue = new object[dimension];
        // initialize logging output
        this._output = output;
        // initialize logger thread
        Start();
    }
    public Logger() {
        // initialize queue with 10 positions
        this._queue = new object[10];
        // initialize logging output to use console output
        this._output = Console.Out;
        // initialize logger thread
        Start();
    }

    public void Log(string msg) {
        lock (this) {
            for (int i = 0; i < _queue.Length; i++) {
                // seek for the first available position on queue
                if (_queue[i] == null) {
                    // insert pending log into queue position
                    _queue[i] = new LogObj(DateTime.Now, msg);
                    // notify logger thread for a pending log on the queue
                    Monitor.Pulse(this);
                    break;
                }
                // if there aren't any available positions on logging queue, this
                // log is not considered and the thread returns
            }
        }
    }

    public void GetLog() {
        lock (this) {
            while(true) {
                for (int i = 0; i < _queue.Length; i++) {
                    // seek all occupied positions on queue (those who have logs)
                    if (_queue[i] != null) {
                        // log
                        LogObj obj = (LogObj)_queue[i];
                        // makes this position available
                        _queue[i] = null;
                        // print log into output stream
                        _output.WriteLine(String.Format("[Thread #{0} | {1}ms] {2}",
                                                        Thread.CurrentThread.ManagedThreadId,
                                                        DateTime.Now.Subtract(obj._start).TotalMilliseconds,
                                                        obj.ToString()));
                    }
                }
                // after printing all pending log's (or if there aren't any pending log's),
                // the thread waits until another log arrives
                //Monitor.Wait(this, _timeOut);
                Monitor.Wait(this);
            }
        }
    }

    // Starts logger thread activity
    public void Start() {
        // Create the thread object, passing in the Logger.Start method
        // via a ThreadStart delegate. This does not start the thread.
        _loggerThread = new Thread(this.GetLog);
        _loggerThread.Priority = ThreadPriority.Lowest;
        _loggerThread.Start();
    }

    // Stops logger thread activity
    public void Stop() {
        _loggerThread.Abort();
        _loggerThread = null;
    }

    // Increments number of attended log requests
    public void IncReq() { reqNr++; }

}

В принципе, вот основные моменты этого кода:

  1. Запуск с низким приоритетом Нитки это зацикливает очередь ведения журнала и ожидающие печати журналы к выходу.После этого, в Нитки приостановлено до нового журнал регистрации прибывает;
  2. Когда приходит журнал, поток регистратора пробуждается и выполняет свою работу.

Является ли это решением потокобезопасный?Я тут читал Производители-потребители проблема и алгоритм решения, но в этой задаче, хотя у меня есть несколько производителей, у меня есть только один читатель.

Заранее благодарю за все ваше внимание.

Это было полезно?

Решение

Кажется, это должно сработать.Соотношение производителей и потребителей не должно сильно меняться в случае одного потребителя.Маленькие придирки:

  • получение блокировки может быть дорогостоящей операцией (как говорит @Vitaliy Lipchinsky).Я бы рекомендовал сравнить ваш регистратор с наивным регистратором "сквозной записи" и регистратором, использующим взаимосвязанные операции.Другой альтернативой было бы заменить существующую очередь пустой в GetLog и немедленно покидает критическую секцию.Таким образом, ни один из производителей не будет заблокирован длительными операциями у потребителей.

  • создайте ссылочный тип LogObj (класс).Нет смысла делать его структурированным, поскольку вы все равно его упаковываете.или же сделать _queue поле должно иметь тип LogObj[] (в любом случае, так лучше).

  • сделайте ваш поток фоновым, чтобы он не препятствовал закрытию вашей программы, если Stop не будет вызван.

  • Промойте свой TextWriter.В противном случае вы рискуете потерять даже те записи, которые успели поместиться в очередь (10 элементов немного маловаты, ИМХО)

  • Реализуйте IDisposable и / или finalizer.Вашему регистратору принадлежат thread и text writer, и они должны быть освобождены (и сброшены - см. Выше).

Другие советы

Привет. Быстро посмотрел, и, хотя он кажется поточно-ориентированным, я не думаю, что он особенно оптимален. Я бы предложил решение в этом направлении

ПРИМЕЧАНИЕ. просто прочитайте остальные ответы. Далее следует довольно оптимальное, оптимистичное решение для блокировки на основе вашего собственного. Основное отличие заключается в блокировке внутреннего класса, минимизации «критических секций» и обеспечении плавного завершения потока. Если вы хотите вообще избежать блокировки, попробуйте некоторые из этих изменчивых «неблокирующих» параметров. связанный список, как подсказывает @Vitaliy Lipchinsky.

using System.Collections.Generic;
using System.Linq;
using System.Threading;

...

public class Logger
{
    // BEST PRACTICE: private synchronization object. 
    // lock on _syncRoot - you should have one for each critical
    // section - to avoid locking on public 'this' instance
    private readonly object _syncRoot = new object ();

    // synchronization device for stopping our log thread.
    // initialized to unsignaled state - when set to signaled
    // we stop!
    private readonly AutoResetEvent _isStopping = 
        new AutoResetEvent (false);

    // use a Queue<>, cleaner and less error prone than
    // manipulating an array. btw, check your indexing
    // on your array queue, while starvation will not
    // occur in your full pass, ordering is not preserved
    private readonly Queue<LogObj> _queue = new Queue<LogObj>();

    ...

    public void Log (string message)
    {
        // you want to lock ONLY when absolutely necessary
        // which in this case is accessing the ONE resource
        // of _queue.
        lock (_syncRoot)
        {
            _queue.Enqueue (new LogObj (DateTime.Now, message));
        }
    }

    public void GetLog ()
    {
        // while not stopping
        // 
        // NOTE: _loggerThread is polling. to increase poll
        // interval, increase wait period. for a more event
        // driven approach, consider using another
        // AutoResetEvent at end of loop, and signal it
        // from Log() method above
        for (; !_isStopping.WaitOne(1); )
        {
            List<LogObj> logs = null;
            // again lock ONLY when you need to. because our log
            // operations may be time-intensive, we do not want
            // to block pessimistically. what we really want is 
            // to dequeue all available messages and release the
            // shared resource.
            lock (_syncRoot)
            {
                // copy messages for local scope processing!
                // 
                // NOTE: .Net3.5 extension method. if not available
                // logs = new List<LogObj> (_queue);
                logs = _queue.ToList ();
                // clear the queue for new messages
                _queue.Clear ();
                // release!
            }
            foreach (LogObj log in logs)
            {
                // do your thang
                ...
            }
        }
    }
}
...
public void Stop ()
{
    // graceful thread termination. give threads a chance!
    _isStopping.Set ();
    _loggerThread.Join (100);
    if (_loggerThread.IsAlive)
    {
        _loggerThread.Abort ();
    }
    _loggerThread = null;
}

На самом деле вы вводите здесь блокировку. У вас есть блокировка при отправке записи журнала в очередь (метод Log): если 10 потоков одновременно поместили 10 элементов в очередь и разбудили поток Logger, то 11-й поток будет ожидать, пока поток регистратора не зарегистрирует все элементы ...

Если вы хотите что-то действительно масштабируемое - используйте очередь без блокировки (пример ниже). С механизмом синхронизации очереди без блокировки будет очень просто (вы можете даже использовать один дескриптор ожидания для уведомлений).

Если вам не удастся найти реализацию очереди без блокировки в Интернете, вот идея, как это сделать: Используйте связанный список для реализации. Каждый узел в связанном списке содержит значение и переменную ссылку на следующий узел. поэтому для операций постановки и снятия вы можете использовать метод Interlocked.CompareExchange. Надеюсь, идея понятна. Если нет - дайте мне знать, и я предоставлю более подробную информацию.

Я просто провожу здесь мысленный эксперимент, поскольку у меня нет времени, чтобы на самом деле попробовать код прямо сейчас, но я думаю, что вы можете сделать это вообще без блокировок, если вы креативны.

Пусть ваш класс ведения журнала содержит метод, который выделяет очередь и семафор при каждом вызове (и другой, который освобождает очередь и семафор, когда поток завершен). Потоки, которые хотят вести регистрацию, будут вызывать этот метод при запуске. Когда они хотят войти, они помещают сообщение в свою очередь и устанавливают семафор. Поток регистратора имеет большой цикл, который проходит по очередям и проверяет связанные семафоры. Если семафор, связанный с очередью, больше нуля, то очередь отбрасывается, а семафор уменьшается.

Поскольку вы не пытаетесь вытолкнуть что-либо из очереди до тех пор, пока не установлен семафор, и вы не настроите семафор до тех пор, пока вы не добавите вещи в очередь, я думаю это будет безопасно. Согласно документации MSDN для класса очереди, если вы перечисляете очередь, а другой поток изменяет коллекцию, создается исключение. Ловите это исключение, и вы должны быть хорошими.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top