Question

This may seem a basic question to some, but I am just getting started with Rx and was hoping someone could point me in the right direction. I am trying to subscribe to a stream that is being updated by a timer. The test code below should produce a steady stream of integers. However, although the list is populated, the Write handler is never called, so there is no output. This is just test code, so I haven't disposed of anything. Eventually I want to get the contents of a directory by polling and handle duplicates.

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Timers;
using Timer = System.Timers.Timer;

namespace ConsoleApplication1
{

    public class ObservableTest
    {
        private readonly List<int> m_numbers = new List<int>();
        private readonly Random m_random = new Random();
        public ObservableTest()
        {
            var timer = new Timer(1000);
            timer.Elapsed += MTimerOnElapsed;
            timer.Enabled = true;
        }

        private void MTimerOnElapsed(object sender, ElapsedEventArgs elapsedEventArgs)
        {
            var randomNumber = m_random.Next(0, 100);
            m_numbers.Add(randomNumber);
        }

        public IObservable<int> GetSequence()
        {
            return m_numbers.ToObservable();
        }

    }

    class Program
    {
        static void Main(string[] args)
        {

            var observable = new ObservableTest();
            observable.GetSequence().Subscribe(Write);

            Console.ReadLine();

        }

        private static void Write(int obj)
        {
            Console.WriteLine(obj);
        }
    }
}

Solution

In your code above, m_numbers.ToObservable() creates an observable sequence that iterates through the values in m_numbers at the moment you subscribe and then calls OnCompleted. Because the list is still empty when Subscribe is called, no values are emitted, and the numbers the timer adds later are never seen. You can see this by adding a System.Threading.Thread.Sleep(3100) before the call to Subscribe: with the one-second timer, this prints about three numbers and then nothing more.
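
To make the snapshot behaviour described above concrete, here is a minimal sketch. SnapshotDemo and Observe are hypothetical names introduced for illustration, and the sketch assumes the default scheduler of ToObservable(), which in current Rx.NET releases enumerates the list during the call to Subscribe.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class SnapshotDemo
{
    // Hypothetical helper: subscribes once and records what the subscriber saw.
    public static List<string> Observe(List<int> numbers)
    {
        var log = new List<string>();
        numbers.ToObservable().Subscribe(
            n => log.Add("value " + n),   // called once per item currently in the list
            () => log.Add("completed"));  // called as soon as the list has been enumerated
        return log;
    }

    static void Main()
    {
        var numbers = new List<int>();

        // The list is empty here, so this subscription only sees the completion.
        Console.WriteLine(string.Join(", ", Observe(numbers)));

        // Items added after that subscription completed are never pushed to it.
        numbers.Add(42);

        // A new subscription enumerates the list again and sees the new item.
        Console.WriteLine(string.Join(", ", Observe(numbers)));
    }
}
```

The point is that the list is read at subscription time; it is not watched for later changes.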

A better approach is to use one of Rx's built-in methods for creating sequences. One way to get a stream of random numbers one second apart is to use Observable.Interval:

static void Main(string[] args)
{
    var random = new Random();
    var source = Observable
         .Interval(TimeSpan.FromSeconds(1))
         .Select(_ => random.Next(1, 100));

    source.Subscribe(Console.WriteLine);

    Console.ReadLine();
}

The Select operator ignores the value from Interval (which is just a counter) and returns a random number instead.
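
The same pattern could be carried over to the goal mentioned in the question, polling a directory and handling duplicates. The following is only a sketch under assumptions: DirectoryPoller, NewFiles, and PollingDemo are hypothetical names, the current directory is a placeholder path, and "handling duplicates" is taken to mean reporting each file path once, which is what Rx's Distinct operator does.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Linq;

static class DirectoryPoller
{
    // Hypothetical helper: re-lists the files on every tick and
    // emits each path only the first time it is seen.
    public static IObservable<string> NewFiles(
        IObservable<long> ticks,
        Func<IEnumerable<string>> listFiles)
    {
        return ticks
            .SelectMany(_ => listFiles()) // flatten each listing into single paths
            .Distinct();                  // drop paths that were already emitted
    }
}

class PollingDemo
{
    static void Main()
    {
        // Placeholder directory; substitute the one you want to poll.
        string path = Directory.GetCurrentDirectory();

        // First listing immediately, then one per second.
        var ticks = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));

        using (DirectoryPoller.NewFiles(ticks, () => Directory.EnumerateFiles(path))
                              .Subscribe(Console.WriteLine))
        {
            Console.ReadLine();
        }
    }
}
```

Note that Distinct remembers every path it has emitted, so memory grows with the number of files, and a file that is deleted and created again under the same name is not reported a second time.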

Other tips

Take a look at this version:

using System;
using System.Reactive.Linq;

internal class Program
{
    private static readonly Random _random = new Random();

    private static void Main(string[] args)
    {
        IDisposable subscription = TimeSequence();
        Console.ReadLine();
        subscription.Dispose();
    }

    private static IDisposable TimeSequence()
    {
        IObservable<long> timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));
        return timer.Subscribe(GenerateRandom);
    }

    private static void GenerateRandom(long tick)
    {
        int randomNumber = _random.Next(0, 100);
        Console.WriteLine(randomNumber);
    }
}

Here's more about creating an observable sequence. The 101 Rx Samples are very useful too.
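
If you would rather keep the timer-based class from the question, one possible alternative (not proposed in the answers above) is to push each number into a Subject<int> instead of a List<int>. This is a sketch only: RandomNumberSource and SubjectDemo are hypothetical names, and it assumes the Elapsed handler finishes well within one interval so that calls to OnNext do not overlap.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Timers;
using Timer = System.Timers.Timer;

public sealed class RandomNumberSource : IDisposable
{
    private readonly Subject<int> m_numbers = new Subject<int>();
    private readonly Random m_random = new Random();
    private readonly Timer m_timer;

    public RandomNumberSource(double intervalMs)
    {
        m_timer = new Timer(intervalMs);
        // Push each new number to current subscribers instead of storing it.
        m_timer.Elapsed += (sender, e) => m_numbers.OnNext(m_random.Next(0, 100));
        m_timer.Enabled = true;
    }

    // AsObservable hides the subject so callers cannot call OnNext themselves.
    public IObservable<int> GetSequence() => m_numbers.AsObservable();

    public void Dispose()
    {
        m_timer.Dispose();
        m_numbers.OnCompleted(); // tell subscribers that no more values will arrive
    }
}

class SubjectDemo
{
    static void Main()
    {
        using (var source = new RandomNumberSource(1000))
        using (source.GetSequence().Subscribe(Console.WriteLine))
        {
            Console.ReadLine();
        }
    }
}
```

Unlike the list-based version, subscribers here only receive numbers generated after they subscribe.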

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow