Question

I have a list of source elements of type Foo and an asynchronous method, GetBoosAsync(), that converts them to elements of type Boo:

public class Foo
{
    public string Value { get; set; }
}

public class Boo
{
    public string Value { get; set; }
}

static void Main(string[] args)
{
    IReadOnlyCollection<Foo> foos = GetFoos();

    Task<IReadOnlyCollection<Boo>> boosTask = GetBoosAsync(foos);

    boosTask.Wait();

    foreach (Boo boo in boosTask.Result)
    {
        Console.WriteLine(boo.Value);
    }

    Console.ReadKey();
}

public static IReadOnlyCollection<Foo> GetFoos()
{
    return Enumerable.Range(1, 100).Select(i => new Foo
    {
        Value = i.ToString(CultureInfo.CurrentCulture)

    }).ToList();
}

public static async Task<IReadOnlyCollection<Boo>> GetBoosAsync(IReadOnlyCollection<Foo> foos)
{
    List<Task<Boo>> booTasks = foos.Select(ConvertFooToBooAsync).ToList();

    // Wait for ALL conversions of Foos to Boos; WhenAll returns the results in source order
    Boo[] boos = await Task.WhenAll(booTasks);

    return boos;
}

public static Task<Boo> ConvertFooToBooAsync(Foo foo)
{
    return Task.Factory.StartNew(() =>
    {
        Thread.Sleep(100);
        return new Boo { Value = foo.Value };
    });
}

The method GetBoosAsync() calls ConvertFooToBooAsync for each element, waits for all conversions to complete, and then returns the list of results.

The problem:

How can I implement GetBoosAsync so that it returns an IObservable<Boo> instead and emits each element asynchronously as soon as it has been processed?

For example:

public static IObservable<Boo> OGetBoos(IReadOnlyCollection<Foo> foos)
{
    ...
}

and use it like this:

static void Main(string[] args)
{
    IReadOnlyCollection<Foo> foos = GetFoos();

    IObservable<Boo> boos = OGetBoos(foos);

    boos = boos.Do(boo =>
    {
        Console.WriteLine(boo.Value);
    });

    boos.Wait();

    Console.ReadKey();
}

Solution

Would this work for you:

// Requires System.Reactive.Linq and System.Reactive.Threading.Tasks (for Task.ToObservable()).
public static IObservable<Boo> OGetBoos(IReadOnlyCollection<Foo> foos)
{
    var query =
        from f in foos.ToObservable()
        from b in ConvertFooToBooAsync(f).ToObservable()
        select b;

    return query;
}

BTW, thanks for the easy-to-compile, working code in your question!
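
To see the "as they get processed" behaviour, here is a minimal, self-contained sketch built around the query above. It assumes the System.Reactive NuGet package is referenced. The delay-based ConvertFooToBooAsync, the class name CompletionOrderDemo, and the three input values are hypothetical stand-ins (not the code from the question), chosen so that the conversions finish in a different order than they start.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public class Foo { public string Value { get; set; } }
public class Boo { public string Value { get; set; } }

public static class CompletionOrderDemo
{
    // Hypothetical conversion: the delay grows with the numeric value,
    // so "1" finishes before "2", which finishes before "3".
    public static async Task<Boo> ConvertFooToBooAsync(Foo foo)
    {
        await Task.Delay(int.Parse(foo.Value) * 100);
        return new Boo { Value = foo.Value };
    }

    // Same query shape as in the answer: each task becomes an inner
    // observable, and SelectMany merges the inner observables.
    public static IObservable<Boo> OGetBoos(IReadOnlyCollection<Foo> foos)
    {
        return
            from f in foos.ToObservable()
            from b in ConvertFooToBooAsync(f).ToObservable()
            select b;
    }

    public static void Main()
    {
        var foos = new List<Foo>
        {
            new Foo { Value = "3" },
            new Foo { Value = "1" },
            new Foo { Value = "2" },
        };

        // Do() prints each element as it arrives; ToList() collects them;
        // Wait() blocks until the sequence completes.
        IList<Boo> boos = OGetBoos(foos)
            .Do(boo => Console.WriteLine(boo.Value))
            .ToList()
            .Wait();

        Console.WriteLine("Received {0} elements", boos.Count);
    }
}
```

Because the inner sequences are merged, elements are emitted in completion order rather than in source order, so consumers should not rely on the original ordering of foos.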

Other suggestions

You can try this approach as well.

public static IObservable<Boo> GetBoosAsync(IEnumerable<Foo> foos)
{
    return foos.Select(t => Observable.FromAsync(() => ConvertFooToBooAsync(t))).Merge();
}
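
One practical difference with this approach is that Observable.FromAsync does not invoke the conversion until the resulting observable is subscribed to, which makes it possible to cap how many conversions run at once by using the Merge overload that takes a maxConcurrent argument. The following is a rough sketch under that assumption; it needs the System.Reactive NuGet package, and the names ThrottledDemo and GetBoosThrottled, the limit of 4, and the Task.Delay-based conversion are hypothetical, not part of the original answer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public class Foo { public string Value { get; set; } }
public class Boo { public string Value { get; set; } }

public static class ThrottledDemo
{
    // Hypothetical conversion that simulates 100 ms of asynchronous work.
    public static async Task<Boo> ConvertFooToBooAsync(Foo foo)
    {
        await Task.Delay(100);
        return new Boo { Value = foo.Value };
    }

    // FromAsync defers each conversion until Merge subscribes to it,
    // and Merge subscribes to at most maxConcurrent sources at a time.
    public static IObservable<Boo> GetBoosThrottled(IEnumerable<Foo> foos, int maxConcurrent)
    {
        return foos
            .Select(foo => Observable.FromAsync(() => ConvertFooToBooAsync(foo)))
            .Merge(maxConcurrent);
    }

    public static void Main()
    {
        IEnumerable<Foo> foos = Enumerable.Range(1, 10)
            .Select(i => new Foo { Value = i.ToString() });

        // Count() emits the number of elements once the sequence completes.
        int count = GetBoosThrottled(foos, 4)
            .Do(boo => Console.WriteLine(boo.Value))
            .Count()
            .Wait();

        Console.WriteLine("Converted {0} elements", count);
    }
}
```

By contrast, calling ConvertFooToBooAsync(f).ToObservable() starts the task immediately when the expression is evaluated, so a concurrency limit applied afterwards would not delay the underlying work.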
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow