如何从rx subscribe回调async函数?
-
21-12-2019 - |
题
我想在Rx订阅中回调一个异步函数。
例如。像那样:
public class Consumer
{
private readonly Service _service = new Service();
public ReplaySubject<string> Results = new ReplaySubject<string>();
public void Trigger()
{
Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
}
public Task RunAsync()
{
return _service.DoAsync();
}
}
public class Service
{
public async Task<string> DoAsync()
{
return await Task.Run(() => Do());
}
private static string Do()
{
Thread.Sleep(TimeSpan.FromMilliseconds(200));
throw new ArgumentException("invalid!");
return "foobar";
}
}
[Test]
public async Task Test()
{
var sut = new Consumer();
sut.Trigger();
var result = await sut.Results.FirstAsync();
}
为了正确捕获异常,需要做些什么?
解决方案
你不想通过一个 async
方法来 Subscribe
, ,因为这将创建一个 async void
方法。尽最大努力避免 async void
.
就你而言,我 想想 你想要的是打电话给 async
方法为序列的每个元素,然后缓存所有结果。在这种情况下,使用 SelectMany
致电 async
每个元素的方法,以及 Replay
要缓存(加一个 Connect
让球滚动):
public class Consumer
{
private readonly Service _service = new Service();
public IObservable<string> Trigger()
{
var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
.SelectMany(_ => RunAsync())
.Replay();
connectible.Connect();
return connectible;
}
public Task<string> RunAsync()
{
return _service.DoAsync();
}
}
我改变了 Results
将从 Trigger
方法代替,我认为这更有意义,所以测试现在看起来像:
[Test]
public async Task Test()
{
var sut = new Consumer();
var results = sut.Trigger();
var result = await results.FirstAsync();
}
其他提示
paul betts'答案在大多数情况下都是在大多数情况下的工作,但如果你想在等待异步函数的同时阻止流,你需要这样的东西:
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(l => Observable.FromAsync(asyncMethod))
.Concat()
.Subscribe();
.
或:
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
.Concat()
.Subscribe();
. 将其更改为:
Observable.Timer(TimeSpan.FromMilliseconds(100))
.SelectMany(async _ => await RunAsync())
.Subscribe();
.
订阅不会在可观察到的内部保持异步操作。
不隶属于 StackOverflow