Pergunta

Eu criei uma nova classe chamada ator que processa as mensagens passadas. O problema que estou encontrando é descobrir qual é a maneira mais elegante de passar mensagens relacionadas, mas diferentes para o ator. Minha primeira idéia é usar a herança, mas parece tão inchada, mas são fortemente tipos que são um requisito definitivo.

Tem alguma ideia?

Exemplo

private abstract class QueueMessage { }

private class ClearMessage : QueueMessage 
{
    public static readonly ClearMessage Instance = new ClearMessage();

    private ClearMessage() { }
}

private class TryDequeueMessage : QueueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();

    private TryDequeueMessage() { }
}

private class EnqueueMessage : QueueMessage 
{
    public TValue Item { get; private set; }

    private EnqueueMessage(TValue item)
    {
        Item = item;
    }
}

Classe de ator

/// <summary>Represents a callback method to be executed by an Actor.</summary>
/// <typeparam name="TReply">The type of reply.</typeparam>
/// <param name="reply">The reply made by the actor.</param>
public delegate void ActorReplyCallback<TReply>(TReply reply);

/// <summary>Represents an Actor which receives and processes messages in concurrent applications.</summary>
/// <typeparam name="TMessage">The type of message this actor accepts.</typeparam>
/// <typeparam name="TReply">The type of reply made by this actor.</typeparam>
public abstract class Actor<TMessage, TReply> : IDisposable
{
    /// <summary>The default total number of threads to process messages.</summary>
    private const Int32 DefaultThreadCount = 1;


    /// <summary>Used to serialize access to the message queue.</summary>
    private readonly Locker Locker;

    /// <summary>Stores the messages until they can be processed.</summary>
    private readonly System.Collections.Generic.Queue<Message> MessageQueue;

    /// <summary>Signals the actor thread to process a new message.</summary>
    private readonly ManualResetEvent PostEvent;

    /// <summary>This tells the actor thread to stop reading from the queue.</summary>
    private readonly ManualResetEvent DisposeEvent;

    /// <summary>Processes the messages posted to the actor.</summary>
    private readonly List<Thread> ActorThreads;


    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    public Actor() : this(DefaultThreadCount) { }

    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    /// <param name="thread_count"></param>
    public Actor(Int32 thread_count)
    {
        if (thread_count < 1) throw new ArgumentOutOfRangeException("thread_count", thread_count, "Must be 1 or greater.");

        Locker = new Locker();
        MessageQueue = new System.Collections.Generic.Queue<Message>();
        EnqueueEvent = new ManualResetEvent(true);
        PostEvent = new ManualResetEvent(false);
        DisposeEvent = new ManualResetEvent(true);
        ActorThreads = new List<Thread>();

        for (Int32 i = 0; i < thread_count; i++)
        {
            var thread = new Thread(ProcessMessages);
            thread.IsBackground = true;
            thread.Start();
            ActorThreads.Add(thread);
        }
    }


    /// <summary>Posts a message and waits for the reply.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <returns>The reply from the actor.</returns>
    public TReply PostWithReply(TMessage message)
    {
        using (var wrapper = new Message(message))
        {
            lock (Locker) MessageQueue.Enqueue(wrapper);
            PostEvent.Set();
            wrapper.Channel.CompleteEvent.WaitOne();
            return wrapper.Channel.Value;
        }
    }

    /// <summary>Posts a message to the actor and executes the callback when the reply is received.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <param name="callback">The callback that will be invoked once the replay is received.</param>
    public void PostWithAsyncReply(TMessage value, ActorReplyCallback<TReply> callback)
    {
        if (callback == null) throw new ArgumentNullException("callback");
        ThreadPool.QueueUserWorkItem(state => callback(PostWithReply(value)));
    }

    /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
    public void Dispose()
    {
        if (DisposeEvent.WaitOne(10))
        {
            DisposeEvent.Reset();
            PostEvent.Set();

            foreach (var thread in ActorThreads)
            {
                thread.Join();
            }

            ((IDisposable)PostEvent).Dispose();
            ((IDisposable)DisposeEvent).Dispose();
        }
    }

    /// <summary>Processes a message posted to the actor.</summary>
    /// <param name="message">The message to be processed.</param>
    protected abstract void ProcessMessage(Message message);

