Pregunta

Creé una nueva clase llamada Actor que procesa los mensajes que se le pasan.El problema con el que me encuentro es descubrir cuál es la forma más elegante de pasar mensajes relacionados pero diferentes al Actor.Mi primera idea es utilizar la herencia, pero parece muy inflada, pero son fuertemente tipos, lo cual es un requisito definitivo.

¿Tienes alguna idea?

Ejemplo

private abstract class QueueMessage { }

private class ClearMessage : QueueMessage 
{
    public static readonly ClearMessage Instance = new ClearMessage();

    private ClearMessage() { }
}

private class TryDequeueMessage : QueueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();

    private TryDequeueMessage() { }
}

private class EnqueueMessage : QueueMessage 
{
    public TValue Item { get; private set; }

    private EnqueueMessage(TValue item)
    {
        Item = item;
    }
}

Clase de actor

/// <summary>Represents a callback method to be executed by an Actor.</summary>
/// <typeparam name="TReply">The type of reply.</typeparam>
/// <param name="reply">The reply made by the actor.</param>
public delegate void ActorReplyCallback<TReply>(TReply reply);

/// <summary>Represents an Actor which receives and processes messages in concurrent applications.</summary>
/// <typeparam name="TMessage">The type of message this actor accepts.</typeparam>
/// <typeparam name="TReply">The type of reply made by this actor.</typeparam>
public abstract class Actor<TMessage, TReply> : IDisposable
{
    /// <summary>The default total number of threads to process messages.</summary>
    private const Int32 DefaultThreadCount = 1;


    /// <summary>Used to serialize access to the message queue.</summary>
    private readonly Locker Locker;

    /// <summary>Stores the messages until they can be processed.</summary>
    private readonly System.Collections.Generic.Queue<Message> MessageQueue;

    /// <summary>Signals the actor thread to process a new message.</summary>
    private readonly ManualResetEvent PostEvent;

    /// <summary>This tells the actor thread to stop reading from the queue.</summary>
    private readonly ManualResetEvent DisposeEvent;

    /// <summary>Processes the messages posted to the actor.</summary>
    private readonly List<Thread> ActorThreads;


    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    public Actor() : this(DefaultThreadCount) { }

    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    /// <param name="thread_count"></param>
    public Actor(Int32 thread_count)
    {
        if (thread_count < 1) throw new ArgumentOutOfRangeException("thread_count", thread_count, "Must be 1 or greater.");

        Locker = new Locker();
        MessageQueue = new System.Collections.Generic.Queue<Message>();
        EnqueueEvent = new ManualResetEvent(true);
        PostEvent = new ManualResetEvent(false);
        DisposeEvent = new ManualResetEvent(true);
        ActorThreads = new List<Thread>();

        for (Int32 i = 0; i < thread_count; i++)
        {
            var thread = new Thread(ProcessMessages);
            thread.IsBackground = true;
            thread.Start();
            ActorThreads.Add(thread);
        }
    }


    /// <summary>Posts a message and waits for the reply.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <returns>The reply from the actor.</returns>
    public TReply PostWithReply(TMessage message)
    {
        using (var wrapper = new Message(message))
        {
            lock (Locker) MessageQueue.Enqueue(wrapper);
            PostEvent.Set();
            wrapper.Channel.CompleteEvent.WaitOne();
            return wrapper.Channel.Value;
        }
    }

    /// <summary>Posts a message to the actor and executes the callback when the reply is received.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <param name="callback">The callback that will be invoked once the replay is received.</param>
    public void PostWithAsyncReply(TMessage value, ActorReplyCallback<TReply> callback)
    {
        if (callback == null) throw new ArgumentNullException("callback");
        ThreadPool.QueueUserWorkItem(state => callback(PostWithReply(value)));
    }

    /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
    public void Dispose()
    {
        if (DisposeEvent.WaitOne(10))
        {
            DisposeEvent.Reset();
            PostEvent.Set();

            foreach (var thread in ActorThreads)
            {
                thread.Join();
            }

            ((IDisposable)PostEvent).Dispose();
            ((IDisposable)DisposeEvent).Dispose();
        }
    }

    /// <summary>Processes a message posted to the actor.</summary>
    /// <param name="message">The message to be processed.</param>
    protected abstract void ProcessMessage(Message message);

