Frage

Ich habe eine neue Klasse namens Schauspieler, die verarbeitet Nachrichten, die an Sie übergeben.Das problem habe ich ist es, herauszufinden, was ist der eleganteste Weg, um pass Verwandte, aber unterschiedliche Nachrichten an die Schauspieler.Meine erste Idee ist, die Vererbung verwenden, aber es scheint so aufgebläht aber es ist dringend-Typen, die eine bestimmte Anforderung.

Irgendwelche Ideen?

Beispiel

private abstract class QueueMessage { }

private class ClearMessage : QueueMessage 
{
    public static readonly ClearMessage Instance = new ClearMessage();

    private ClearMessage() { }
}

private class TryDequeueMessage : QueueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();

    private TryDequeueMessage() { }
}

private class EnqueueMessage : QueueMessage 
{
    public TValue Item { get; private set; }

    private EnqueueMessage(TValue item)
    {
        Item = item;
    }
}

Actor-Klasse

/// <summary>Represents a callback method to be executed by an Actor.</summary>
/// <typeparam name="TReply">The type of reply.</typeparam>
/// <param name="reply">The reply made by the actor.</param>
public delegate void ActorReplyCallback<TReply>(TReply reply);

/// <summary>Represents an Actor which receives and processes messages in concurrent applications.</summary>
/// <typeparam name="TMessage">The type of message this actor accepts.</typeparam>
/// <typeparam name="TReply">The type of reply made by this actor.</typeparam>
public abstract class Actor<TMessage, TReply> : IDisposable
{
    /// <summary>The default total number of threads to process messages.</summary>
    private const Int32 DefaultThreadCount = 1;


    /// <summary>Used to serialize access to the message queue.</summary>
    private readonly Locker Locker;

    /// <summary>Stores the messages until they can be processed.</summary>
    private readonly System.Collections.Generic.Queue<Message> MessageQueue;

    /// <summary>Signals the actor thread to process a new message.</summary>
    private readonly ManualResetEvent PostEvent;

    /// <summary>This tells the actor thread to stop reading from the queue.</summary>
    private readonly ManualResetEvent DisposeEvent;

    /// <summary>Processes the messages posted to the actor.</summary>
    private readonly List<Thread> ActorThreads;


    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    public Actor() : this(DefaultThreadCount) { }

    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    /// <param name="thread_count"></param>
    public Actor(Int32 thread_count)
    {
        if (thread_count < 1) throw new ArgumentOutOfRangeException("thread_count", thread_count, "Must be 1 or greater.");

        Locker = new Locker();
        MessageQueue = new System.Collections.Generic.Queue<Message>();
        EnqueueEvent = new ManualResetEvent(true);
        PostEvent = new ManualResetEvent(false);
        DisposeEvent = new ManualResetEvent(true);
        ActorThreads = new List<Thread>();

        for (Int32 i = 0; i < thread_count; i++)
        {
            var thread = new Thread(ProcessMessages);
            thread.IsBackground = true;
            thread.Start();
            ActorThreads.Add(thread);
        }
    }


    /// <summary>Posts a message and waits for the reply.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <returns>The reply from the actor.</returns>
    public TReply PostWithReply(TMessage message)
    {
        using (var wrapper = new Message(message))
        {
            lock (Locker) MessageQueue.Enqueue(wrapper);
            PostEvent.Set();
            wrapper.Channel.CompleteEvent.WaitOne();
            return wrapper.Channel.Value;
        }
    }

    /// <summary>Posts a message to the actor and executes the callback when the reply is received.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <param name="callback">The callback that will be invoked once the replay is received.</param>
    public void PostWithAsyncReply(TMessage value, ActorReplyCallback<TReply> callback)
    {
        if (callback == null) throw new ArgumentNullException("callback");
        ThreadPool.QueueUserWorkItem(state => callback(PostWithReply(value)));
    }

    /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
    public void Dispose()
    {
        if (DisposeEvent.WaitOne(10))
        {
            DisposeEvent.Reset();
            PostEvent.Set();

            foreach (var thread in ActorThreads)
            {
                thread.Join();
            }

            ((IDisposable)PostEvent).Dispose();
            ((IDisposable)DisposeEvent).Dispose();
        }
    }

    /// <summary>Processes a message posted to the actor.</summary>
    /// <param name="message">The message to be processed.</param>
    protected abstract void ProcessMessage(Message message);

    /// <summary>Dequeues the messages passes them to ProcessMessage.</summary>
    private void ProcessMessages()
    {
        while (PostEvent.WaitOne() && DisposeEvent.WaitOne(10))
        {
            var message = (Message)null;

            while (true)
            {
                lock (Locker)
                {
                    message = MessageQueue.Count > 0 ?
                        MessageQueue.Dequeue() :
                        null;

                    if (message == null)
                    {
                        PostEvent.Reset();
                        break;
                    }
                }

                try
                {
                    ProcessMessage(message);
                }
                catch
                {

                }
            }
        }
    }


