كيفية معاودة الاتصال بوظيفة غير متزامنة من اشتراك rx؟
-
21-12-2019 - |
سؤال
أرغب في إعادة الاتصال بوظيفة غير متزامنة ضمن اشتراك Rx.
على سبيل المثالمثل هذا:
public class Consumer
{
private readonly Service _service = new Service();
public ReplaySubject<string> Results = new ReplaySubject<string>();
public void Trigger()
{
Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
}
public Task RunAsync()
{
return _service.DoAsync();
}
}
public class Service
{
public async Task<string> DoAsync()
{
return await Task.Run(() => Do());
}
private static string Do()
{
Thread.Sleep(TimeSpan.FromMilliseconds(200));
throw new ArgumentException("invalid!");
return "foobar";
}
}
[Test]
public async Task Test()
{
var sut = new Consumer();
sut.Trigger();
var result = await sut.Results.FirstAsync();
}
ما الذي يجب فعله حتى تتمكن من اكتشاف الاستثناء بشكل صحيح؟
المحلول
أنت لا تريد تمرير async
طريقة ل Subscribe
, ، لأن ذلك سيؤدي إلى إنشاء async void
طريقة.بذل قصارى جهدك لتجنب async void
.
في حالتك، أنا يفكر ما تريده هو استدعاء async
طريقة لكل عنصر من عناصر التسلسل ثم قم بتخزين كافة النتائج.في هذه الحالة، استخدم SelectMany
للاتصال ب async
طريقة لكل عنصر، و Replay
للتخزين المؤقت (بالإضافة إلى أ Connect
لتبدأ الكرة):
public class Consumer
{
private readonly Service _service = new Service();
public IObservable<string> Trigger()
{
var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
.SelectMany(_ => RunAsync())
.Replay();
connectible.Connect();
return connectible;
}
public Task<string> RunAsync()
{
return _service.DoAsync();
}
}
لقد غيرت Results
الممتلكات التي سيتم إرجاعها من Trigger
بدلاً من ذلك، والتي أعتقد أنها أكثر منطقية، لذا يبدو الاختبار الآن كما يلي:
[Test]
public async Task Test()
{
var sut = new Consumer();
var results = sut.Trigger();
var result = await results.FirstAsync();
}
نصائح أخرى
إجابة Paul Betts تعمل في معظم السيناريوهات، ولكن إذا كنت ترغب في حظر الدفق أثناء انتظار وظيفة ASYNC لإنهاءك، فأنت بحاجة إلى شيء مثل هذا:
giveacodicetagpre.أو:
giveacodicetagpre.تغيير هذا إلى:
giveacodicetagpre.الاشتراك لا يحتفظ بعملية ASYNC داخل الملاحظ.