    /// <summary>Dequeues the messages passes them to ProcessMessage.</summary>
    private void ProcessMessages()
    {
        while (PostEvent.WaitOne() && DisposeEvent.WaitOne(10))
        {
            var message = (Message)null;

            while (true)
            {
                lock (Locker)
                {
                    message = MessageQueue.Count > 0 ?
                        MessageQueue.Dequeue() :
                        null;

                    if (message == null)
                    {
                        PostEvent.Reset();
                        break;
                    }
                }

                try
                {
                    ProcessMessage(message);
                }
                catch
                {

                }
            }
        }
    }


    /// <summary>Represents a message that is passed to an actor.</summary>
    protected class Message : IDisposable
    {
        /// <summary>The actual value of this message.</summary>
        public TMessage Value { get; private set; }

        /// <summary>The channel used to give a reply to this message.</summary>
        public Channel Channel { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Message class.</summary>
        /// <param name="value">The actual value of the message.</param>
        public Message(TMessage value)
        {
            Value = value;
            Channel = new Channel();
        }


        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            Channel.Dispose();
        }
    }

    /// <summary>Represents a channel used by an actor to reply to a message.</summary>         
    protected class Channel : IDisposable
    {
        /// <summary>The value of the reply.</summary>
        public TReply Value { get; private set; }

        /// <summary>Signifies that the message has been replied to.</summary>
        public ManualResetEvent CompleteEvent { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Channel class.</summary>
        public Channel()
        {
            CompleteEvent = new ManualResetEvent(false);
        }

        /// <summary>Reply to the message received.</summary>
        /// <param name="value">The value of the reply.</param>
        public void Reply(TReply value)
        {
            Value = value;
            CompleteEvent.Set();
        }

        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            ((IDisposable)CompleteEvent).Dispose();
        }
    }
}
¿Fue útil?

Solución

En su código de ejemplo, implementa PostWithAsyncReply en términos de PostWithReply.Eso no es ideal, porque significa que cuando llamas PostWithAsyncReply y el actor tarda un poco en manejarlo, en realidad hay dos hilos atados:el que ejecuta al actor y el que espera que termine.Sería mejor que un subproceso ejecute al actor y luego llame a la devolución de llamada en el caso asincrónico.(Obviamente, en el caso sincrónico no se puede evitar la vinculación de dos hilos).

Actualizar:

Más sobre lo anterior:construyes un actor con un argumento que le dice cuántos subprocesos ejecutar.Para simplificar, supongamos que cada actor se ejecuta con un subproceso (en realidad, es una situación bastante buena porque los actores pueden tener un estado interno sin bloqueo, ya que solo un subproceso accede a él directamente).

El actor A llama al actor B, esperando una respuesta.Para manejar la solicitud, el actor B necesita llamar al actor C.Así que ahora los únicos subprocesos de A y B están esperando, y el de C es el único que realmente le da trabajo a la CPU.¡Esto en cuanto a subprocesos múltiples!Pero esto es lo que obtienes si esperas respuestas todo el tiempo.

Bien, podrías aumentar la cantidad de hilos que inicias en cada actor.Pero los iniciarías para que pudieran sentarse sin hacer nada.Una pila consume mucha memoria y el cambio de contexto puede resultar costoso.

Por lo tanto, es mejor enviar mensajes de forma asincrónica, con un mecanismo de devolución de llamada para que pueda obtener el resultado final.El problema con su implementación es que toma otro subproceso del grupo de subprocesos, simplemente para sentarse y esperar.Básicamente, aplica la solución alternativa de aumentar el número de subprocesos.Asignas un hilo a la tarea de nunca corriendo.

Sería mejor implementar PostWithReply en términos de PostWithAsyncReply, es decir.al revés.La versión asincrónica es de bajo nivel.Basado en mi ejemplo basado en delegados (¡porque implica menos escritura de código!):

private bool InsertCoinImpl(int value) 
{
    // only accept dimes/10p/whatever it is in euros
    return (value == 10);
}

public void InsertCoin(int value, Action<bool> accepted)
{
    Submit(() => accepted(InsertCoinImpl(value)));
}

Entonces la implementación privada devuelve un bool.El método público asíncrono acepta una acción que recibirá el valor de retorno;Tanto la implementación privada como la acción de devolución de llamada se ejecutan en el mismo hilo.

Con suerte, la necesidad de esperar sincrónicamente será un caso minoritario.Pero cuando lo necesites, podría ser proporcionado por un método auxiliar, de propósito totalmente general y no vinculado a ningún actor o tipo de mensaje específico:

public static T Wait<T>(Action<Action<T>> activity)
{
    T result = default(T);
    var finished = new EventWaitHandle(false, EventResetMode.AutoReset);

    activity(r =>
        {
            result = r;
            finished.Set();
        });

    finished.WaitOne();
    return result;
}

Entonces ahora en algún otro actor podemos decir:

bool accepted = Helpers.Wait<bool>(r => chocMachine.InsertCoin(5, r));

El argumento de tipo para Wait Puede que sea innecesario, no he intentado compilar nada de esto.Pero Wait Básicamente, crea una devolución de llamada para usted, para que pueda pasarla a algún método asincrónico, y en el exterior simplemente recupera lo que se pasó a la devolución de llamada como su valor de retorno.Tenga en cuenta que la lambda a la que pasa Wait todavía se ejecuta en el mismo hilo que llamó Wait.

Ahora os regresamos a nuestro programa habitual...

En cuanto al problema real sobre el que preguntaste, envías un mensaje a un actor para que haga algo.Los delegados son útiles aquí.Le permiten lograr que el compilador le genere una clase con algunos datos, un constructor al que ni siquiera tiene que llamar explícitamente y también un método.Si tienes que escribir un montón de clases pequeñas, cambia a delegados.

abstract class Actor
{
    Queue<Action> _messages = new Queue<Action>();

    protected void Submit(Action action)
    {
        // take out a lock of course
        _messages.Enqueue(action);
    }

    // also a "run" that reads and executes the 
    // message delegates on background threads
}

Ahora un actor derivado específico sigue este patrón:

class ChocolateMachineActor : Actor
{
    private void InsertCoinImpl(int value) 
    {
        // whatever...
    }

    public void InsertCoin(int value)
    {
        Submit(() => InsertCoinImpl(value));
    }
}

Entonces, para enviar un mensaje al actor, simplemente llama a los métodos públicos.Lo privado Impl El método hace el trabajo real.No es necesario escribir un montón de clases de mensajes a mano.

Obviamente he omitido lo relacionado con responder, pero todo eso se puede hacer con más parámetros.(Ver actualización arriba).

Otros consejos

Steve Gilham resume cómo el compilador realidad asas discriminado sindicatos. Para su propio código, usted podría considerar una versión simplificada de eso. Dada la siguiente F #:

type QueueMessage<T> = ClearMessage | TryDequeueMessage | EnqueueMessage of T

He aquí una forma de emular en C #:

public enum MessageType { ClearMessage, TryDequeueMessage, EnqueueMessage }

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract MessageType MessageType { get; }

    /// <summary>
    /// Only applies to EnqueueMessages
    /// </summary>
    public abstract T Item { get; }

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }


    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.ClearMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.TryDequeueMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override MessageType MessageType
        {
            get { return MessageType.EnqueueMessage; }
        }

        /// <summary>
        /// Gets the item to be enqueued
        /// </summary>
        public override T Item { get { return item; } }
    }
}

Ahora, en el código que se le da un QueueMessage, puede cambiar en la propiedad MessageType en lugar de coincidencia de patrones, y asegúrese de que tiene acceso a la propiedad Item sólo en EnqueueMessages.

Editar

Aquí hay otra alternativa, basada en el código de Julieta. He tratado de racionalizar las cosas de manera que tiene una interfaz más fácil de usar desde C #, sin embargo. Esto es preferible a la versión anterior en la que no se puede obtener una excepción MethodNotImplemented.

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase);

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }

    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return clearCase();
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return tryDequeueCase();
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return enqueueCase(item);
        }
    }
}

tendrá que utilizar este código como el siguiente:

public class MessageUserTest
{
    public void Use()
    {
        // your code to get a message here...
        QueueMessage<string> msg = null; 

        // emulate pattern matching, but without constructor names
        int i =
            msg.Match(
                clearCase:      () => -1,
                tryDequeueCase: () => -2,
                enqueueCase:     s =>  s.Length);
    }
}

Los tipos de unión y el patrón de emparejamiento de mapas muy directamente al patrón de visitante, que he publicado sobre este unas cuantas veces antes:

Así que si quieres pasar mensajes con una gran cantidad de diferentes tipos, que está atrapado implementar el patrón de visitante.

(Advertencia, el código no probado por delante, pero debe darle una idea de cómo se hace)

