Frage

Kopieren Sie den folgenden Code in neuer C # Konsolenanwendung einfügen.

class Program
{
    static void Main(string[] args)
    {
        var enumerator = new QueuedEnumerator<long>();
        var listenerWaitHandle = Listener(enumerator);

        Publisher(enumerator);
        listenerWaitHandle.WaitOne();
    }

    private static AutoResetEvent Listener(IEnumerator<long> items)
    {
        var @event = new AutoResetEvent(false);
        ThreadPool.QueueUserWorkItem((o) =>
        {
            while (items.MoveNext())
            {
                Console.WriteLine("Received : " + items.Current);
                Thread.Sleep(2 * 1000);
            }
            (o as AutoResetEvent).Set();
        }, @event);
        return @event;
    }

    private static void Publisher(QueuedEnumerator<long> enumerator)
    {
        for (int i = 0; i < 10; i++)
        {
            enumerator.Set(i);
            Console.WriteLine("Sended : " + i);
            Thread.Sleep(1 * 1000);
        }
        enumerator.Finish();
    }

    class QueuedEnumerator<T> : IEnumerator<T>
    {
        private Queue _internal = Queue.Synchronized(new Queue());
        private T _current;
        private bool _finished;
        private AutoResetEvent _setted = new AutoResetEvent(false);

        public void Finish()
        {
            _finished = true;
            _setted.Set();
        }

        public void Set(T item)
        {
            if (_internal.Count > 3)
            {
                Console.WriteLine("I'm full, give the listener some slack !");
                Thread.Sleep(3 * 1000);
                Set(item);
            }
            else
            {
                _internal.Enqueue(item);
                _setted.Set();
            }
        }

        public T Current
        {
            get { return _current; }
        }

        public void Dispose()
        {
        }


        object System.Collections.IEnumerator.Current
        {
            get { return _current; }
        }

        public bool MoveNext()
        {
            if (_finished && _internal.Count == 0)
                return false;
            else if (_internal.Count > 0)
            {
                _current = (T)_internal.Dequeue();
                return true;
            }
            else
            {
                _setted.WaitOne();
                return MoveNext();
            }
        }

        public void Reset()
        {
        }
    }
}

2 Fäden (A, B)

kann ein Thread eine Instanz zu einer Zeit zur Verfügung stellen, und fordert die Set-Methode B Gewinde wollen

eine Folge von Instanzen (A durch Gewinde versehen) empfangen,

So wahrsten Sinne des Wortes ein Add Transformation (Artikel), Hinzufügen (Artikel), .. zu einer IEnumerable zwischen verschiedenen Threads

Andere Lösungen begrüßen auch natürlich!

War es hilfreich?

Lösung

Sicher - dieser Code ist möglicherweise nicht die am besten Art und Weise, es zu tun, aber hier war mein erster Stich an es:

Subject<Item> toAddObservable;
ListObservable<Item> buffer;

void Init()
{
    // Subjects are an IObservable we can trigger by-hand, they're the 
    // mutable variables of Rx
    toAddObservable = new Subject(Scheduler.TaskPool);

    // ListObservable will hold all our items until someone asks for them
    // It will yield exactly *one* item, but only when toAddObservable
    // is completed.
    buffer = new ListObservable<Item>(toAddObservable);
}

void Add(Item to_add)
{
    lock (this) {
        // Subjects themselves are thread-safe, but we still need the lock
        // to protect against the reset in FetchResults
        ToAddOnAnotherThread.OnNext(to_add);
    }
}

IEnumerable<Item> FetchResults()
{
    IEnumerable<Item> ret = null;
    buffer.Subscribe(x => ret = x);

    lock (this) {
        toAddObservable.OnCompleted();
        Init();     // Recreate everything
    }

    return ret;
}
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top