muszę wdrożyć następujący algorytm w Rx.NET:Handling nadciśnieniem w Rx.NET bez onBackpressureLatest
- Take ostatnią pozycję z
stream
, lub czekać na nowego elementu bez blokowania, jeśli nie ma żadnych nowych elementów . Tylko ostatni przedmiot ma znaczenie, inne mogą zostać usunięte. - Wprowadź pozycję do
SlowFunction
i wydrukuj dane wyjściowe. - Powtórz od kroku 1.
Naiwny rozwiązaniem jest:
let PrintLatestData (stream: IObservable<_>) =
stream.Select(SlowFunction).Subscribe(printfn "%A")
Jednak to rozwiązanie nie działa, ponieważ średnio stream
emituje przedmioty szybciej niż SlowFunction
można je spożywać. Ponieważ Select
nie upuszcza elementów, ale zamiast tego próbuje przetwarzać każdy element w kolejności od najstarszego do najnowszego, opóźnienie między emitowanym i drukowanym przedmiotem będzie wzrastało w nieskończoność podczas działania programu. Z strumienia należy pobrać tylko najnowszy najnowszy przedmiot, aby uniknąć nieskończenie rosnącego ciśnienia wstecznego.
Przeszukałem dokumentację i znalazłem metodę o nazwie onBackpressureLatest
w języku RxJava, która zgodnie z moim rozumieniem umożliwiłaby to, co opisałem powyżej. Jednak metoda nie istnieje w Rx.NET. Jak wdrożyć to w Rx.NET?
czym problem z 'SlowFunction' jest wolniejszy niż w strumieniu? –
@FyodorSoikin Jeśli funkcja 'SlowFunction' działa wolniej niż strumień, nowe elementy są emitowane szybciej, niż mogą być przetwarzane i drukowane. W związku z uruchomieniem programu opóźnienie/opóźnienie między emitowanym nowym obiektem a wyjściami funkcji "SlowFunction" dla drukowanego elementu zwiększa się w nieskończoność. Jest to niedopuszczalne, ponieważ muszę monitorować dane w czasie rzeczywistym. Dbam tylko o najnowszy przedmiot. – Steve
Czy 'SlowFunction' jest synchroniczny? Lub Obserwowalne/Asynchroniczne? – Shlomo