Vamos a decir que tenemos algo como esto:

type msg =
    | Add of int
    | Sub of int
    | Query of ReplyChannel<int>


let rec counts = function
    | [] -> (0, 0, 0)
    | Add(_)::xs -> let (a, b, c) = counts xs in (a + 1, b, c)
    | Sub(_)::xs -> let (a, b, c) = counts xs in (a, b + 1, c)
    | Query(_)::xs -> let (a, b, c) = counts xs in (a, b, c + 1)

Se termina con el código C # voluminosos:

interface IMsgVisitor<T>
{
    T Visit(Add msg);
    T Visit(Sub msg);
    T Visit(Query msg);
}

abstract class Msg
{
    public abstract T Accept<T>(IMsgVistor<T> visitor)
}

class Add : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Sub : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Query : Msg
{
    public readonly ReplyChannel<int> Value;
    public Add(ReplyChannel<int> value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

Ahora, cada vez que quieres hacer algo con el mensaje, es necesario implementar un visitante:

class MsgTypeCounter : IMsgVisitor<MsgTypeCounter>
{
    public readonly Tuple<int, int, int> State;    

    public MsgTypeCounter(Tuple<int, int, int> state) { this.State = state; }

    public MsgTypeCounter Visit(Add msg)
    {
        Console.WriteLine("got Add of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(1 + State.Item1, State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Sub msg)
    {
        Console.WriteLine("got Sub of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Query msg)
    {
        Console.WriteLine("got Query of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }
}

Entonces, finalmente, se puede utilizar de esta manera:

var msgs = new Msg[] { new Add(1), new Add(3), new Sub(4), new ReplyChannel(null) };
var counts = msgs.Aggregate(new MsgTypeVisitor(Tuple.Create(0, 0, 0)),
    (acc, x) => x.Accept(acc)).State;

Sí, es tan obtuso como parece, pero así es como pasa varios mensajes de una clase de una forma de tipo seguro, y eso también es por eso que no implementan los sindicatos en C #;)

Una posibilidad remota, pero de todos modos ..

Estoy asumiendo que es discriminado por la Unión F # para los ADT (Resumen Tipo de datos). Lo que significa que el tipo podría ser una de varias cosas.

En caso de que haya dos, se podría tratar de poner en un simple clase genérica con dos parámetros de tipo:

 public struct DiscriminatedUnion<T1,T2>
 {   
      public DiscriminatedUnion(T1 t1) { value = t1; }
      public DiscriminatedUnion(T2 t1) { value = t2; }


      public static implicit operator T1(DiscriminatedUnion<T1,T2> du) {return (T1)du.value; }
      public static implicit operator T2(DiscriminatedUnion<T1,T2> du) {return (T2)du.value; }

      object value;
 }

Para hacer que funcione para 3 o más, tenemos que replicar esta clase varias veces. Cualquiera tiene una solución para la sobrecarga de funciones dependiendo del tipo de tiempo de ejecución?

Si usted tiene este

type internal Either<'a, 'b> =
  | Left of 'a
  | Right of 'b

en F #, entonces el C # equivalente de la CLR generado para Either<'a, 'b> clase tiene tipos interiores como

internal  class _Left : Either<a, b>
{
     internal readonly a left1;
     internal _Left(a left1);
}

cada uno con una etiqueta, un captador y un método de fábrica

internal const  int tag_Left = 0;
internal static  Either<a, b> Left(a Left1);
internal a Left1 {  get; }

además de un discriminador

internal int  Tag { get; }

y una serie de métodos para implementar interfaces IStructuralEquatable, IComparable, IStructuralComparable

Hay un tiempo de compilación comprobada tipo de unión discriminada en unión discriminada en C #

private class ClearMessage
{
    public static readonly ClearMessage Instance = new ClearMessage();    
    private ClearMessage() { }
}

private class TryDequeueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();    
    private TryDequeueMessage() { }
}

private class EnqueueMessage
{
    public TValue Item { get; private set; }    
    private EnqueueMessage(TValue item) { Item = item; }
}

Uso de la unión discriminada podría hacerse de la siguiente manera:

// New file
// Create an alias
using Message = Union<ClearMessage, TryDequeueMessage, EnqueMessage>;

int ProcessMessage(Message msg)
{
   return Message.Match(
      clear => 1,
      dequeue => 2,
      enqueue => 3);
}
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top