Многопоточное взаимодействие приложения с потоком регистратора
-
06-07-2019 - |
Вопрос
И вот я снова обращаюсь к вам с вопросами о многопоточности и упражнением в моем Параллельное программирование класс.
У меня есть многопоточный сервер, реализованный с использованием .NET Модель Асинхронного программирования - с помощью GET
(Скачать) и PUT
(загрузка) файловые сервисы.Эта часть сделана и протестирована.
Бывает, что в описании проблемы говорится, что этот сервер должен иметь ведение журнала активность с минимальным влиянием на время отклика сервера, и она должна поддерживаться низким приоритетом Нитки - поток регистратора - создан для этого эффекта.Все ведение журнала сообщения должны передаваться Ветки которые приводят их к этому поток регистратора, используя механизм связи , который может не заприте Нитки это вызывает его (помимо необходимой блокировки для обеспечения взаимного исключения) и предполагает, что некоторые протоколирование сообщений может быть проигнорирован.
Вот мое текущее решение, пожалуйста, помогите проверить, является ли это решением заявленной проблемы:
using System;
using System.IO;
using System.Threading;
// Multi-threaded Logger
public class Logger {
// textwriter to use as logging output
protected readonly TextWriter _output;
// logger thread
protected Thread _loggerThread;
// logger thread wait timeout
protected int _timeOut = 500; //500ms
// amount of log requests attended
protected volatile int reqNr = 0;
// logging queue
protected readonly object[] _queue;
protected struct LogObj {
public DateTime _start;
public string _msg;
public LogObj(string msg) {
_start = DateTime.Now;
_msg = msg;
}
public LogObj(DateTime start, string msg) {
_start = start;
_msg = msg;
}
public override string ToString() {
return String.Format("{0}: {1}", _start, _msg);
}
}
public Logger(int dimension,TextWriter output) {
/// initialize queue with parameterized dimension
this._queue = new object[dimension];
// initialize logging output
this._output = output;
// initialize logger thread
Start();
}
public Logger() {
// initialize queue with 10 positions
this._queue = new object[10];
// initialize logging output to use console output
this._output = Console.Out;
// initialize logger thread
Start();
}
public void Log(string msg) {
lock (this) {
for (int i = 0; i < _queue.Length; i++) {
// seek for the first available position on queue
if (_queue[i] == null) {
// insert pending log into queue position
_queue[i] = new LogObj(DateTime.Now, msg);
// notify logger thread for a pending log on the queue
Monitor.Pulse(this);
break;
}
// if there aren't any available positions on logging queue, this
// log is not considered and the thread returns
}
}
}
public void GetLog() {
lock (this) {
while(true) {
for (int i = 0; i < _queue.Length; i++) {
// seek all occupied positions on queue (those who have logs)
if (_queue[i] != null) {
// log
LogObj obj = (LogObj)_queue[i];
// makes this position available
_queue[i] = null;
// print log into output stream
_output.WriteLine(String.Format("[Thread #{0} | {1}ms] {2}",
Thread.CurrentThread.ManagedThreadId,
DateTime.Now.Subtract(obj._start).TotalMilliseconds,
obj.ToString()));
}
}
// after printing all pending log's (or if there aren't any pending log's),
// the thread waits until another log arrives
//Monitor.Wait(this, _timeOut);
Monitor.Wait(this);
}
}
}
// Starts logger thread activity
public void Start() {
// Create the thread object, passing in the Logger.Start method
// via a ThreadStart delegate. This does not start the thread.
_loggerThread = new Thread(this.GetLog);
_loggerThread.Priority = ThreadPriority.Lowest;
_loggerThread.Start();
}
// Stops logger thread activity
public void Stop() {
_loggerThread.Abort();
_loggerThread = null;
}
// Increments number of attended log requests
public void IncReq() { reqNr++; }
}
В принципе, вот основные моменты этого кода:
- Запуск с низким приоритетом Нитки это зацикливает очередь ведения журнала и ожидающие печати журналы к выходу.После этого, в Нитки приостановлено до нового журнал регистрации прибывает;
- Когда приходит журнал, поток регистратора пробуждается и выполняет свою работу.
Является ли это решением потокобезопасный?Я тут читал Производители-потребители проблема и алгоритм решения, но в этой задаче, хотя у меня есть несколько производителей, у меня есть только один читатель.
Заранее благодарю за все ваше внимание.
Решение
Кажется, это должно сработать.Соотношение производителей и потребителей не должно сильно меняться в случае одного потребителя.Маленькие придирки:
получение блокировки может быть дорогостоящей операцией (как говорит @Vitaliy Lipchinsky).Я бы рекомендовал сравнить ваш регистратор с наивным регистратором "сквозной записи" и регистратором, использующим взаимосвязанные операции.Другой альтернативой было бы заменить существующую очередь пустой в
GetLog
и немедленно покидает критическую секцию.Таким образом, ни один из производителей не будет заблокирован длительными операциями у потребителей.создайте ссылочный тип LogObj (класс).Нет смысла делать его структурированным, поскольку вы все равно его упаковываете.или же сделать
_queue
поле должно иметь типLogObj[]
(в любом случае, так лучше).сделайте ваш поток фоновым, чтобы он не препятствовал закрытию вашей программы, если
Stop
не будет вызван.Промойте свой
TextWriter
.В противном случае вы рискуете потерять даже те записи, которые успели поместиться в очередь (10 элементов немного маловаты, ИМХО)Реализуйте IDisposable и / или finalizer.Вашему регистратору принадлежат thread и text writer, и они должны быть освобождены (и сброшены - см. Выше).
Другие советы
Привет. Быстро посмотрел, и, хотя он кажется поточно-ориентированным, я не думаю, что он особенно оптимален. Я бы предложил решение в этом направлении
ПРИМЕЧАНИЕ. просто прочитайте остальные ответы. Далее следует довольно оптимальное, оптимистичное решение для блокировки на основе вашего собственного. Основное отличие заключается в блокировке внутреннего класса, минимизации «критических секций» и обеспечении плавного завершения потока. Если вы хотите вообще избежать блокировки, попробуйте некоторые из этих изменчивых «неблокирующих» параметров. связанный список, как подсказывает @Vitaliy Lipchinsky.
using System.Collections.Generic;
using System.Linq;
using System.Threading;
...
public class Logger
{
// BEST PRACTICE: private synchronization object.
// lock on _syncRoot - you should have one for each critical
// section - to avoid locking on public 'this' instance
private readonly object _syncRoot = new object ();
// synchronization device for stopping our log thread.
// initialized to unsignaled state - when set to signaled
// we stop!
private readonly AutoResetEvent _isStopping =
new AutoResetEvent (false);
// use a Queue<>, cleaner and less error prone than
// manipulating an array. btw, check your indexing
// on your array queue, while starvation will not
// occur in your full pass, ordering is not preserved
private readonly Queue<LogObj> _queue = new Queue<LogObj>();
...
public void Log (string message)
{
// you want to lock ONLY when absolutely necessary
// which in this case is accessing the ONE resource
// of _queue.
lock (_syncRoot)
{
_queue.Enqueue (new LogObj (DateTime.Now, message));
}
}
public void GetLog ()
{
// while not stopping
//
// NOTE: _loggerThread is polling. to increase poll
// interval, increase wait period. for a more event
// driven approach, consider using another
// AutoResetEvent at end of loop, and signal it
// from Log() method above
for (; !_isStopping.WaitOne(1); )
{
List<LogObj> logs = null;
// again lock ONLY when you need to. because our log
// operations may be time-intensive, we do not want
// to block pessimistically. what we really want is
// to dequeue all available messages and release the
// shared resource.
lock (_syncRoot)
{
// copy messages for local scope processing!
//
// NOTE: .Net3.5 extension method. if not available
// logs = new List<LogObj> (_queue);
logs = _queue.ToList ();
// clear the queue for new messages
_queue.Clear ();
// release!
}
foreach (LogObj log in logs)
{
// do your thang
...
}
}
}
}
...
public void Stop ()
{
// graceful thread termination. give threads a chance!
_isStopping.Set ();
_loggerThread.Join (100);
if (_loggerThread.IsAlive)
{
_loggerThread.Abort ();
}
_loggerThread = null;
}
На самом деле вы вводите здесь блокировку. У вас есть блокировка при отправке записи журнала в очередь (метод Log): если 10 потоков одновременно поместили 10 элементов в очередь и разбудили поток Logger, то 11-й поток будет ожидать, пока поток регистратора не зарегистрирует все элементы ...
Если вы хотите что-то действительно масштабируемое - используйте очередь без блокировки (пример ниже). С механизмом синхронизации очереди без блокировки будет очень просто (вы можете даже использовать один дескриптор ожидания для уведомлений).
Если вам не удастся найти реализацию очереди без блокировки в Интернете, вот идея, как это сделать: Используйте связанный список для реализации. Каждый узел в связанном списке содержит значение и переменную ссылку на следующий узел. поэтому для операций постановки и снятия вы можете использовать метод Interlocked.CompareExchange. Надеюсь, идея понятна. Если нет - дайте мне знать, и я предоставлю более подробную информацию.
Я просто провожу здесь мысленный эксперимент, поскольку у меня нет времени, чтобы на самом деле попробовать код прямо сейчас, но я думаю, что вы можете сделать это вообще без блокировок, если вы креативны.
Пусть ваш класс ведения журнала содержит метод, который выделяет очередь и семафор при каждом вызове (и другой, который освобождает очередь и семафор, когда поток завершен). Потоки, которые хотят вести регистрацию, будут вызывать этот метод при запуске. Когда они хотят войти, они помещают сообщение в свою очередь и устанавливают семафор. Поток регистратора имеет большой цикл, который проходит по очередям и проверяет связанные семафоры. Если семафор, связанный с очередью, больше нуля, то очередь отбрасывается, а семафор уменьшается.
Поскольку вы не пытаетесь вытолкнуть что-либо из очереди до тех пор, пока не установлен семафор, и вы не настроите семафор до тех пор, пока вы не добавите вещи в очередь, я думаю это будет безопасно. Согласно документации MSDN для класса очереди, если вы перечисляете очередь, а другой поток изменяет коллекцию, создается исключение. Ловите это исключение, и вы должны быть хорошими.