    /// <summary>Dequeues the messages passes them to ProcessMessage.</summary>
    private void ProcessMessages()
    {
        while (PostEvent.WaitOne() && DisposeEvent.WaitOne(10))
        {
            var message = (Message)null;

            while (true)
            {
                lock (Locker)
                {
                    message = MessageQueue.Count > 0 ?
                        MessageQueue.Dequeue() :
                        null;

                    if (message == null)
                    {
                        PostEvent.Reset();
                        break;
                    }
                }

                try
                {
                    ProcessMessage(message);
                }
                catch
                {

                }
            }
        }
    }


    /// <summary>Represents a message that is passed to an actor.</summary>
    protected class Message : IDisposable
    {
        /// <summary>The actual value of this message.</summary>
        public TMessage Value { get; private set; }

        /// <summary>The channel used to give a reply to this message.</summary>
        public Channel Channel { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Message class.</summary>
        /// <param name="value">The actual value of the message.</param>
        public Message(TMessage value)
        {
            Value = value;
            Channel = new Channel();
        }


        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            Channel.Dispose();
        }
    }

    /// <summary>Represents a channel used by an actor to reply to a message.</summary>         
    protected class Channel : IDisposable
    {
        /// <summary>The value of the reply.</summary>
        public TReply Value { get; private set; }

        /// <summary>Signifies that the message has been replied to.</summary>
        public ManualResetEvent CompleteEvent { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Channel class.</summary>
        public Channel()
        {
            CompleteEvent = new ManualResetEvent(false);
        }

        /// <summary>Reply to the message received.</summary>
        /// <param name="value">The value of the reply.</param>
        public void Reply(TReply value)
        {
            Value = value;
            CompleteEvent.Set();
        }

        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            ((IDisposable)CompleteEvent).Dispose();
        }
    }
}
Foi útil?

Solução

No seu código de exemplo, você implementa PostWithAsyncReply em termos de PostWithReply. Isso não é o ideal, porque significa que quando você liga PostWithAsyncReply E o ator demora um pouco para lidar com isso, na verdade, existem dois threads amarrados: aquele que executa o ator e o que está esperando que ele termine. Seria melhor ter o tópico executando o ator e depois chamando o retorno de chamada no caso assíncrono. (Obviamente, no caso síncrono, não há como evitar a amarração de dois threads).

Atualizar:

Mais sobre o exposto: você constrói um ator com um argumento dizendo quantos threads executam. Para simplificar, suponha que todo ator funcione com um thread (na verdade, uma situação bastante boa, porque os atores podem ter estado interno sem bloqueio, pois apenas um thread acessa -o diretamente).

O ator A chama o ator B, esperando uma resposta. Para lidar com a solicitação, o ator B precisa ligar para o ator C. Então agora os apenas threads de A e B estão esperando, e os C's são os únicos que realmente dão à CPU qualquer trabalho a fazer. Tanto para multi-threading! Mas é isso que você ganha se esperar respostas o tempo todo.

Ok, você pode aumentar o número de threads que inicia em cada ator. Mas você os iniciaria para que eles pudessem se sentar sem fazer nada. Uma pilha usa muita memória e a troca de contexto pode ser cara.

Portanto, é melhor enviar mensagens de maneira assíncrona, com um mecanismo de retorno de chamada para que você possa captar o resultado final. O problema com sua implementação é que você pega outro tópico do pool de threads, puramente para sentar e esperar. Então, você basicamente aplica a solução alternativa de aumentar o número de threads. Você aloca um tópico para a tarefa de nunca correndo.

Seria melhor implementar PostWithReply em termos de PostWithAsyncReply, ou seja, o caminho oposto. A versão assíncrona é de baixo nível. Com base no meu exemplo baseado em delegado (porque envolve menos digitação de código!):

private bool InsertCoinImpl(int value) 
{
    // only accept dimes/10p/whatever it is in euros
    return (value == 10);
}

public void InsertCoin(int value, Action<bool> accepted)
{
    Submit(() => accepted(InsertCoinImpl(value)));
}

Portanto, a implementação privada retorna um bool. O método assíncrono público aceita uma ação que receberá o valor de retorno; Tanto a implementação privada quanto a ação de retorno de chamada são executadas no mesmo thread.

Esperançosamente, a necessidade de esperar de forma síncrona será o caso minoritário. Mas quando você precisar, ele pode ser fornecido por um método auxiliar, totalmente geral e não vinculado a nenhum ator ou tipo de mensagem específico:

public static T Wait<T>(Action<Action<T>> activity)
{
    T result = default(T);
    var finished = new EventWaitHandle(false, EventResetMode.AutoReset);

    activity(r =>
        {
            result = r;
            finished.Set();
        });

    finished.WaitOne();
    return result;
}

Então agora em algum outro ator, podemos dizer:

bool accepted = Helpers.Wait<bool>(r => chocMachine.InsertCoin(5, r));

O tipo de argumento para Wait Pode ser desnecessário, não tentei compilar nada disso. Mas Wait Basicamente, um retorno de chamada de mágica para você, para que você possa transmiti-lo para algum método assíncrono e, do lado de fora, basta receber o que foi passado para o retorno de chamada como seu valor de retorno. Observe que o lambda que você passa Wait ainda é realmente executado no mesmo tópico que chamou Wait.

Agora o retornamos ao nosso programa regular ...

Quanto ao problema real sobre o qual você perguntou, você envia uma mensagem a um ator para fazer com que faça algo. Os delegados são úteis aqui. Eles permitem que você efetivamente faça com que o compilador gerasse uma classe com alguns dados, um construtor que você nem precisa chamar explicitamente e também um método. Se você precisar escrever um monte de pequenas aulas, mude para os delegados.

abstract class Actor
{
    Queue<Action> _messages = new Queue<Action>();

    protected void Submit(Action action)
    {
        // take out a lock of course
        _messages.Enqueue(action);
    }

    // also a "run" that reads and executes the 
    // message delegates on background threads
}

Agora, um ator derivado específico segue esse padrão:

class ChocolateMachineActor : Actor
{
    private void InsertCoinImpl(int value) 
    {
        // whatever...
    }

    public void InsertCoin(int value)
    {
        Submit(() => InsertCoinImpl(value));
    }
}

Então, para enviar uma mensagem ao ator, basta chamar os métodos públicos. O privado Impl Método faz o trabalho real. Não há necessidade de escrever um monte de aulas de mensagem manualmente.

Obviamente, deixei de fora as coisas sobre responder, mas tudo isso pode ser feito com mais parâmetros. (Veja a atualização acima).

Outras dicas

Steve Gilham resumiu como o compilador lida com sindicatos discriminados. Para seu próprio código, você pode considerar uma versão simplificada disso. Dado o seguinte F#:

type QueueMessage<T> = ClearMessage | TryDequeueMessage | EnqueueMessage of T

Aqui está uma maneira de imitá -lo em C#:

public enum MessageType { ClearMessage, TryDequeueMessage, EnqueueMessage }

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract MessageType MessageType { get; }

    /// <summary>
    /// Only applies to EnqueueMessages
    /// </summary>
    public abstract T Item { get; }

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }


    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.ClearMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.TryDequeueMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override MessageType MessageType
        {
            get { return MessageType.EnqueueMessage; }
        }

        /// <summary>
        /// Gets the item to be enqueued
        /// </summary>
        public override T Item { get { return item; } }
    }
}

Agora, em código que é dado um QueueMessage, você pode ligar o MessageType propriedade em vez de correspondência de padrões e certifique -se de acessar o Item propriedade apenas em EnqueueMessages.

EDITAR

Aqui está outra alternativa, com base no código de Julieta. Eu tentei otimizar as coisas para que ela tenha uma interface mais utilizável da C#. Isso é preferível à versão anterior, pois você não pode obter um MethodNotImplemented exceção.

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase);

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }

    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return clearCase();
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return tryDequeueCase();
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return enqueueCase(item);
        }
    }
}

Você usaria este código como este:

public class MessageUserTest
{
    public void Use()
    {
        // your code to get a message here...
        QueueMessage<string> msg = null; 

        // emulate pattern matching, but without constructor names
        int i =
            msg.Match(
                clearCase:      () => -1,
                tryDequeueCase: () => -2,
                enqueueCase:     s =>  s.Length);
    }
}

Tipos de sindicatos e mapa de correspondência de padrões diretamente diretamente ao padrão do visitante, já publiquei sobre isso algumas vezes antes:

Portanto, se você deseja passar mensagens com muitos tipos diferentes, está preso implementando o padrão do visitante.

(Aviso, código não testado com antecedência, mas deve dar uma idéia de como é feito)

