Question

J'ai créé une nouvelle classe appelée acteur qui traite les messages qui lui sont transmis. Le problème que je suis en cours d'exécution en est de déterminer quelle est la manière la plus élégante de transmettre des messages liés mais différents à l'acteur. Ma première idée est d'utiliser l'héritage, mais il semble si pléthorique, mais il est fortement types qui est une exigence définie.

Vous avez des idées?

Exemple

private abstract class QueueMessage { }

private class ClearMessage : QueueMessage 
{
    public static readonly ClearMessage Instance = new ClearMessage();

    private ClearMessage() { }
}

private class TryDequeueMessage : QueueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();

    private TryDequeueMessage() { }
}

private class EnqueueMessage : QueueMessage 
{
    public TValue Item { get; private set; }

    private EnqueueMessage(TValue item)
    {
        Item = item;
    }
}

Acteur classe

/// <summary>Represents a callback method to be executed by an Actor.</summary>
/// <typeparam name="TReply">The type of reply.</typeparam>
/// <param name="reply">The reply made by the actor.</param>
public delegate void ActorReplyCallback<TReply>(TReply reply);

/// <summary>Represents an Actor which receives and processes messages in concurrent applications.</summary>
/// <typeparam name="TMessage">The type of message this actor accepts.</typeparam>
/// <typeparam name="TReply">The type of reply made by this actor.</typeparam>
public abstract class Actor<TMessage, TReply> : IDisposable
{
    /// <summary>The default total number of threads to process messages.</summary>
    private const Int32 DefaultThreadCount = 1;


    /// <summary>Used to serialize access to the message queue.</summary>
    private readonly Locker Locker;

    /// <summary>Stores the messages until they can be processed.</summary>
    private readonly System.Collections.Generic.Queue<Message> MessageQueue;

    /// <summary>Signals the actor thread to process a new message.</summary>
    private readonly ManualResetEvent PostEvent;

    /// <summary>This tells the actor thread to stop reading from the queue.</summary>
    private readonly ManualResetEvent DisposeEvent;

    /// <summary>Processes the messages posted to the actor.</summary>
    private readonly List<Thread> ActorThreads;


    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    public Actor() : this(DefaultThreadCount) { }

    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    /// <param name="thread_count"></param>
    public Actor(Int32 thread_count)
    {
        if (thread_count < 1) throw new ArgumentOutOfRangeException("thread_count", thread_count, "Must be 1 or greater.");

        Locker = new Locker();
        MessageQueue = new System.Collections.Generic.Queue<Message>();
        EnqueueEvent = new ManualResetEvent(true);
        PostEvent = new ManualResetEvent(false);
        DisposeEvent = new ManualResetEvent(true);
        ActorThreads = new List<Thread>();

        for (Int32 i = 0; i < thread_count; i++)
        {
            var thread = new Thread(ProcessMessages);
            thread.IsBackground = true;
            thread.Start();
            ActorThreads.Add(thread);
        }
    }


    /// <summary>Posts a message and waits for the reply.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <returns>The reply from the actor.</returns>
    public TReply PostWithReply(TMessage message)
    {
        using (var wrapper = new Message(message))
        {
            lock (Locker) MessageQueue.Enqueue(wrapper);
            PostEvent.Set();
            wrapper.Channel.CompleteEvent.WaitOne();
            return wrapper.Channel.Value;
        }
    }

    /// <summary>Posts a message to the actor and executes the callback when the reply is received.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <param name="callback">The callback that will be invoked once the replay is received.</param>
    public void PostWithAsyncReply(TMessage value, ActorReplyCallback<TReply> callback)
    {
        if (callback == null) throw new ArgumentNullException("callback");
        ThreadPool.QueueUserWorkItem(state => callback(PostWithReply(value)));
    }

    /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
    public void Dispose()
    {
        if (DisposeEvent.WaitOne(10))
        {
            DisposeEvent.Reset();
            PostEvent.Set();

            foreach (var thread in ActorThreads)
            {
                thread.Join();
            }

            ((IDisposable)PostEvent).Dispose();
            ((IDisposable)DisposeEvent).Dispose();
        }
    }

    /// <summary>Processes a message posted to the actor.</summary>
    /// <param name="message">The message to be processed.</param>
    protected abstract void ProcessMessage(Message message);

