Stworzę obserwowalne (za pomocą różnych środków) i zwracam je zainteresowanym stronom, ale kiedy skończy się słuchanie, chciałbym zburzyć obserwowalne, aby nie zużywać zasobów. Innym sposobem, aby myśleć o tym, jak tworzenie tematów w systemie sub pub. Gdy nikt już nie subskrybuje danego tematu, nie chcesz już trzymać tematu i jego filtrowania.Jak utworzyć obserwowalny obiekt Rx, który zatrzymuje publikowanie zdarzeń, gdy ostatni obserwator anuluje subskrypcję?
6
A
Odpowiedz
10
Rx ma już operatora do własnych potrzeb - również dwie rzeczywistości - Publish
& RefCount
.
Oto jak z nich korzystać:
IObservable xs = ...
var rxs = xs.Publish().RefCount();
var sub1 = rxs.Subscribe(x => { });
var sub2 = rxs.Subscribe(x => { });
//later
sub1.Dispose();
//later
sub2.Dispose();
//The underlying subscription to `xs` is now disposed of.
proste.
1
Jeśli zrozumiałem twoje pytanie, chcesz utworzyć obserwowalne tak, że gdy wszyscy subskrybenci odłożyli swoją subskrypcję, tj. Nie ma już abonenta, to chcesz wykonać funkcję czyszczenia, która zatrzyma obserwowalne od produkcji dalsze wartości . Jeśli to, co chcesz, to możesz zrobić coś jak poniżej: Przykład
//Wrap a disposable
public class WrapDisposable : IDisposable
{
IDisposable disp;
Action act;
public WrapDisposable(IDisposable _disp, Action _act)
{
disp = _disp;
act = _act;
}
void IDisposable.Dispose()
{
act();
disp.Dispose();
}
}
//Observable that we want to clean up after all subs are done
public static IObservable<long> GenerateObs(out Action cleanup)
{
cleanup =() =>
{
Console.WriteLine("All subscribers are done. Do clean up");
};
return Observable.Interval(TimeSpan.FromSeconds(1));
}
//Wrap the observable
public static IObservable<T> WrapToClean<T>(IObservable<T> obs, Action onAllDone)
{
int count = 0;
return Observable.CreateWithDisposable<T>(ob =>
{
var disp = obs.Subscribe(ob);
Interlocked.Increment(ref count);
return new WrapDisposable(disp,() =>
{
if (Interlocked.Decrement(ref count) == 0)
{
onAllDone();
}
});
});
}
// Zastosowanie:
Action cleanup;
var obs = GenerateObs(out cleanup);
var newObs = WrapToClean(obs, cleanup);
newObs.Take(6).Subscribe(Console.WriteLine);
newObs.Take(5).Subscribe(Console.WriteLine);