2017-02-14 33 views
6

muszę wdrożyć następujący algorytm w Rx.NET:Handling nadciśnieniem w Rx.NET bez onBackpressureLatest

  1. Take ostatnią pozycję z stream, lub czekać na nowego elementu bez blokowania, jeśli nie ma żadnych nowych elementów . Tylko ostatni przedmiot ma znaczenie, inne mogą zostać usunięte.
  2. Wprowadź pozycję do SlowFunction i wydrukuj dane wyjściowe.
  3. Powtórz od kroku 1.

Naiwny rozwiązaniem jest:

let PrintLatestData (stream: IObservable<_>) = 
    stream.Select(SlowFunction).Subscribe(printfn "%A") 

Jednak to rozwiązanie nie działa, ponieważ średnio stream emituje przedmioty szybciej niż SlowFunction można je spożywać. Ponieważ Select nie upuszcza elementów, ale zamiast tego próbuje przetwarzać każdy element w kolejności od najstarszego do najnowszego, opóźnienie między emitowanym i drukowanym przedmiotem będzie wzrastało w nieskończoność podczas działania programu. Z strumienia należy pobrać tylko najnowszy najnowszy przedmiot, aby uniknąć nieskończenie rosnącego ciśnienia wstecznego.

Przeszukałem dokumentację i znalazłem metodę o nazwie onBackpressureLatest w języku RxJava, która zgodnie z moim rozumieniem umożliwiłaby to, co opisałem powyżej. Jednak metoda nie istnieje w Rx.NET. Jak wdrożyć to w Rx.NET?

+1

czym problem z 'SlowFunction' jest wolniejszy niż w strumieniu? –

+0

@FyodorSoikin Jeśli funkcja 'SlowFunction' działa wolniej niż strumień, nowe elementy są emitowane szybciej, niż mogą być przetwarzane i drukowane. W związku z uruchomieniem programu opóźnienie/opóźnienie między emitowanym nowym obiektem a wyjściami funkcji "SlowFunction" dla drukowanego elementu zwiększa się w nieskończoność. Jest to niedopuszczalne, ponieważ muszę monitorować dane w czasie rzeczywistym. Dbam tylko o najnowszy przedmiot. – Steve

+1

Czy 'SlowFunction' jest synchroniczny? Lub Obserwowalne/Asynchroniczne? – Shlomo

Odpowiedz

7

Myślę, że chcesz użyć czegoś takiego jak ObserveLatestOn. Skutecznie zastępuje kolejkę przychodzących zdarzeń pojedynczą wartością i flagą.

James Światowy napisał o nim tutaj http://www.zerobugbuild.com/?p=192

Koncepcja jest używany intensywnie w GUI aplikacji, które mogę zaufać, jak szybko serwer może zapisywać dane w nim.

Można również zobaczyć implementację w reaktywnym Trader https://github.com/AdaptiveConsulting/ReactiveTrader/blob/83a6b7f312b9ba9d70327f03d8d326934b379211/src/Adaptive.ReactiveTrader.Shared/Extensions/ObservableExtensions.cs#L64 i prezentacjach wspierające wyjaśniający ReactiveTrader https://leecampbell.com/presentations/#ReactConfLondon2014

Aby być jasne, że jest to algorytm zrzutu obciążenia, a nie algorytm ciśnienie wsteczne.

+0

'ObserveLatestOn' było tym, czego potrzebowałem, dziękuję! – Steve

+1

