Как я могу дублировать тип дискриминируемого объединения F# в C#?

StackOverflow https://stackoverflow.com/questions/2320919

  •  22-09-2019
  •  | 
  •  

Вопрос

Я создал новый класс под названием Actor, который обрабатывает передаваемые ему сообщения.Проблема, с которой я сталкиваюсь, заключается в том, чтобы выяснить, какой самый элегантный способ передать связанные, но разные сообщения Актеру.Моя первая идея — использовать наследование, но оно кажется таким раздутым, но оно строго типизировано, что является определенным требованием.

Есть идеи?

Пример

private abstract class QueueMessage { }

private class ClearMessage : QueueMessage 
{
    public static readonly ClearMessage Instance = new ClearMessage();

    private ClearMessage() { }
}

private class TryDequeueMessage : QueueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();

    private TryDequeueMessage() { }
}

private class EnqueueMessage : QueueMessage 
{
    public TValue Item { get; private set; }

    private EnqueueMessage(TValue item)
    {
        Item = item;
    }
}

Актерский класс

/// <summary>Represents a callback method to be executed by an Actor.</summary>
/// <typeparam name="TReply">The type of reply.</typeparam>
/// <param name="reply">The reply made by the actor.</param>
public delegate void ActorReplyCallback<TReply>(TReply reply);

/// <summary>Represents an Actor which receives and processes messages in concurrent applications.</summary>
/// <typeparam name="TMessage">The type of message this actor accepts.</typeparam>
/// <typeparam name="TReply">The type of reply made by this actor.</typeparam>
public abstract class Actor<TMessage, TReply> : IDisposable
{
    /// <summary>The default total number of threads to process messages.</summary>
    private const Int32 DefaultThreadCount = 1;


    /// <summary>Used to serialize access to the message queue.</summary>
    private readonly Locker Locker;

    /// <summary>Stores the messages until they can be processed.</summary>
    private readonly System.Collections.Generic.Queue<Message> MessageQueue;

    /// <summary>Signals the actor thread to process a new message.</summary>
    private readonly ManualResetEvent PostEvent;

    /// <summary>This tells the actor thread to stop reading from the queue.</summary>
    private readonly ManualResetEvent DisposeEvent;

    /// <summary>Processes the messages posted to the actor.</summary>
    private readonly List<Thread> ActorThreads;


    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    public Actor() : this(DefaultThreadCount) { }

    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    /// <param name="thread_count"></param>
    public Actor(Int32 thread_count)
    {
        if (thread_count < 1) throw new ArgumentOutOfRangeException("thread_count", thread_count, "Must be 1 or greater.");

        Locker = new Locker();
        MessageQueue = new System.Collections.Generic.Queue<Message>();
        EnqueueEvent = new ManualResetEvent(true);
        PostEvent = new ManualResetEvent(false);
        DisposeEvent = new ManualResetEvent(true);
        ActorThreads = new List<Thread>();

        for (Int32 i = 0; i < thread_count; i++)
        {
            var thread = new Thread(ProcessMessages);
            thread.IsBackground = true;
            thread.Start();
            ActorThreads.Add(thread);
        }
    }


    /// <summary>Posts a message and waits for the reply.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <returns>The reply from the actor.</returns>
    public TReply PostWithReply(TMessage message)
    {
        using (var wrapper = new Message(message))
        {
            lock (Locker) MessageQueue.Enqueue(wrapper);
            PostEvent.Set();
            wrapper.Channel.CompleteEvent.WaitOne();
            return wrapper.Channel.Value;
        }
    }

    /// <summary>Posts a message to the actor and executes the callback when the reply is received.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <param name="callback">The callback that will be invoked once the replay is received.</param>
    public void PostWithAsyncReply(TMessage value, ActorReplyCallback<TReply> callback)
    {
        if (callback == null) throw new ArgumentNullException("callback");
        ThreadPool.QueueUserWorkItem(state => callback(PostWithReply(value)));
    }

    /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
    public void Dispose()
    {
        if (DisposeEvent.WaitOne(10))
        {
            DisposeEvent.Reset();
            PostEvent.Set();

            foreach (var thread in ActorThreads)
            {
                thread.Join();
            }

            ((IDisposable)PostEvent).Dispose();
            ((IDisposable)DisposeEvent).Dispose();
        }
    }

    /// <summary>Processes a message posted to the actor.</summary>
    /// <param name="message">The message to be processed.</param>
    protected abstract void ProcessMessage(Message message);