    /// <summary>Dequeues the messages passes them to ProcessMessage.</summary>
    private void ProcessMessages()
    {
        while (PostEvent.WaitOne() && DisposeEvent.WaitOne(10))
        {
            var message = (Message)null;

            while (true)
            {
                lock (Locker)
                {
                    message = MessageQueue.Count > 0 ?
                        MessageQueue.Dequeue() :
                        null;

                    if (message == null)
                    {
                        PostEvent.Reset();
                        break;
                    }
                }

                try
                {
                    ProcessMessage(message);
                }
                catch
                {

                }
            }
        }
    }


    /// <summary>Represents a message that is passed to an actor.</summary>
    protected class Message : IDisposable
    {
        /// <summary>The actual value of this message.</summary>
        public TMessage Value { get; private set; }

        /// <summary>The channel used to give a reply to this message.</summary>
        public Channel Channel { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Message class.</summary>
        /// <param name="value">The actual value of the message.</param>
        public Message(TMessage value)
        {
            Value = value;
            Channel = new Channel();
        }


        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            Channel.Dispose();
        }
    }

    /// <summary>Represents a channel used by an actor to reply to a message.</summary>         
    protected class Channel : IDisposable
    {
        /// <summary>The value of the reply.</summary>
        public TReply Value { get; private set; }

        /// <summary>Signifies that the message has been replied to.</summary>
        public ManualResetEvent CompleteEvent { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Channel class.</summary>
        public Channel()
        {
            CompleteEvent = new ManualResetEvent(false);
        }

        /// <summary>Reply to the message received.</summary>
        /// <param name="value">The value of the reply.</param>
        public void Reply(TReply value)
        {
            Value = value;
            CompleteEvent.Set();
        }

        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            ((IDisposable)CompleteEvent).Dispose();
        }
    }
}
Était-ce utile?

La solution

Dans votre exemple de code, vous implémentez PostWithAsyncReply en termes de PostWithReply. Ce n'est pas idéal, car cela signifie que lorsque vous appelez PostWithAsyncReply et l'acteur prend un certain temps pour le gérer, il y a effectivement deux fils attachés: celui d'exécuter l'acteur et celui d'attendre qu'elle se termine. Il serait préférable d'avoir un fil d'exécuter l'acteur, puis en appelant la fonction de rappel dans le cas asynchrone. (Bien entendu dans le cas synchrone, il n'y a pas d'éviter l'immobilisation de deux fils).

Mise à jour:

En savoir plus sur ce qui précède: vous construisez un acteur avec un argument qui indique combien de threads pour exécuter. Par souci de simplicité suppose chaque acteur fonctionne avec un fil (en fait une très bonne situation parce que les acteurs peuvent alors l'état interne sans verrouillage sur elle, comme un seul thread accède directement).

Acteur Un acteur appelle B, dans l'attente d'une réponse. Pour faire face à la demande, l'acteur B doit appeler l'acteur C. Alors maintenant A et B de fils seulement sont en attente, et C est de la seule donnant réellement l'unité centrale de traitement tout à faire. Tant pour le multi-threading! Mais c'est ce que vous obtenez si vous attendez des réponses tout le temps.

D'accord, vous pouvez augmenter le nombre de threads que vous commencez à chaque acteur. Mais vous seriez de les commencer afin qu'ils puissent rester à ne rien faire. Une pile utilise beaucoup de mémoire, et le changement de contexte peut être coûteux.

Il est donc préférable d'envoyer des messages de manière asynchrone, avec un mécanisme de rappel afin que vous puissiez ramasser le résultat final. Le problème avec votre mise en œuvre est que vous prenez un autre thread du pool de fil, uniquement pour asseoir et attendre. Donc, vous appliquez essentiellement la solution de contournement d'augmenter le nombre de threads. Vous allouez un fil à la tâche de jamais courir .

Il serait préférable de mettre en œuvre PostWithReply en termes de PostWithAsyncReply, à savoir le tour de sens inverse. La version asynchrone est faible niveau. Fort de mon exemple à base de délégué (car elle implique moins de frappe de code!):

private bool InsertCoinImpl(int value) 
{
    // only accept dimes/10p/whatever it is in euros
    return (value == 10);
}

public void InsertCoin(int value, Action<bool> accepted)
{
    Submit(() => accepted(InsertCoinImpl(value)));
}

Ainsi, la mise en œuvre privée retourne un bool. La méthode asynchrone public accepte une action qui recevra la valeur de retour; à la fois la mise en œuvre privée et l'action de rappel sont exécutées sur le même fil.

Si tout va bien la nécessité d'attendre synchroniquement va être le cas minoritaire. Mais quand vous en avez besoin, il pourrait être fourni par une méthode d'assistance, le but tout à fait général et ne sont pas liés à tout type d'acteur ou d'un message spécifique:

public static T Wait<T>(Action<Action<T>> activity)
{
    T result = default(T);
    var finished = new EventWaitHandle(false, EventResetMode.AutoReset);

    activity(r =>
        {
            result = r;
            finished.Set();
        });

    finished.WaitOne();
    return result;
}

Alors maintenant dans un autre acteur, on peut dire:

bool accepted = Helpers.Wait<bool>(r => chocMachine.InsertCoin(5, r));

L'argument de type à Wait peut être inutile, n'ont pas essayé de compiler tout cela. Mais Wait essentiellement magies-up un rappel pour vous, afin que vous puissiez passer à une méthode asynchrone, et à l'extérieur, vous obtenez tout juste ce qui a été passé à la fonction de rappel comme valeur de retour. Notez que le lambda vous passez à Wait encore exécute en fait sur le même thread qui a appelé Wait.

Nous vous Retournons maintenant à notre programme régulier ...

En ce qui concerne le problème réel que vous avez parlé, vous envoyez un message à un acteur pour le faire faire quelque chose. Les délégués sont utiles ici. Ils vous permettent d'obtenir efficacement le compilateur pour vous générer une classe avec des données, un constructeur que vous n'avez même pas appeler explicitement et aussi une méthode. Si vous êtes d'avoir à écrire un tas de petites cours, passer aux délégués.

abstract class Actor
{
    Queue<Action> _messages = new Queue<Action>();

    protected void Submit(Action action)
    {
        // take out a lock of course
        _messages.Enqueue(action);
    }

    // also a "run" that reads and executes the 
    // message delegates on background threads
}

un acteur dérivé spécifique suit ce schéma:

class ChocolateMachineActor : Actor
{
    private void InsertCoinImpl(int value) 
    {
        // whatever...
    }

    public void InsertCoin(int value)
    {
        Submit(() => InsertCoinImpl(value));
    }
}

Donc, pour envoyer un message à l'acteur, vous appelez simplement les méthodes publiques. La méthode Impl privée fait le travail réel. Pas besoin d'écrire un tas de classes de messages à la main.

Il est évident que je l'ai laissé de côté les choses au sujet de répondre, mais cela peut être fait avec plus de paramètres. (Voir ci-dessus mise à jour).

Autres conseils

Steve Gilham résumé comment le compilateur gère en fait les syndicats discriminés. Pour votre propre code, vous pourriez envisager une version simplifiée de cela. Dans le cas suivant F #:

type QueueMessage<T> = ClearMessage | TryDequeueMessage | EnqueueMessage of T

Voici une façon d'imiter en C #:

public enum MessageType { ClearMessage, TryDequeueMessage, EnqueueMessage }

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract MessageType MessageType { get; }

    /// <summary>
    /// Only applies to EnqueueMessages
    /// </summary>
    public abstract T Item { get; }

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }


    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.ClearMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.TryDequeueMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override MessageType MessageType
        {
            get { return MessageType.EnqueueMessage; }
        }

        /// <summary>
        /// Gets the item to be enqueued
        /// </summary>
        public override T Item { get { return item; } }
    }
}

Maintenant, dans le code qui est donné un QueueMessage, vous pouvez activer la propriété MessageType au lieu de correspondance de motif, et assurez-vous que vous accédez à la propriété Item uniquement sur EnqueueMessages.

EDIT

Voici une autre alternative, basée sur le code de Juliette. J'ai essayé de simplifier les choses afin qu'il a une interface plus utilisable à partir de C #, cependant. Ceci est préférable à la version précédente que vous ne pouvez pas obtenir une exception MethodNotImplemented.

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase);

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }

    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return clearCase();
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return tryDequeueCase();
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return enqueueCase(item);
        }
    }
}

Vous utiliseriez ce code comme ceci:

public class MessageUserTest
{
    public void Use()
    {
        // your code to get a message here...
        QueueMessage<string> msg = null; 

        // emulate pattern matching, but without constructor names
        int i =
            msg.Match(
                clearCase:      () => -1,
                tryDequeueCase: () => -2,
                enqueueCase:     s =>  s.Length);
    }
}

Types de l'Union et la carte correspondant à joli motif directement sur le modèle des visiteurs, j'ai posté sur ce quelques fois avant:

Donc, si vous voulez passer des messages avec beaucoup de différents types, vous êtes coincé mise en œuvre du schéma de visiteur.

(avertissement, code non testé avant, mais devrait vous donner une idée de la façon dont son fait)

