Domanda

Ho creato un attore chiamato nuova classe che elabora i messaggi passati ad esso. Il problema che sto funzionando in è capire qual è il modo più elegante per passare i messaggi correlati, ma diversi per l'attore. La mia prima idea è quella di utilizzare l'ereditarietà, ma sembra così gonfio, ma è fortemente tipi che è un requisito definito.

ha qualche idea?

Esempio

private abstract class QueueMessage { }

private class ClearMessage : QueueMessage 
{
    public static readonly ClearMessage Instance = new ClearMessage();

    private ClearMessage() { }
}

private class TryDequeueMessage : QueueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();

    private TryDequeueMessage() { }
}

private class EnqueueMessage : QueueMessage 
{
    public TValue Item { get; private set; }

    private EnqueueMessage(TValue item)
    {
        Item = item;
    }
}

Attore Classe

/// <summary>Represents a callback method to be executed by an Actor.</summary>
/// <typeparam name="TReply">The type of reply.</typeparam>
/// <param name="reply">The reply made by the actor.</param>
public delegate void ActorReplyCallback<TReply>(TReply reply);

/// <summary>Represents an Actor which receives and processes messages in concurrent applications.</summary>
/// <typeparam name="TMessage">The type of message this actor accepts.</typeparam>
/// <typeparam name="TReply">The type of reply made by this actor.</typeparam>
public abstract class Actor<TMessage, TReply> : IDisposable
{
    /// <summary>The default total number of threads to process messages.</summary>
    private const Int32 DefaultThreadCount = 1;


    /// <summary>Used to serialize access to the message queue.</summary>
    private readonly Locker Locker;

    /// <summary>Stores the messages until they can be processed.</summary>
    private readonly System.Collections.Generic.Queue<Message> MessageQueue;

    /// <summary>Signals the actor thread to process a new message.</summary>
    private readonly ManualResetEvent PostEvent;

    /// <summary>This tells the actor thread to stop reading from the queue.</summary>
    private readonly ManualResetEvent DisposeEvent;

    /// <summary>Processes the messages posted to the actor.</summary>
    private readonly List<Thread> ActorThreads;


    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    public Actor() : this(DefaultThreadCount) { }

    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    /// <param name="thread_count"></param>
    public Actor(Int32 thread_count)
    {
        if (thread_count < 1) throw new ArgumentOutOfRangeException("thread_count", thread_count, "Must be 1 or greater.");

        Locker = new Locker();
        MessageQueue = new System.Collections.Generic.Queue<Message>();
        EnqueueEvent = new ManualResetEvent(true);
        PostEvent = new ManualResetEvent(false);
        DisposeEvent = new ManualResetEvent(true);
        ActorThreads = new List<Thread>();

        for (Int32 i = 0; i < thread_count; i++)
        {
            var thread = new Thread(ProcessMessages);
            thread.IsBackground = true;
            thread.Start();
            ActorThreads.Add(thread);
        }
    }


    /// <summary>Posts a message and waits for the reply.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <returns>The reply from the actor.</returns>
    public TReply PostWithReply(TMessage message)
    {
        using (var wrapper = new Message(message))
        {
            lock (Locker) MessageQueue.Enqueue(wrapper);
            PostEvent.Set();
            wrapper.Channel.CompleteEvent.WaitOne();
            return wrapper.Channel.Value;
        }
    }

    /// <summary>Posts a message to the actor and executes the callback when the reply is received.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <param name="callback">The callback that will be invoked once the replay is received.</param>
    public void PostWithAsyncReply(TMessage value, ActorReplyCallback<TReply> callback)
    {
        if (callback == null) throw new ArgumentNullException("callback");
        ThreadPool.QueueUserWorkItem(state => callback(PostWithReply(value)));
    }

    /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
    public void Dispose()
    {
        if (DisposeEvent.WaitOne(10))
        {
            DisposeEvent.Reset();
            PostEvent.Set();

            foreach (var thread in ActorThreads)
            {
                thread.Join();
            }

            ((IDisposable)PostEvent).Dispose();
            ((IDisposable)DisposeEvent).Dispose();
        }
    }

    /// <summary>Processes a message posted to the actor.</summary>
    /// <param name="message">The message to be processed.</param>
    protected abstract void ProcessMessage(Message message);

