Pregunta

Aquí nuevamente estoy con preguntas sobre subprocesos múltiples y un ejercicio de mi Programación concurrente

Tengo un servidor de subprocesos múltiples - implementado usando .NET Modelo de programación asíncrono - con GET ( descargar ) y PUT ( upload ) servicios de archivos. Esta parte está hecha y probada.

Sucede que la declaración del problema dice que este servidor debe tener actividad logging con el mínimo impacto en el tiempo de respuesta del servidor, y debe estar respaldado por un subproceso de baja prioridad - subproceso del registrador : creado para este efecto. Todos los mensajes de logging serán pasados ??por los hilos que los producen a este hilo del registrador , utilizando un mecanismo de comunicación que no puede bloquear el subproceso que lo invoca (además del bloqueo necesario para asegurar la exclusión mutua) y suponiendo que algunos mensajes de registro pueden ignorarse.

Aquí está mi solución actual, por favor ayude a validar si esto es una solución al problema planteado:

using System;
using System.IO;
using System.Threading;

// Multi-threaded Logger
public class Logger {
    // textwriter to use as logging output
    protected readonly TextWriter _output;
    // logger thread
    protected Thread _loggerThread;
    // logger thread wait timeout
    protected int _timeOut = 500; //500ms
    // amount of log requests attended
    protected volatile int reqNr = 0;
    // logging queue
    protected readonly object[] _queue;
    protected struct LogObj {
        public DateTime _start;
        public string _msg;
        public LogObj(string msg) {
            _start = DateTime.Now;
            _msg = msg;
        }
        public LogObj(DateTime start, string msg) {
            _start = start;
            _msg = msg;
        }
        public override string ToString() {
            return String.Format("{0}: {1}", _start, _msg);
        }
    }

    public Logger(int dimension,TextWriter output) {
        /// initialize queue with parameterized dimension
        this._queue = new object[dimension];
        // initialize logging output
        this._output = output;
        // initialize logger thread
        Start();
    }
    public Logger() {
        // initialize queue with 10 positions
        this._queue = new object[10];
        // initialize logging output to use console output
        this._output = Console.Out;
        // initialize logger thread
        Start();
    }

    public void Log(string msg) {
        lock (this) {
            for (int i = 0; i < _queue.Length; i++) {
                // seek for the first available position on queue
                if (_queue[i] == null) {
                    // insert pending log into queue position
                    _queue[i] = new LogObj(DateTime.Now, msg);
                    // notify logger thread for a pending log on the queue
                    Monitor.Pulse(this);
                    break;
                }
                // if there aren't any available positions on logging queue, this
                // log is not considered and the thread returns
            }
        }
    }

    public void GetLog() {
        lock (this) {
            while(true) {
                for (int i = 0; i < _queue.Length; i++) {
                    // seek all occupied positions on queue (those who have logs)
                    if (_queue[i] != null) {
                        // log
                        LogObj obj = (LogObj)_queue[i];
                        // makes this position available
                        _queue[i] = null;
                        // print log into output stream
                        _output.WriteLine(String.Format("[Thread #{0} | {1}ms] {2}",
                                                        Thread.CurrentThread.ManagedThreadId,
                                                        DateTime.Now.Subtract(obj._start).TotalMilliseconds,
                                                        obj.ToString()));
                    }
                }
                // after printing all pending log's (or if there aren't any pending log's),
                // the thread waits until another log arrives
                //Monitor.Wait(this, _timeOut);
                Monitor.Wait(this);
            }
        }
    }

    // Starts logger thread activity
    public void Start() {
        // Create the thread object, passing in the Logger.Start method
        // via a ThreadStart delegate. This does not start the thread.
        _loggerThread = new Thread(this.GetLog);
        _loggerThread.Priority = ThreadPriority.Lowest;
        _loggerThread.Start();
    }

    // Stops logger thread activity
    public void Stop() {
        _loggerThread.Abort();
        _loggerThread = null;
    }

    // Increments number of attended log requests
    public void IncReq() { reqNr++; }

}

Básicamente, aquí están los puntos principales de este código:

  1. Inicie una cadena de baja prioridad que recorra la cola de registro e imprima registros pendientes en la salida. Después de esto, el tema se suspende hasta que llegue el nuevo log ;
  2. Cuando llega un registro, el hilo del registrador se despierta y funciona.

¿Esta solución es segura para subprocesos ? He estado leyendo el algoritmo de solución y problema Productores-Consumidores , pero en este problema, aunque tengo varios productores, solo tengo un lector.

Gracias de antemano por toda su atención.

¿Fue útil?

Solución

Parece que debería estar funcionando. Productores-Los consumidores no deberían cambiar mucho en el caso de un solo consumidor. Pequeños nitpicks:

  • adquirir un bloqueo puede ser una operación costosa (como dice @Vitaliy Lipchinsky). Recomendaría comparar su registrador con el ingenuo registrador y el registrador de "escritura directa" utilizando operaciones entrelazadas. Otra alternativa sería intercambiar la cola existente con una vacía en GetLog y dejar la sección crítica inmediatamente. De esta manera, ninguno de los productores será bloqueado por las operaciones largas en los consumidores.

  • hace LogObj tipo de referencia (clase). No tiene sentido hacerlo estructurado, ya que de todos modos lo estás boxeando. o bien haga que el campo _queue sea del tipo LogObj [] (eso es mejor de todos modos).

  • cree el fondo de su hilo para que no impida cerrar su programa si no se llama a Stop .

  • Limpie su TextWriter . O bien, está arriesgando a perder incluso los registros que lograron encajar en la cola (10 elementos es un poco IMHO)

  • Implementar IDisposable y / o finalizador. Su registrador es propietario de hilo y escritor de texto, y estos deben ser liberados (y vaciados, ver más arriba).