Disons que nous avons quelque chose comme ceci:

type msg =
    | Add of int
    | Sub of int
    | Query of ReplyChannel<int>


let rec counts = function
    | [] -> (0, 0, 0)
    | Add(_)::xs -> let (a, b, c) = counts xs in (a + 1, b, c)
    | Sub(_)::xs -> let (a, b, c) = counts xs in (a, b + 1, c)
    | Query(_)::xs -> let (a, b, c) = counts xs in (a, b, c + 1)

Vous vous retrouvez avec ce volumineux code C #:

interface IMsgVisitor<T>
{
    T Visit(Add msg);
    T Visit(Sub msg);
    T Visit(Query msg);
}

abstract class Msg
{
    public abstract T Accept<T>(IMsgVistor<T> visitor)
}

class Add : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Sub : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Query : Msg
{
    public readonly ReplyChannel<int> Value;
    public Add(ReplyChannel<int> value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

Maintenant, chaque fois que vous voulez faire quelque chose avec le message, vous avez besoin de mettre en œuvre un visiteur:

class MsgTypeCounter : IMsgVisitor<MsgTypeCounter>
{
    public readonly Tuple<int, int, int> State;    

    public MsgTypeCounter(Tuple<int, int, int> state) { this.State = state; }

    public MsgTypeCounter Visit(Add msg)
    {
        Console.WriteLine("got Add of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(1 + State.Item1, State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Sub msg)
    {
        Console.WriteLine("got Sub of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Query msg)
    {
        Console.WriteLine("got Query of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }
}

Alors, enfin, vous pouvez l'utiliser comme ceci:

var msgs = new Msg[] { new Add(1), new Add(3), new Sub(4), new ReplyChannel(null) };
var counts = msgs.Aggregate(new MsgTypeVisitor(Tuple.Create(0, 0, 0)),
    (acc, x) => x.Accept(acc)).State;

Oui, son aussi obtus que cela puisse paraître, mais c'est la façon dont vous passez plusieurs messages d'une classe d'une manière de type sécurisé, et qui est aussi la raison pour laquelle nous ne mettons pas en œuvre les syndicats en C #;)

Un tir, mais de toute façon ..

Je suppose que discriminées union est F # pour ADT (Résumé Type de données). Ce qui signifie que le type pourrait être l'une de plusieurs choses.

Dans le cas où il y a deux, vous pouvez essayer de le mettre dans une classe simple générique avec deux paramètres de type:

 public struct DiscriminatedUnion<T1,T2>
 {   
      public DiscriminatedUnion(T1 t1) { value = t1; }
      public DiscriminatedUnion(T2 t1) { value = t2; }


      public static implicit operator T1(DiscriminatedUnion<T1,T2> du) {return (T1)du.value; }
      public static implicit operator T2(DiscriminatedUnion<T1,T2> du) {return (T2)du.value; }

      object value;
 }

Pour le faire fonctionner pour 3 ou plus, nous avons besoin de reproduire cette classe un certain nombre de fois. Toute personne a une solution pour la surcharge de fonction en fonction du type d'exécution?

Si vous avez cette

type internal Either<'a, 'b> =
  | Left of 'a
  | Right of 'b

en fa #, puis le C # équivalent du CLR généré pour Either<'a, 'b> de classe a types internes comme

internal  class _Left : Either<a, b>
{
     internal readonly a left1;
     internal _Left(a left1);
}

chacun avec une étiquette, un getter et une méthode de fabrication

internal const  int tag_Left = 0;
internal static  Either<a, b> Left(a Left1);
internal a Left1 {  get; }

ainsi qu'un discriminateur

internal int  Tag { get; }

et une série de méthodes pour mettre en oeuvre des interfaces IStructuralEquatable, IComparable, IStructuralComparable

Il y a une compilation discriminé type contrôlée Cette union union discriminée C #

private class ClearMessage
{
    public static readonly ClearMessage Instance = new ClearMessage();    
    private ClearMessage() { }
}

private class TryDequeueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();    
    private TryDequeueMessage() { }
}

private class EnqueueMessage
{
    public TValue Item { get; private set; }    
    private EnqueueMessage(TValue item) { Item = item; }
}

Utilisation de l'union de discrimination peut se faire comme suit:

// New file
// Create an alias
using Message = Union<ClearMessage, TryDequeueMessage, EnqueMessage>;

int ProcessMessage(Message msg)
{
   return Message.Match(
      clear => 1,
      dequeue => 2,
      enqueue => 3);
}
Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top