Należy pamiętać, że wywołania zwrotne, takie jak te używane w EnumWindows
są subtelnie różne od Rx. W szczególności wywołanie zwrotne może komunikować się z powrotem z osobą dzwoniącą poprzez jego wartość zwracaną. Obserwatorzy Rx nie mogą tego zrobić. Również wywołania zwrotne mogą odbierać wiele parametrów, ale obserwatorzy Rx otrzymują pojedynczą wartość. Musisz więc opakować wiele parametrów w jeden obiekt.
Mając to na uwadze, alternatywą dla używania Subject
jest użycie Observable.Create
. W ten sposób zarejestrujesz wywołanie zwrotne tylko wtedy, gdy faktycznie jest obserwator i wyrejestrujesz go, jeśli ten obserwator zrezygnuje z subskrypcji.
Dla synchronicznego interfejsu API, którego użyłeś w przykładzie, możesz zrobić coś takiego. Uwaga: w tym przykładzie nie ma sposobu na wyrejestrowanie pośredniego strumienia oddzwaniania, ponieważ wszystko dzieje się synchronicznie, zanim będziemy mogli kiedykolwiek zrezygnować z subskrypcji.
public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
return Observable.Create<Foo>(observer =>
{
FooApi.enumerate(arg1, arg2, e =>
{
observer.OnNext(new Foo(e));
return true;
});
// In your case, FooApi.enumerate is actually synchronous
// so when we get to this line of code, we know
// the stream is complete.
observer.OnCompleted();
return Disposable.Empty;
});
}
// Usage
WrapFooApi("a", "b").Take(1).Subscribe(...); // only takes first item
Możemy rozwiązać problem z byciem w stanie zatrzymać wcześnie wprowadzając trochę asynchroniczność, które dadzą czas obserwatora, aby uzyskać jednorazowe, że można go wyrzucać do powiadamiania. Możemy użyć CreateAsync
, aby uzyskać CancellationToken
, który zostanie anulowany, gdy obserwator zrezygnuje z subskrypcji. I możemy uruchomić kod FooApi
wewnątrz Task.Run
:
public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
return Observable.CreateAsync<Foo>(async (observer, ct) =>
{
await Task.Run(() => FooApi.register_callback(arg1, arg2, e =>
{
observer.OnNext(e);
// Returning false will stop the enumeration
return !ct.IsCancellationRequested;
}));
observer.OnCompleted();
});
}
W bardziej tradycyjnych asynchronicznych wywołań zwrotnych API, gdzie można zarejestrować się w pewnym momencie i wyrejestrowania w jakimś innym miejscu, może mieć coś więcej tak:
public static IObservable<Foo> WrapFooApi(string args)
{
return Observable.Create<Foo>(observer =>
{
FooToken token = default(FooToken);
var unsubscribe = Disposable.Create(() => FooApi.Unregister(token));
token = FooApi.Register(args, e =>
{
observer.OnNext(new Foo(e));
});
return unsubscribe;
});
}
Albo możesz po prostu użyć 'Subject' i wywołać' OnNext' na tym wywołanie zwrotnym. – Dirk
@Dirk, ciekawe, dzięki. A więc "Subject.OnNext", a następnie "Subject.OnComplete", gdy nie ma już żadnych przedmiotów? – avo
Tak, Subject implementuje zarówno "IObservable', jak i' IObserver'. Wywołaj OnNext/OnError/OnCompleted, aby wydać te polecenia subskrybentom Tematu.Są serwerami jako rodzaj bramy od kodu innego niż Rx do Rx. – Dirk