    /// <summary>Represents a message that is passed to an actor.</summary>
    protected class Message : IDisposable
    {
        /// <summary>The actual value of this message.</summary>
        public TMessage Value { get; private set; }

        /// <summary>The channel used to give a reply to this message.</summary>
        public Channel Channel { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Message class.</summary>
        /// <param name="value">The actual value of the message.</param>
        public Message(TMessage value)
        {
            Value = value;
            Channel = new Channel();
        }


        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            Channel.Dispose();
        }
    }

    /// <summary>Represents a channel used by an actor to reply to a message.</summary>         
    protected class Channel : IDisposable
    {
        /// <summary>The value of the reply.</summary>
        public TReply Value { get; private set; }

        /// <summary>Signifies that the message has been replied to.</summary>
        public ManualResetEvent CompleteEvent { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Channel class.</summary>
        public Channel()
        {
            CompleteEvent = new ManualResetEvent(false);
        }

        /// <summary>Reply to the message received.</summary>
        /// <param name="value">The value of the reply.</param>
        public void Reply(TReply value)
        {
            Value = value;
            CompleteEvent.Set();
        }

        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            ((IDisposable)CompleteEvent).Dispose();
        }
    }
}
War es hilfreich?

Lösung

In Ihrem Beispiel Code implementieren Sie PostWithAsyncReply in Bezug auf PostWithReply. Das ist nicht ideal, weil es bedeutet, dass, wenn Sie PostWithAsyncReply nennen und der Schauspieler dauert eine Weile, sie zu handhaben, gibt es tatsächlich zwei Threads gebunden: die eine Ausführung die Schauspieler und die eine für sich bis zum Ende warten. Es wäre besser, die ein Thread die Ausführung der Schauspieler zu haben und dann im asynchronen Fall den Rückruf aufrufen. (Offensichtlich im synchronen Fall gibt es keine meidet das Anbinden von zwei Threads).

Update:

Mehr zu den oben: Sie einen Schauspieler mit einem Argument konstruieren es zu sagen, wie viele Threads zu laufen. Der Einfachheit halber alle Schauspieler läuft mit einem Thread annehmen (eigentlich eine recht gute Situation, weil Akteure dann interne Zustand ohne Verriegelung auf sie haben kann, als nur ein Thread es greift direkt auf).

Schauspieler A ruft Schauspieler B, auf eine Antwort warten. Schauspieler C. So, jetzt A und B nur Threads nennen warten, und C ist der einzige wirklich braucht, um die Anforderung, Schauspieler B zu behandeln gibt die CPU jede Arbeit zu tun. So viel zum Thema Multi-Threading! Aber das ist, was Sie bekommen, wenn Sie nach Antworten die ganze Zeit warten.

Okay, könnte man die Anzahl der Threads Sie in jedem Akteur starten erhöhen. Aber Sie würden beginnen sie sein, so könnten sie sitzen um nichts zu tun. Ein Stapel verbraucht viel Speicher und Kontextwechsel kann teuer werden.

So ist es besser, asynchron Nachrichten zu senden, mit einem Callback-Mechanismus, so dass Sie das fertige Ergebnis abholen können. Das Problem bei der Implementierung ist, dass Sie einen anderen Thread aus dem Thread-Pool greifen, rein zu sitzen und warten. So bewerben Sie sich im Grunde die Abhilfe der die Anzahl der Threads erhöhen. Sie ordnen einen Thread, um die Aufgabe von niemals ausgeführt .

Es wäre besser, PostWithReply in Bezug auf PostWithAsyncReply zu implementieren, das heißt das Gegenteil umgekehrt. Die asynchrone Version ist Low-Level. Aufbauend auf meinen Delegierten-basiertes Beispiel (weil es weniger Eingabe von Code beinhaltet!):

private bool InsertCoinImpl(int value) 
{
    // only accept dimes/10p/whatever it is in euros
    return (value == 10);
}

public void InsertCoin(int value, Action<bool> accepted)
{
    Submit(() => accepted(InsertCoinImpl(value)));
}

So die private Implementierung gibt einen bool. Die öffentliche asynchrone Methode akzeptiert eine Aktion, die den Rückgabewert erhalten wird; sowohl die private Implementierung und die Callback-Aktion auf dem gleichen Thread ausgeführt.

Hoffentlich wird die Notwendigkeit synchron zu warten, wird die Minderheit Fall sein. Aber wenn Sie es brauchen, könnte es durch eine Hilfsmethode geliefert werden, ganz allgemeine Zwecke und nicht gebunden an einen bestimmten Schauspieler oder Nachrichtentyp:

public static T Wait<T>(Action<Action<T>> activity)
{
    T result = default(T);
    var finished = new EventWaitHandle(false, EventResetMode.AutoReset);

    activity(r =>
        {
            result = r;
            finished.Set();
        });

    finished.WaitOne();
    return result;
}

So, jetzt in einem anderen Schauspieler können wir sagen:

bool accepted = Helpers.Wait<bool>(r => chocMachine.InsertCoin(5, r));

Das Argument Typ zu Wait unnötig sein kann, habe nicht versucht, irgendetwas davon zu kompilieren. Aber Wait grundsätzlich magics-up einen Rückruf für Sie, so dass Sie es zu einem gewissen asynchroner Methode passieren können, und auf der Außenseite einfach zurück, was auch immer an die Callback als Rückgabewert übergeben wurde. Beachten Sie, dass das Lambda Sie Wait passieren noch führt tatsächlich auf dem gleichen Thread, dass Wait genannt.

Wir haben jetzt Sie zu unserem regulären Programm zurück ...

Was das eigentliche Problem, das Sie gefragt, senden Sie eine Nachricht an einen Schauspieler, um es etwas zu tun. Die Delegierten sind hier hilfreich. Sie lassen Sie effektiv die Compiler erhalten Sie mit einigen Daten, die eine Klasse zu generieren, einen Konstruktor, dass Sie nicht einmal anrufen müssen explizit und auch ein Verfahren. Wenn Sie sich mit einem Bündel von kleinen Klassen, wechseln Sie auf die Delegierten schreiben.

abstract class Actor
{
    Queue<Action> _messages = new Queue<Action>();

    protected void Submit(Action action)
    {
        // take out a lock of course
        _messages.Enqueue(action);
    }

    // also a "run" that reads and executes the 
    // message delegates on background threads
}

Nun wird ein spezifisches abgeleitet Schauspieler folgt diesem Muster:

class ChocolateMachineActor : Actor
{
    private void InsertCoinImpl(int value) 
    {
        // whatever...
    }

    public void InsertCoin(int value)
    {
        Submit(() => InsertCoinImpl(value));
    }
}

So eine Nachricht an den Schauspieler zu senden, die Sie gerade die öffentlichen Methoden aufrufen. Die private Impl Methode macht die eigentliche Arbeit. Keine Notwendigkeit, eine Reihe von Nachrichtenklassen von Hand zu schreiben.

Natürlich habe ich die Sachen über die Beantwortung ausgelassen, aber das kann alles mit mehreren Parametern erfolgen. (Siehe Update oben).

Andere Tipps

Steve Gilham zusammengefasst, wie der Compiler tatsächlich Griff Gewerkschaften benachteiligt. Für Ihren eigenen Code, können Sie eine vereinfachte Version davon in Betracht ziehen. In Anbetracht der folgenden F #:

type QueueMessage<T> = ClearMessage | TryDequeueMessage | EnqueueMessage of T

Hier ist eine Möglichkeit, es in C # zu emulieren:

public enum MessageType { ClearMessage, TryDequeueMessage, EnqueueMessage }

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract MessageType MessageType { get; }

    /// <summary>
    /// Only applies to EnqueueMessages
    /// </summary>
    public abstract T Item { get; }

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }


    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.ClearMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.TryDequeueMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override MessageType MessageType
        {
            get { return MessageType.EnqueueMessage; }
        }

        /// <summary>
        /// Gets the item to be enqueued
        /// </summary>
        public override T Item { get { return item; } }
    }
}

Jetzt im Code, die eine QueueMessage gegeben ist, können Sie schalten die MessageType Eigenschaft anstelle von Pattern-Matching, und stellen Sie sicher, dass Sie die Item Eigenschaft Zugriff nur auf EnqueueMessages.

Bearbeiten

Hier ist eine andere Alternative, basierend auf Julias Code. Ich habe so zu rationalisieren Dinge versucht, dass es eine nutzbare Schnittstelle von C # bekam, though. Dies ist bevorzugt gegenüber der Vorversion, dass Sie keine MethodNotImplemented Ausnahme erhalten können.

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase);

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }

    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return clearCase();
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return tryDequeueCase();
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return enqueueCase(item);
        }
    }
}

Sie würden diesen Code verwenden wie folgt aus:

public class MessageUserTest
{
    public void Use()
    {
        // your code to get a message here...
        QueueMessage<string> msg = null; 

        // emulate pattern matching, but without constructor names
        int i =
            msg.Match(
                clearCase:      () => -1,
                tryDequeueCase: () => -2,
                enqueueCase:     s =>  s.Length);
    }
}

Union Typen und Mustervergleich Karte ziemlich direkt an das Besuchermuster, ich habe ein paar Mal darüber geschrieben vor:

Also, wenn Sie Nachrichten mit vielen verschiedenen Arten, Sie sitzen fest Durchführung des Besuchermuster übergeben werden sollen.

(Achtung, nicht getesteten Code vor, aber sollen Sie eine Vorstellung davon, wie es geht)

Lassen Sie uns sagen, wir haben so etwas wie folgt aus:

type msg =
    | Add of int
    | Sub of int
    | Query of ReplyChannel<int>


let rec counts = function
    | [] -> (0, 0, 0)
    | Add(_)::xs -> let (a, b, c) = counts xs in (a + 1, b, c)
    | Sub(_)::xs -> let (a, b, c) = counts xs in (a, b + 1, c)
    | Query(_)::xs -> let (a, b, c) = counts xs in (a, b, c + 1)

Mit diesem sperrig C # -Code am Ende:

interface IMsgVisitor<T>
{
    T Visit(Add msg);
    T Visit(Sub msg);
    T Visit(Query msg);
}

abstract class Msg
{
    public abstract T Accept<T>(IMsgVistor<T> visitor)
}

class Add : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Sub : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Query : Msg
{
    public readonly ReplyChannel<int> Value;
    public Add(ReplyChannel<int> value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

Nun, wenn Sie etwas mit der Nachricht tun möchten, müssen Sie einen Besucher implementieren:

class MsgTypeCounter : IMsgVisitor<MsgTypeCounter>
{
    public readonly Tuple<int, int, int> State;    

    public MsgTypeCounter(Tuple<int, int, int> state) { this.State = state; }

    public MsgTypeCounter Visit(Add msg)
    {
        Console.WriteLine("got Add of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(1 + State.Item1, State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Sub msg)
    {
        Console.WriteLine("got Sub of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Query msg)
    {
        Console.WriteLine("got Query of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }
}

Dann endlich können Sie es wie folgt verwendet werden:

var msgs = new Msg[] { new Add(1), new Add(3), new Sub(4), new ReplyChannel(null) };
var counts = msgs.Aggregate(new MsgTypeVisitor(Tuple.Create(0, 0, 0)),
    (acc, x) => x.Accept(acc)).State;

Ja, es ist so stumpf wie es scheint, aber das ist, wie Sie mehrere Nachrichten eine Klasse in einer typsichere Weise passieren, und das auch ist, warum wir nicht implementieren Gewerkschaften in C #;)

Ein langer Schuss, aber trotzdem ..

Ich bin, der diskriminiert gewerkschafts unter der Annahme, F # für ADT (Abstract Datentyp). Was bedeutet, dass der Typ ein von mehreren Dingen sein könnte.

Im Fall gibt es zwei, könnten Sie versuchen, und stecken es in einer einfachen generischen Klasse mit zwei Typparametern:

 public struct DiscriminatedUnion<T1,T2>
 {   
      public DiscriminatedUnion(T1 t1) { value = t1; }
      public DiscriminatedUnion(T2 t1) { value = t2; }


      public static implicit operator T1(DiscriminatedUnion<T1,T2> du) {return (T1)du.value; }
      public static implicit operator T2(DiscriminatedUnion<T1,T2> du) {return (T2)du.value; }

      object value;
 }

Um es für 3 oder mehr funktioniert, müssen wir diese Klasse einige Male wiederholen. Jeder hat eine Lösung für die Funktion Überlastung in Abhängigkeit von dem Laufzeittyp?

Wenn Sie diese

type internal Either<'a, 'b> =
  | Left of 'a
  | Right of 'b

in F#, dann das C# - äquivalent der CLR generiert für Klasse Either<'a, 'b> hat inneren Arten wie

internal  class _Left : Either<a, b>
{
     internal readonly a left1;
     internal _Left(a left1);
}

jedes mit einem tag, eine getter-und eine factory-Methode

internal const  int tag_Left = 0;
internal static  Either<a, b> Left(a Left1);
internal a Left1 {  get; }

plus ein Diskriminator

internal int  Tag { get; }

und eine Reihe von Methoden zum implementieren von Schnittstellen IStructuralEquatable, IComparable, IStructuralComparable

Es gibt eine Kompilierung-diskriminierte Union-Typen unter diskriminierter Union in C # überprüft

private class ClearMessage
{
    public static readonly ClearMessage Instance = new ClearMessage();    
    private ClearMessage() { }
}

private class TryDequeueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();    
    private TryDequeueMessage() { }
}

private class EnqueueMessage
{
    public TValue Item { get; private set; }    
    private EnqueueMessage(TValue item) { Item = item; }
}

Mit der diskriminierte Union getan werden könnte, wie folgt:

// New file
// Create an alias
using Message = Union<ClearMessage, TryDequeueMessage, EnqueMessage>;

int ProcessMessage(Message msg)
{
   return Message.Match(
      clear => 1,
      dequeue => 2,
      enqueue => 3);
}
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top