Otros consejos

Hola allí. Eché un vistazo rápido y, si bien parece ser seguro para los subprocesos, no creo que sea particularmente óptimo. Sugeriría una solución en este sentido.

NOTA: acaba de leer las otras respuestas. Lo que sigue es una solución de bloqueo optimista bastante óptima basada en la suya. Las principales diferencias son el bloqueo en una clase interna, minimizando las 'secciones críticas' y proporcionando una terminación de hilo elegante. Si desea evitar el bloqueo por completo, puede probar algo de ese " no-bloqueo " cosas de la lista enlazada como sugiere @Vitaliy Lipchinsky.

using System.Collections.Generic;
using System.Linq;
using System.Threading;

...

public class Logger
{
    // BEST PRACTICE: private synchronization object. 
    // lock on _syncRoot - you should have one for each critical
    // section - to avoid locking on public 'this' instance
    private readonly object _syncRoot = new object ();

    // synchronization device for stopping our log thread.
    // initialized to unsignaled state - when set to signaled
    // we stop!
    private readonly AutoResetEvent _isStopping = 
        new AutoResetEvent (false);

    // use a Queue<>, cleaner and less error prone than
    // manipulating an array. btw, check your indexing
    // on your array queue, while starvation will not
    // occur in your full pass, ordering is not preserved
    private readonly Queue<LogObj> _queue = new Queue<LogObj>();

    ...

    public void Log (string message)
    {
        // you want to lock ONLY when absolutely necessary
        // which in this case is accessing the ONE resource
        // of _queue.
        lock (_syncRoot)
        {
            _queue.Enqueue (new LogObj (DateTime.Now, message));
        }
    }

    public void GetLog ()
    {
        // while not stopping
        // 
        // NOTE: _loggerThread is polling. to increase poll
        // interval, increase wait period. for a more event
        // driven approach, consider using another
        // AutoResetEvent at end of loop, and signal it
        // from Log() method above
        for (; !_isStopping.WaitOne(1); )
        {
            List<LogObj> logs = null;
            // again lock ONLY when you need to. because our log
            // operations may be time-intensive, we do not want
            // to block pessimistically. what we really want is 
            // to dequeue all available messages and release the
            // shared resource.
            lock (_syncRoot)
            {
                // copy messages for local scope processing!
                // 
                // NOTE: .Net3.5 extension method. if not available
                // logs = new List<LogObj> (_queue);
                logs = _queue.ToList ();
                // clear the queue for new messages
                _queue.Clear ();
                // release!
            }
            foreach (LogObj log in logs)
            {
                // do your thang
                ...
            }
        }
    }
}
...
public void Stop ()
{
    // graceful thread termination. give threads a chance!
    _isStopping.Set ();
    _loggerThread.Join (100);
    if (_loggerThread.IsAlive)
    {
        _loggerThread.Abort ();
    }
    _loggerThread = null;
}

En realidad, usted está introduciendo bloqueo aquí. Se ha bloqueado al empujar una entrada de registro a la cola (Método de registro): si 10 subprocesos pusieron simultáneamente 10 elementos en la cola y despertaron el subproceso del registrador, el subproceso 11 esperará hasta que el hilo del registrador registre todos los elementos ...

Si desea algo realmente escalable: implemente una cola sin bloqueo (el ejemplo se muestra a continuación). Con el mecanismo de sincronización de colas sin bloqueo será muy sencillo (incluso puede usar el controlador de espera único para las notificaciones).

Si no logras encontrar una implementación de cola sin bloqueo en la web, aquí tienes una idea de cómo hacerlo: Utilice la lista enlazada para una implementación. Cada nodo en la lista enlazada contiene un valor y una referencia volátil al siguiente nodo. por lo tanto, para operaciones en cola y en cola, puede usar el método Interlocked.CompareExchange. Espero que la idea sea clara. Si no, hágamelo saber y le daré más detalles.

Estoy haciendo un experimento mental aquí, ya que no tengo tiempo para probar el código en este momento, pero creo que puedes hacerlo sin ningún tipo de bloqueo si eres creativo.

Haga que su clase de registro contenga un método que asigne una cola y un semáforo cada vez que se llame (y otro que desasigne la cola y el semáforo cuando el hilo esté terminado). Los subprocesos que desean hacer el registro llamarán a este método cuando se inicien. Cuando quieren iniciar sesión, empujan el mensaje en su propia cola y establecen el semáforo. El hilo del registrador tiene un gran bucle que se ejecuta a través de las colas y comprueba los semáforos asociados. Si el semáforo asociado con la cola es mayor que cero, entonces la cola se desprende y el semáforo disminuye.

Debido a que no está intentando sacar cosas de la cola hasta que se haya establecido el semáforo, y no está configurando el semáforo hasta que haya puesto las cosas en la cola, pienso esto será seguro De acuerdo con la documentación de MSDN para la clase de cola, si está enumerando la cola y otro hilo modifica la colección, se lanza una excepción. Captura esa excepción y deberías ser bueno.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top