    /// <summary>Dequeues the messages passes them to ProcessMessage.</summary>
    private void ProcessMessages()
    {
        while (PostEvent.WaitOne() && DisposeEvent.WaitOne(10))
        {
            var message = (Message)null;

            while (true)
            {
                lock (Locker)
                {
                    message = MessageQueue.Count > 0 ?
                        MessageQueue.Dequeue() :
                        null;

                    if (message == null)
                    {
                        PostEvent.Reset();
                        break;
                    }
                }

                try
                {
                    ProcessMessage(message);
                }
                catch
                {

                }
            }
        }
    }


    /// <summary>Represents a message that is passed to an actor.</summary>
    protected class Message : IDisposable
    {
        /// <summary>The actual value of this message.</summary>
        public TMessage Value { get; private set; }

        /// <summary>The channel used to give a reply to this message.</summary>
        public Channel Channel { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Message class.</summary>
        /// <param name="value">The actual value of the message.</param>
        public Message(TMessage value)
        {
            Value = value;
            Channel = new Channel();
        }


        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            Channel.Dispose();
        }
    }

    /// <summary>Represents a channel used by an actor to reply to a message.</summary>         
    protected class Channel : IDisposable
    {
        /// <summary>The value of the reply.</summary>
        public TReply Value { get; private set; }

        /// <summary>Signifies that the message has been replied to.</summary>
        public ManualResetEvent CompleteEvent { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Channel class.</summary>
        public Channel()
        {
            CompleteEvent = new ManualResetEvent(false);
        }

        /// <summary>Reply to the message received.</summary>
        /// <param name="value">The value of the reply.</param>
        public void Reply(TReply value)
        {
            Value = value;
            CompleteEvent.Set();
        }

        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            ((IDisposable)CompleteEvent).Dispose();
        }
    }
}
Это было полезно?

Решение

В вашем примере кода вы реализуете PostWithAsyncReply с точки зрения PostWithReply.Это не идеально, потому что это означает, что когда вы звоните PostWithAsyncReply и актеру требуется некоторое время, чтобы справиться с этим, на самом деле связаны два потока:тот, кто казнит актера, и тот, кто ждет, пока он закончится.Было бы лучше, если бы один поток выполнял актера, а затем вызывал обратный вызов в асинхронном случае.(Очевидно, что в синхронном случае нельзя избежать связывания двух потоков).

Обновлять:

Подробнее о вышесказанном:вы создаете актера с аргументом, сообщающим ему, сколько потоков нужно запустить.Для простоты предположим, что каждый актер работает с одним потоком (на самом деле это довольно хорошая ситуация, поскольку тогда актеры могут иметь внутреннее состояние без его блокировки, поскольку только один поток обращается к нему напрямую).

Актер А звонит актеру Б, ожидая ответа.Чтобы обработать запрос, актеру B необходимо позвонить актеру C.Итак, теперь ждут только потоки A и B, а поток C — единственный, который действительно дает процессору какую-либо работу.Вот вам и многопоточность!Но это то, что вы получите, если будете все время ждать ответов.

Хорошо, вы могли бы увеличить количество потоков, запускаемых в каждом актере.Но вы бы начали их, чтобы они могли сидеть и ничего не делать.Стек использует много памяти, а переключение контекста может быть дорогостоящим.

Так что лучше отправлять сообщения асинхронно, с механизмом обратного вызова, чтобы можно было забрать готовый результат.Проблема с вашей реализацией заключается в том, что вы захватываете другой поток из пула потоков просто для того, чтобы сидеть и ждать.Таким образом, вы в основном применяете обходной путь увеличения количества потоков.Вы выделяете поток для задачи никогда не бегает.

Было бы лучше реализовать PostWithReply с точки зрения PostWithAsyncReply, т.е.наоборот.Асинхронная версия является низкоуровневой.Опираясь на мой пример на основе делегатов (поскольку он требует меньше ввода кода!):

private bool InsertCoinImpl(int value) 
{
    // only accept dimes/10p/whatever it is in euros
    return (value == 10);
}

public void InsertCoin(int value, Action<bool> accepted)
{
    Submit(() => accepted(InsertCoinImpl(value)));
}

Таким образом, частная реализация возвращает логическое значение.Публичный асинхронный метод принимает действие, которое получит возвращаемое значение;и частная реализация, и действие обратного вызова выполняются в одном потоке.

Будем надеяться, что необходимость синхронного ожидания останется в меньшинстве.Но когда вам это нужно, его можно предоставить с помощью вспомогательного метода, абсолютно общего назначения и не привязанного к какому-либо конкретному актеру или типу сообщения:

public static T Wait<T>(Action<Action<T>> activity)
{
    T result = default(T);
    var finished = new EventWaitHandle(false, EventResetMode.AutoReset);

    activity(r =>
        {
            result = r;
            finished.Set();
        });

    finished.WaitOne();
    return result;
}

Итак, теперь о каком-то другом актере мы можем сказать:

bool accepted = Helpers.Wait<bool>(r => chocMachine.InsertCoin(5, r));

Аргумент типа для Wait может быть ненужным, не пробовал ничего из этого компилировать.Но Wait по сути, создает для вас обратный вызов, поэтому вы можете передать его какому-то асинхронному методу, а снаружи вы просто возвращаете все, что было передано в обратный вызов, в качестве возвращаемого значения.Обратите внимание, что лямбда, которую вы передаете Wait все еще фактически выполняется в том же потоке, который вызвал Wait.

Теперь мы возвращаем вас к нашей обычной программе...

Что касается реальной проблемы, о которой вы спросили, вы отправляете сообщение актеру, чтобы заставить его что-то сделать.Делегаты здесь полезны.Они позволяют вам эффективно заставить компилятор сгенерировать вам класс с некоторыми данными, конструктор, который вам даже не нужно вызывать явно, а также метод.Если вам нужно написать кучу маленьких классов, переключитесь на делегатов.

abstract class Actor
{
    Queue<Action> _messages = new Queue<Action>();

    protected void Submit(Action action)
    {
        // take out a lock of course
        _messages.Enqueue(action);
    }

    // also a "run" that reads and executes the 
    // message delegates on background threads
}

Теперь конкретный производный актер следует этому шаблону:

class ChocolateMachineActor : Actor
{
    private void InsertCoinImpl(int value) 
    {
        // whatever...
    }

    public void InsertCoin(int value)
    {
        Submit(() => InsertCoinImpl(value));
    }
}

Итак, чтобы отправить сообщение актеру, вы просто вызываете публичные методы.Частный Impl метод делает реальную работу.Нет необходимости писать кучу классов сообщений вручную.

Очевидно, я не упомянул об ответах, но все это можно сделать, используя больше параметров.(См. обновление выше).

Другие советы

Стив Гилхэм резюмировал, как на самом деле компилятор обрабатывает дискриминируемые союзы.Для вашего собственного кода вы можете рассмотреть упрощенную версию этого.Учитывая следующий F #:

type QueueMessage<T> = ClearMessage | TryDequeueMessage | EnqueueMessage of T

Вот один из способов эмулировать это на C#:

public enum MessageType { ClearMessage, TryDequeueMessage, EnqueueMessage }

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract MessageType MessageType { get; }

    /// <summary>
    /// Only applies to EnqueueMessages
    /// </summary>
    public abstract T Item { get; }

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }


    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.ClearMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.TryDequeueMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override MessageType MessageType
        {
            get { return MessageType.EnqueueMessage; }
        }

        /// <summary>
        /// Gets the item to be enqueued
        /// </summary>
        public override T Item { get { return item; } }
    }
}

Теперь в коде, которому задано QueueMessage, вы можете включить MessageType вместо сопоставления с образцом и убедитесь, что у вас есть доступ к Item недвижимость только на EnqueueMessageс.

РЕДАКТИРОВАТЬ

Вот еще одна альтернатива, основанная на коде Джульетты.Однако я попытался упростить процесс, чтобы у него был более удобный интерфейс C#.Это предпочтительнее предыдущей версии, поскольку вы не можете получить MethodNotImplemented исключение.

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase);

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }

    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return clearCase();
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return tryDequeueCase();
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return enqueueCase(item);
        }
    }
}

Вы бы использовали этот код следующим образом:

public class MessageUserTest
{
    public void Use()
    {
        // your code to get a message here...
        QueueMessage<string> msg = null; 

        // emulate pattern matching, but without constructor names
        int i =
            msg.Match(
                clearCase:      () => -1,
                tryDequeueCase: () => -2,
                enqueueCase:     s =>  s.Length);
    }
}

Типы объединений и сопоставление шаблонов напрямую связаны с шаблоном посетителя, я уже писал об этом несколько раз:

Итак, если вы хотите передавать сообщения большого количества разных типов, вы застряли в реализации шаблона посетителя.

(Внимание, впереди непроверенный код, но он должен дать вам представление о том, как это делается)

Допустим, у нас есть что-то вроде этого:

type msg =
    | Add of int
    | Sub of int
    | Query of ReplyChannel<int>


let rec counts = function
    | [] -> (0, 0, 0)
    | Add(_)::xs -> let (a, b, c) = counts xs in (a + 1, b, c)
    | Sub(_)::xs -> let (a, b, c) = counts xs in (a, b + 1, c)
    | Query(_)::xs -> let (a, b, c) = counts xs in (a, b, c + 1)

В итоге вы получите такой громоздкий код C#:

interface IMsgVisitor<T>
{
    T Visit(Add msg);
    T Visit(Sub msg);
    T Visit(Query msg);
}

abstract class Msg
{
    public abstract T Accept<T>(IMsgVistor<T> visitor)
}

class Add : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Sub : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Query : Msg
{
    public readonly ReplyChannel<int> Value;
    public Add(ReplyChannel<int> value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

Теперь, когда вы захотите что-то сделать с сообщением, вам нужно реализовать посетителя:

class MsgTypeCounter : IMsgVisitor<MsgTypeCounter>
{
    public readonly Tuple<int, int, int> State;    

    public MsgTypeCounter(Tuple<int, int, int> state) { this.State = state; }

    public MsgTypeCounter Visit(Add msg)
    {
        Console.WriteLine("got Add of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(1 + State.Item1, State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Sub msg)
    {
        Console.WriteLine("got Sub of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Query msg)
    {
        Console.WriteLine("got Query of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }
}

Затем, наконец, вы можете использовать его следующим образом:

var msgs = new Msg[] { new Add(1), new Add(3), new Sub(4), new ReplyChannel(null) };
var counts = msgs.Aggregate(new MsgTypeVisitor(Tuple.Create(0, 0, 0)),
    (acc, x) => x.Accept(acc)).State;

Да, это так глупо, как кажется, но именно так вы передаете несколько сообщений классу типобезопасным способом, и именно поэтому мы не реализуем объединения в C#;)

Далеко, но все равно..

Я предполагаю, что дискриминируемое объединение — это F # для ADT (абстрактный тип данных).Это означает, что тип может быть одним из нескольких.

Если их два, вы можете попробовать поместить их в простой универсальный класс с двумя параметрами типа:

 public struct DiscriminatedUnion<T1,T2>
 {   
      public DiscriminatedUnion(T1 t1) { value = t1; }
      public DiscriminatedUnion(T2 t1) { value = t2; }


      public static implicit operator T1(DiscriminatedUnion<T1,T2> du) {return (T1)du.value; }
      public static implicit operator T2(DiscriminatedUnion<T1,T2> du) {return (T2)du.value; }

      object value;
 }

Чтобы он работал для 3 или более человек, нам нужно повторить этот класс несколько раз.У кого-нибудь есть решение для перегрузки функций в зависимости от типа среды выполнения?

Если у вас есть это

type internal Either<'a, 'b> =
  | Left of 'a
  | Right of 'b

в F#, то эквивалент C# CLR, созданный для класса Either<'a, 'b> имеет внутренние типы, такие как

internal  class _Left : Either<a, b>
{
     internal readonly a left1;
     internal _Left(a left1);
}

каждый с тегом, геттером и фабричным методом

internal const  int tag_Left = 0;
internal static  Either<a, b> Left(a Left1);
internal a Left1 {  get; }

плюс дискриминатор

internal int  Tag { get; }

и множество методов для реализации интерфейсов IStructuralEquatable, IComparable, IStructuralComparable

Существует тип распознаваемого объединения, проверяемый во время компиляции: Дискриминируемое объединение в C#

private class ClearMessage
{
    public static readonly ClearMessage Instance = new ClearMessage();    
    private ClearMessage() { }
}

private class TryDequeueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();    
    private TryDequeueMessage() { }
}

private class EnqueueMessage
{
    public TValue Item { get; private set; }    
    private EnqueueMessage(TValue item) { Item = item; }
}

Использование дискриминируемого объединения можно сделать следующим образом:

// New file
// Create an alias
using Message = Union<ClearMessage, TryDequeueMessage, EnqueMessage>;

int ProcessMessage(Message msg)
{
   return Message.Match(
      clear => 1,
      dequeue => 2,
      enqueue => 3);
}
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top