    /// <summary>Dequeues the messages passes them to ProcessMessage.</summary>
    private void ProcessMessages()
    {
        while (PostEvent.WaitOne() && DisposeEvent.WaitOne(10))
        {
            var message = (Message)null;

            while (true)
            {
                lock (Locker)
                {
                    message = MessageQueue.Count > 0 ?
                        MessageQueue.Dequeue() :
                        null;

                    if (message == null)
                    {
                        PostEvent.Reset();
                        break;
                    }
                }

                try
                {
                    ProcessMessage(message);
                }
                catch
                {

                }
            }
        }
    }


    /// <summary>Represents a message that is passed to an actor.</summary>
    protected class Message : IDisposable
    {
        /// <summary>The actual value of this message.</summary>
        public TMessage Value { get; private set; }

        /// <summary>The channel used to give a reply to this message.</summary>
        public Channel Channel { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Message class.</summary>
        /// <param name="value">The actual value of the message.</param>
        public Message(TMessage value)
        {
            Value = value;
            Channel = new Channel();
        }


        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            Channel.Dispose();
        }
    }

    /// <summary>Represents a channel used by an actor to reply to a message.</summary>         
    protected class Channel : IDisposable
    {
        /// <summary>The value of the reply.</summary>
        public TReply Value { get; private set; }

        /// <summary>Signifies that the message has been replied to.</summary>
        public ManualResetEvent CompleteEvent { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Channel class.</summary>
        public Channel()
        {
            CompleteEvent = new ManualResetEvent(false);
        }

        /// <summary>Reply to the message received.</summary>
        /// <param name="value">The value of the reply.</param>
        public void Reply(TReply value)
        {
            Value = value;
            CompleteEvent.Set();
        }

        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            ((IDisposable)CompleteEvent).Dispose();
        }
    }
}
È stato utile?

Soluzione

Nel codice esempio, implementare PostWithAsyncReply in termini di PostWithReply. Questo non è l'ideale, perché vuol dire che quando si chiama PostWithAsyncReply e l'attore vuole un po 'per gestire la cosa, ci sono in realtà due thread legato: quello di eseguire l'attore e quella in attesa che finisca. Sarebbe meglio avere un thread in esecuzione l'attore e quindi chiamando la richiamata nel caso asincrono. (Ovviamente nel caso sincrono c'è nessun evitando la legatura di due fili).

Aggiornamento:

Ulteriori informazioni su quanto sopra: si costruisce un attore con un argomento dicendogli il numero di thread per l'esecuzione. Per semplicità supponiamo ogni attore viene eseguito con un filo (in realtà una buona situazione del tutto perché gli attori possono avere stato interno senza bloccaggio su di essa, come solo thread accede direttamente).

A chiama Attore Attore B, in attesa di una risposta. Al fine di gestire la richiesta, l'attore B ha bisogno di chiamare l'attore C. Così ora A e solo discussioni di B sono in attesa, e C di è l'unico in realtà dando la CPU alcun lavoro da fare. Questo per quanto riguarda il multi-threading! Ma questo è quello che si ottiene se si attende per le risposte per tutto il tempo.

D'accordo, si potrebbe aumentare il numero di thread che si avvia a ogni attore. Ma devi essere loro a partire in modo che potessero sedersi intorno facendo niente. Una pila consuma molta memoria, e cambio di contesto può essere costoso.

Quindi è meglio per inviare messaggi in modo asincrono, con un meccanismo di callback in modo da poter raccogliere il risultato finale. Il problema con l'implementazione è che si afferra un altro thread dal pool di thread, puramente a sedersi intorno e aspettare. Quindi, si applica in pratica la soluzione di aumentare il numero di thread. Si assegna un filo al compito di mai in esecuzione .

Sarebbe preferibile implementare PostWithReply in termini di PostWithAsyncReply, cioè il tondo opposto modo. La versione asincrona è di basso livello. Sulla mio esempio delegato-based (perché si tratta di meno di battitura di codice!):

private bool InsertCoinImpl(int value) 
{
    // only accept dimes/10p/whatever it is in euros
    return (value == 10);
}

public void InsertCoin(int value, Action<bool> accepted)
{
    Submit(() => accepted(InsertCoinImpl(value)));
}

Quindi, l'attuazione privata restituisce un bool. Il metodo asincrono pubblico accetta un'azione che riceverà il valore di ritorno; sia l'implementazione privata e l'azione callback vengono eseguite sullo stesso filo.

Speriamo che la necessità di attendere in modo sincrono sarà il caso di minoranza. Ma quando ne avete bisogno, potrebbe essere alimentato da un metodo di supporto, lo scopo del tutto generale e non legato ad alcuna specifica attore o un messaggio tipo:

public static T Wait<T>(Action<Action<T>> activity)
{
    T result = default(T);
    var finished = new EventWaitHandle(false, EventResetMode.AutoReset);

    activity(r =>
        {
            result = r;
            finished.Set();
        });

    finished.WaitOne();
    return result;
}

Quindi, ora in qualche altro attore possiamo dire:

bool accepted = Helpers.Wait<bool>(r => chocMachine.InsertCoin(5, r));

Il tipo di argomento Wait può essere inutile, non ho provato la compilazione nulla di tutto questo. Ma Wait fondamentalmente magie-up di un callback per voi, in modo da poter passare a qualche metodo asincrono, e sulla parte esterna è semplicemente di avere indietro ciò che è stato passato al callback come valore di ritorno. Si noti che il lambda si passa a Wait ancora in realtà esegue sullo stesso thread che ha chiamato Wait.

Ora si ritorna al nostro regolare programma ...

Per quanto riguarda il problema reale hai chiesto, si invia un messaggio ad un attore per arrivare a fare qualcosa. I delegati sono utili qui. Essi consentono di ottenere in modo efficace il compilatore a generare una classe con alcuni dati, un costruttore che non hanno nemmeno bisogno di chiamare in modo esplicito e anche un metodo. Se stai dover scrivere un sacco di piccole classi, passa ai delegati.

abstract class Actor
{
    Queue<Action> _messages = new Queue<Action>();

    protected void Submit(Action action)
    {
        // take out a lock of course
        _messages.Enqueue(action);
    }

    // also a "run" that reads and executes the 
    // message delegates on background threads
}

Ora uno specifico attore derivato segue questo schema:

class ChocolateMachineActor : Actor
{
    private void InsertCoinImpl(int value) 
    {
        // whatever...
    }

    public void InsertCoin(int value)
    {
        Submit(() => InsertCoinImpl(value));
    }
}

Quindi, per inviare un messaggio a l'attore, basta chiamare i metodi pubblici. Il metodo Impl privato fa il vero lavoro. Non c'è bisogno di scrivere una serie di classi di segnalazione a mano.

Ovviamente ho lasciato fuori la roba su rispondere, ma che può essere fatto con più parametri. (Vedere aggiornare sopra).

Altri suggerimenti

Steve Gilham riassunta come il compilatore gestisce in realtà sindacati discriminati. Per il proprio codice, si potrebbe prendere in considerazione una versione semplificata di quello. Dato il seguente F #:

type QueueMessage<T> = ClearMessage | TryDequeueMessage | EnqueueMessage of T

Ecco un modo di emulare in C #:

public enum MessageType { ClearMessage, TryDequeueMessage, EnqueueMessage }

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract MessageType MessageType { get; }

    /// <summary>
    /// Only applies to EnqueueMessages
    /// </summary>
    public abstract T Item { get; }

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }


    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.ClearMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.TryDequeueMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override MessageType MessageType
        {
            get { return MessageType.EnqueueMessage; }
        }

        /// <summary>
        /// Gets the item to be enqueued
        /// </summary>
        public override T Item { get { return item; } }
    }
}

Ora, nel codice che viene dato un QueueMessage, è possibile attivare la proprietà MessageType in luogo di pattern matching, e assicurarsi che si accede alla proprietà Item solo su EnqueueMessages.

Modifica

Ecco un'altra alternativa, basata sul codice di Giulietta. Ho cercato di semplificare le cose in modo che esso ha un interfaccia più utilizzabile da C #, però. Questo è preferibile alla versione precedente in quanto non è possibile ottenere un'eccezione MethodNotImplemented.

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase);

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }

    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return clearCase();
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return tryDequeueCase();
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return enqueueCase(item);
        }
    }
}

Si potrebbe utilizzare questo codice come questo:

public class MessageUserTest
{
    public void Use()
    {
        // your code to get a message here...
        QueueMessage<string> msg = null; 

        // emulate pattern matching, but without constructor names
        int i =
            msg.Match(
                clearCase:      () => -1,
                tryDequeueCase: () => -2,
                enqueueCase:     s =>  s.Length);
    }
}

tipi dell'Unione e pattern matching mappa abbastanza direttamente al modello visitatore, ho postato su questo un paio di volte prima di:

Quindi, se si desidera passare i messaggi con un sacco di diversi tipi, sei bloccato l'attuazione del modello visitatore.