Digamos que temos algo assim:

type msg =
    | Add of int
    | Sub of int
    | Query of ReplyChannel<int>


let rec counts = function
    | [] -> (0, 0, 0)
    | Add(_)::xs -> let (a, b, c) = counts xs in (a + 1, b, c)
    | Sub(_)::xs -> let (a, b, c) = counts xs in (a, b + 1, c)
    | Query(_)::xs -> let (a, b, c) = counts xs in (a, b, c + 1)

Você acaba com este código C# volumoso:

interface IMsgVisitor<T>
{
    T Visit(Add msg);
    T Visit(Sub msg);
    T Visit(Query msg);
}

abstract class Msg
{
    public abstract T Accept<T>(IMsgVistor<T> visitor)
}

class Add : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Sub : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Query : Msg
{
    public readonly ReplyChannel<int> Value;
    public Add(ReplyChannel<int> value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

Agora, sempre que você quiser fazer algo com a mensagem, você precisa implementar um visitante:

class MsgTypeCounter : IMsgVisitor<MsgTypeCounter>
{
    public readonly Tuple<int, int, int> State;    

    public MsgTypeCounter(Tuple<int, int, int> state) { this.State = state; }

    public MsgTypeCounter Visit(Add msg)
    {
        Console.WriteLine("got Add of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(1 + State.Item1, State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Sub msg)
    {
        Console.WriteLine("got Sub of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Query msg)
    {
        Console.WriteLine("got Query of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }
}

Então, finalmente, você pode usá -lo assim:

var msgs = new Msg[] { new Add(1), new Add(3), new Sub(4), new ReplyChannel(null) };
var counts = msgs.Aggregate(new MsgTypeVisitor(Tuple.Create(0, 0, 0)),
    (acc, x) => x.Accept(acc)).State;

Sim, é tão obtuso quanto parece, mas é assim que você passa várias mensagens uma classe de maneira segura, e é por isso que não implementamos sindicatos em C#;)

Um tiro no escuro, mas de qualquer maneira ..

Estou assumindo que a união discriminada é F# para ADT (tipo de dados abstrato). O que significa que o tipo pode ser uma das várias coisas.

Caso haja dois, você pode tentar colocá -lo em uma classe genérica simples com dois parâmetros de tipo:

 public struct DiscriminatedUnion<T1,T2>
 {   
      public DiscriminatedUnion(T1 t1) { value = t1; }
      public DiscriminatedUnion(T2 t1) { value = t2; }


      public static implicit operator T1(DiscriminatedUnion<T1,T2> du) {return (T1)du.value; }
      public static implicit operator T2(DiscriminatedUnion<T1,T2> du) {return (T2)du.value; }

      object value;
 }

Para fazê -lo funcionar por 3 ou mais, precisamos replicar essa classe várias vezes. Alguém tem uma solução para sobrecarga de função, dependendo do tipo de tempo de execução?

Se você tem isso

type internal Either<'a, 'b> =
  | Left of 'a
  | Right of 'b

Em F#, então o equivalente C# do CLR gerado para a aula Either<'a, 'b> tem tipos internos como

internal  class _Left : Either<a, b>
{
     internal readonly a left1;
     internal _Left(a left1);
}

cada um com uma tag, um getter e um método de fábrica

internal const  int tag_Left = 0;
internal static  Either<a, b> Left(a Left1);
internal a Left1 {  get; }

além de um discriminador

internal int  Tag { get; }

e uma série de métodos para implementar interfaces IStructuralEquatable, IComparable, IStructuralComparable

Há um tipo de sindicato discriminado por tempo de compilação em tempo de compilação em União discriminada em C#

private class ClearMessage
{
    public static readonly ClearMessage Instance = new ClearMessage();    
    private ClearMessage() { }
}

private class TryDequeueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();    
    private TryDequeueMessage() { }
}

private class EnqueueMessage
{
    public TValue Item { get; private set; }    
    private EnqueueMessage(TValue item) { Item = item; }
}

Usar o sindicato discriminado pode ser feito da seguinte forma:

// New file
// Create an alias
using Message = Union<ClearMessage, TryDequeueMessage, EnqueMessage>;

int ProcessMessage(Message msg)
{
   return Message.Match(
      clear => 1,
      dequeue => 2,
      enqueue => 3);
}
Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top