Chociaż może to teoretycznie odpowiedzieć na pytanie, [byłoby lepiej] (// meta.stackoverflow.com/q/8259), aby uwzględnić istotne części odpowiedzi tutaj, i podać link dla odniesienia. – Draken

+0

Lub zauważ, że może to być duplikat http://stackoverflow.com/questions/6384312/how-can-i-observe-values-in-a-non-blocking-way-using-rx i http://stroverflowflow.com/questions/11010602/with-rx-how-do-i-ignore-all-except- the-latest-value-when-my-subscribe-method-is/. Odpowiedź, którą rzekomo próbowałem, nie byłam pewna, czy OP chce ciśnienia wstecznego (zgodnie z tytułem), czy odłączenia obciążenia (zgodnie z opisem). –

1

Sugestia synchronizacji/asynchroniczna może nieznacznie pomóc, ale biorąc pod uwagę, że wolna funkcja jest zawsze wolniejsza niż strumień zdarzeń, to asynchronizacja może umożliwić paralelezowanie obsługi (z obserwacją na puli wątków) kosztem (ewentualnie) po prostu kończą się wątki lub dodają więcej opóźnień przy przełączaniu kontekstów. To nie brzmi jak rozwiązanie dla mnie.

Proponuję popatrzeć na operatory open source Rxx "Introspective" autorstwa Dave'a Sextona. Mogą one zmieniać okres buforowania/przepustnicy, z którego otrzymujesz najnowsze dane, ponieważ kolejka tworzy kopię zapasową z powodu wolnego użytkownika. Jeśli funkcja spowolnienia nagle stanie się szybsza, nie będzie w ogóle buforować rzeczy. Jeśli zrobi się wolniej, będzie go bardziej buforować. Musisz sprawdzić, czy istnieje typ "najnowsze od", lub po prostu zmodyfikować istniejący w zależności od potrzeb. Na przykład. Użyj bufora i po prostu weź ostatni element w buforze lub ulepszaj, aby wewnętrznie przechowywać tylko najnowsze. Google "Rxx", a znajdziesz go gdzieś na Githubie.

Prostsze podejście, jeśli czas "powolnej funkcji" jest dość przewidywalny, to po prostu zmniejszyć przepustowość o kwotę przekraczającą ten czas. Oczywiście nie mam na myśli standardowej "przepustnicy" rx, ale takiej, która pozwala na nowszą aktualizację zamiast starej. Istnieje wiele rozwiązań tego rodzaju problemu, dostępnych tutaj.

1

To samo pytanie przydarzyło mi się również jakiś czas temu i nie znalazłem wbudowanego operatora, który to robi. Napisałem więc własne, które nazwałem Latest. Nie trywialne do implementacji, ale okazało się bardzo przydatne w moim obecnym projekcie.

Działa to w ten sposób: podczas gdy obserwator jest zajęty przetwarzaniem poprzedniego powiadomienia (oczywiście w swoim wątku), kolejkuje do ostatniego powiadomienia (n> = 0) i OnNext staje się bezczynny. Więc:

  • Latest(0): tylko obserwować przedmioty przybywających podczas gdy obserwator jest bezczynny
  • Latest(1): zawsze przestrzegać najnowszy
  • Latest(1000) (np): Zazwyczaj przetwarzać wszystkie przedmioty, ale jeśli coś utknie w dół linii, raczej Tęsknię za czasem, niż dostaniemy OutOfMemoryException
  • Latest(int.MaxValue): Nigdy nie przegap produktu, ale zachowaj równowagę między producentem a konsumentem.

Twój kod byłoby zatem: stream.Latest(1).Select(SlowFunction).Subscribe(printfn "%A")

Podpis wygląda następująco:

/// <summary> 
/// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process. 
/// </summary> 
/// <param name="source">The source sequence.</param> 
/// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param> 
/// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param> 
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> 
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception> 
/// <remarks> 
/// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready. 
/// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any. 
/// To observe the whole source sequence, specify <see cref="int.MaxValue"/>. 
/// </remarks> 
public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null) 

realizacja jest zbyt duża, aby pisać tutaj, ale jeśli ktoś jest zainteresowany, ja bym chętnie dzielić się nią . Daj mi znać.

1

Możesz przesyłać strumień w przedziale, który znasz SlowFunction. Oto przykład w Java:

TestScheduler ts = new TestScheduler(); 

Observable<Long> stream = Observable.interval(1, TimeUnit.MILLISECONDS, ts).take(500); 
stream.sample(100, TimeUnit.MILLISECONDS, ts).subscribe(System.out::println); 

ts.advanceTimeBy(1000, TimeUnit.MILLISECONDS); 
98 
198 
298 
398 
498 
499 

sample nie powoduje ciśnienie wsteczne i zawsze chwyta ostatnią wartość w strumieniu, tak że spełnia swoje wymagania. Dodatkowo sample nie wyśle ​​dwukrotnie tej samej wartości (co można zobaczyć z góry jako 499 jest drukowane tylko raz).

myślę, że będzie to ważny C#/F# rozwiązanie:

static IDisposable PrintLatestData<T>(IObservable<T> stream) { 
    return stream.Sample(TimeSpan.FromMilliseconds(100)) 
     .Select(SlowFunction) 
     .Subscribe(Console.WriteLine); 
} 
let PrintLatestData (stream: IObservable<_>) = 
    stream.Sample(TimeSpan.FromMilliseconds(100)) 
     .Select(SlowFunction) 
     .Subscribe(printfn "%A") 
+1

To jest bardzo proste rozwiązanie problemu, ale w moim konkretnym przypadku użycia czas działania funkcji SlowFunction jest zbyt zróżnicowany, aby takie podejście działało. Dziękuję za udostępnienie. – Steve