(Attenzione, codice non testato avanti, ma dovrebbe darvi un'idea di come il suo fare)

Diciamo che abbiamo qualcosa di simile a questo:

type msg =
    | Add of int
    | Sub of int
    | Query of ReplyChannel<int>


let rec counts = function
    | [] -> (0, 0, 0)
    | Add(_)::xs -> let (a, b, c) = counts xs in (a + 1, b, c)
    | Sub(_)::xs -> let (a, b, c) = counts xs in (a, b + 1, c)
    | Query(_)::xs -> let (a, b, c) = counts xs in (a, b, c + 1)

Si finisce con questo codice ingombranti C #:

interface IMsgVisitor<T>
{
    T Visit(Add msg);
    T Visit(Sub msg);
    T Visit(Query msg);
}

abstract class Msg
{
    public abstract T Accept<T>(IMsgVistor<T> visitor)
}

class Add : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Sub : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Query : Msg
{
    public readonly ReplyChannel<int> Value;
    public Add(ReplyChannel<int> value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

Ora ogni volta che si vuole fare qualcosa con il messaggio, è necessario implementare un visitatore:

class MsgTypeCounter : IMsgVisitor<MsgTypeCounter>
{
    public readonly Tuple<int, int, int> State;    

    public MsgTypeCounter(Tuple<int, int, int> state) { this.State = state; }

    public MsgTypeCounter Visit(Add msg)
    {
        Console.WriteLine("got Add of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(1 + State.Item1, State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Sub msg)
    {
        Console.WriteLine("got Sub of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Query msg)
    {
        Console.WriteLine("got Query of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }
}

Poi finalmente è possibile utilizzarlo in questo modo:

var msgs = new Msg[] { new Add(1), new Add(3), new Sub(4), new ReplyChannel(null) };
var counts = msgs.Aggregate(new MsgTypeVisitor(Tuple.Create(0, 0, 0)),
    (acc, x) => x.Accept(acc)).State;

Sì, la sua come ottusa come sembra, ma è così che si passa più messaggi di una classe in modo type-safe, e questo è anche il motivo per cui non si procede sindacati in C #;)

Un campo lungo, ma comunque ..

Io parto dal presupposto che discriminato-unione è F # per ADT (Abstract Data Type). Il che significa che il tipo potrebbe essere una delle molte cose.

Nel caso in cui ci sono due, si potrebbe provare e metterlo in una semplice classe generica con due parametri di tipo:

 public struct DiscriminatedUnion<T1,T2>
 {   
      public DiscriminatedUnion(T1 t1) { value = t1; }
      public DiscriminatedUnion(T2 t1) { value = t2; }


      public static implicit operator T1(DiscriminatedUnion<T1,T2> du) {return (T1)du.value; }
      public static implicit operator T2(DiscriminatedUnion<T1,T2> du) {return (T2)du.value; }

      object value;
 }

Per farlo funzionare per 3 o più, abbiamo bisogno di replicare questa classe di un certo numero di volte. Chiunque ha una soluzione per l'overloading di funzioni a seconda del tipo di runtime?

Se avete questo

type internal Either<'a, 'b> =
  | Left of 'a
  | Right of 'b

in F #, quindi il C # equivalente del CLR generato per la classe Either<'a, 'b> ha tipi interne come

internal  class _Left : Either<a, b>
{
     internal readonly a left1;
     internal _Left(a left1);
}

ciascuno con un tag, un getter e un metodo factory

internal const  int tag_Left = 0;
internal static  Either<a, b> Left(a Left1);
internal a Left1 {  get; }

più un discriminatore

internal int  Tag { get; }

e una serie di metodi per implementare le interfacce IStructuralEquatable, IComparable, IStructuralComparable

C'è un momento della compilazione checked discriminati tipo unione a discriminati unione in C #

private class ClearMessage
{
    public static readonly ClearMessage Instance = new ClearMessage();    
    private ClearMessage() { }
}

private class TryDequeueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();    
    private TryDequeueMessage() { }
}

private class EnqueueMessage
{
    public TValue Item { get; private set; }    
    private EnqueueMessage(TValue item) { Item = item; }
}

Utilizzando l'unione discriminato potrebbe essere fatto nel seguente modo:

// New file
// Create an alias
using Message = Union<ClearMessage, TryDequeueMessage, EnqueMessage>;

int ProcessMessage(Message msg)
{
   return Message.Match(
      clear => 1,
      dequeue => 2,
      enqueue => 3);
}
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top