2014-04-11 12 views
15

Chciałbym oddzwonić funkcja asynchroniczne w ramach subskrypcji Rx.Jak wywołać funkcję asynchroniczną z subskrypcji rx?

E.g. w ten sposób:

public class Consumer 
{ 
    private readonly Service _service = new Service(); 

    public ReplaySubject<string> Results = new ReplaySubject<string>(); 

    public void Trigger() 
    { 
     Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync()); 
    } 

    public Task RunAsync() 
    { 
     return _service.DoAsync(); 
    } 
} 

public class Service 
{ 
    public async Task<string> DoAsync() 
    { 
     return await Task.Run(() => Do()); 
    } 

    private static string Do() 
    { 
     Thread.Sleep(TimeSpan.FromMilliseconds(200)); 
     throw new ArgumentException("invalid!"); 
     return "foobar"; 
    } 
} 

[Test] 
public async Task Test() 
{ 
    var sut = new Consumer(); 
    sut.Trigger(); 
    var result = await sut.Results.FirstAsync(); 
} 

Co należy zrobić, aby właściwie uchwycić wyjątek?

+1

Po prostu pomyślałem, że mogę umieścić async na pierwszym miejscu. Ale niestety to nie rozwiązuje problemu, z którym się borykam. Opublikuję bardziej wyraźny przykład. –

+0

możliwy duplikat [Czy istnieje sposób na subskrypcję obserwatora jako asynchronicznego] (http://stackoverflow.com/questions/18814805/is-there-a-way-to-subscribe-an-observer-as-async) – Zache

Odpowiedz

9

Nie chcesz przekazać metody async do Subscribe, ponieważ utworzy ona metodę async void. Jak najlepiej, aby uniknąć async void.

W twoim przypadku, I myślę, że to, co chcesz, to wywołanie metody async dla każdego elementu sekwencji, a następnie buforowanie wszystkich wyników. W takim przypadku należy użyć SelectMany wywołać metodę async dla każdego elementu i Replay do pamięci podręcznej (plus Connect aby uzyskać toczenia piłki):

public class Consumer 
{ 
    private readonly Service _service = new Service(); 

    public IObservable<string> Trigger() 
    { 
     var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100)) 
      .SelectMany(_ => RunAsync()) 
      .Replay(); 
     connectible.Connect(); 
     return connectible; 
    } 

    public Task<string> RunAsync() 
    { 
     return _service.DoAsync(); 
    } 
} 

Zmieniłem właściwość Results zostać zwrócone z metody Trigger zamiast , co moim zdaniem ma więcej sensu, więc test teraz wygląda:

[Test] 
public async Task Test() 
{ 
    var sut = new Consumer(); 
    var results = sut.Trigger(); 
    var result = await results.FirstAsync(); 
} 
+1

To niesamowite rozwiązanie. Dzięki Stephen. –

+0

Co zrobić, jeśli chcesz przechwycić wyjątki generowane przez RunAsync? Co mam jest: obs.SelectMany (x => Observable.FromAsync (() => RunAsync()) .Catch (Observable.Empty ())) Czy istnieje lepszy sposób? –

+0

@VictorGrigoriu: Polecam zadać własne pytanie i oznaczyć je słowem "system.reactive". –

14

zmienić na:

Observable.Timer(TimeSpan.FromMilliseconds(100)) 
    .SelectMany(async _ => await RunAsync()) 
    .Subscribe(); 

Subskrybuj nie zachowuje operacji asynchronicznej wewnątrz obserwowalnej.

+1

Dzięki Paul, za twoją sugestię. Bardzo interesujące. Czy masz coś, co mogę przeczytać o tym dalej? –

20

odpowiedź Paul Betts' działa w większości scenariuszy, ale jeśli chcesz zablokować strumień czekając funkcji asynchronicznej skończyć trzeba coś takiego:

Observable.Interval(TimeSpan.FromSeconds(1)) 
      .Select(l => Observable.FromAsync(asyncMethod)) 
      .Concat() 
      .Subscribe(); 

czyli

Observable.Interval(TimeSpan.FromSeconds(1)) 
      .Select(_ => Observable.Defer(() => asyncMethod().ToObservable())) 
      .Concat() 
      .Subscribe(); 
+1

To kolejny przypadek użycia, ale bardzo interesujący. –