2017-01-17 55 views
6

Rozważmy przez chwilę podany kodRx distinctUntilChanged umożliwiają powtórzenie po ustawionym czasie pomiędzy wydarzeniami

Rx.Observable.merge(
    Rx.Observable.just(1), 
    Rx.Observable.just(1).delay(1000) 
).distinctUntilChanged() 
    .subscribe(x => console.log(x)) 

Oczekujemy, że 1 jest rejestrowane tylko raz. Co jednak, jeśli chcemy pozwolić na powtórzenie wartości, jeśli jej ostatnia emisja była konfigurowalna dawno temu? Chodzi mi o to, aby rejestrować oba zdarzenia:.

Na przykład byłoby fajnie mieć coś takiego następującym

Rx.Observable.merge(
    Rx.Observable.just(1), 
    Rx.Observable.just(1).delay(1000) 
).distinctUntilChanged(1000) 
    .subscribe(x => console.log(x)) 

W którym distinctUntilChanged() akceptuje jakiś limit czasu, aby umożliwić powtórzenie następnego elementu. Jednak coś takiego nie istnieje i zastanawiałem się, czy ktokolwiek zna elegancki sposób, aby to osiągnąć, korzystając z operatorów o wysokim poziomie bez zakłócania działania filtra, który wymagałby obsługi stanu:

Odpowiedz

8

Chyba jestem nieporozumienie Jestem całkiem pewien, to może być realizowane w sposób stosunkowo prosta ze windowTime:

Observable 
    .merge(
    Observable.of(1), 
    Observable.of(1).delay(250), // Ignored 
    Observable.of(1).delay(700), // Ignored 
    Observable.of(1).delay(2000), 
    Observable.of(1).delay(2200), //Ignored 
    Observable.of(2).delay(2300) 
) 
    // Converts the stream into a stream of streams each 1000 milliseconds long 
    .windowTime(1000) 
    // Flatten each of the streams and emit only the latest (there should only be one active 
    // at a time anyway 
    // We apply the distinctUntilChanged to the windows before flattening 
    .switchMap(source => source.distinctUntilChanged()) 
    .timeInterval() 
    .subscribe(
    value => console.log(value), 
    error => console.log('error: ' + error), 
    () => console.log('complete') 
); 

Patrz przykład here (pożyczoną @ przykładowych wejść Marcina)

+0

Awesome, bardzo sprytne rozwiązanie problemu - powinno być zaakceptowaną odpowiedzią! – olsn

+1

Nie wiedziałem o operatorze okna. Wprowadziłem coś podobnego w skomplikowany sposób, ale to naprawdę upraszcza. Dzięki! Chociaż zauważyłem, że może wysyłać powtarzające się elementy, jeśli występują w pobliżu granicy okna (na przykład 990ms i 1010ms). Zmieniłem go, aby ustawić limit czasu granicznego względem ostatniego odrębnego elementu: 'obs.window (obs.distinctUntilChanged(). SwitchMap (s => Observable.timer (1000))). Zobacz zmodyfikowany [przykład] (https://jsbin.com/soyovumeja/edit?js,console). –

2

Jest to interesujący przypadek użycia. Zastanawiam się, czy istnieje prostsze rozwiązanie niż kopalnia (zauważ, że używam RxJS 5):

let timedDistinctUntil = Observable.defer(() => { 
    let innerObs = null; 
    let innerSubject = null; 
    let delaySub = null; 

    function tearDown() { 
     delaySub.unsubscribe(); 
     innerSubject.complete(); 
    } 

    return Observable 
     .merge(
      Observable.of(1), 
      Observable.of(1).delay(250), // ignored 
      Observable.of(1).delay(700), // ignored 
      Observable.of(1).delay(2000), 
      Observable.of(1).delay(2200), // ignored 
      Observable.of(2).delay(2300) 
     ) 
     .do(undefined, undefined,() => tearDown()) 
     .map(value => { 
      if (innerObs) { 
       innerSubject.next(value); 
       return null; 
      } 

      innerSubject = new BehaviorSubject(value); 

      delaySub = Observable.of(null).delay(1000).subscribe(() => { 
       innerObs = null; 
      }); 

      innerObs = innerSubject.distinctUntilChanged(); 
      return innerObs; 
     }) 
     // filter out all skipped Observable emissions 
     .filter(observable => observable) 
     .switch(); 
}); 

timedDistinctUntil 
    .timestamp() 
    .subscribe(
     value => console.log(value), 
     error => console.log('error: ' + error), 
     () => console.log('complete') 
    ); 

Zobacz demo na żywo: https://jsbin.com/sivuxo/5/edit?js,console

Cała logika jest owinięty w Observable.defer() metoda statyczna, ponieważ wymaga pewnych dodatkowych zmienne.

Kilka wskazuje, jak to wszystko działa:

  1. merge() jest źródłem przedmiotów.

  2. Używam do(), aby prawidłowo złapać po zakończeniu źródła, aby można było wyłączyć wewnętrzny zegar i wysłać odpowiednie pełne powiadomienie.

  3. Operator map() to miejsce, w którym odbywają się najciekawsze rzeczy. Ponownie zwracam wartość, którą otrzymałem, a następnie zwrócę null, jeśli istnieje już poprawna obserwowalna (została utworzona mniej niż 1000ms temu = innerObs != null). Następnie ostatecznie utworzę nowy temat, w którym zamierzam ponownie sprawdzić wszystkie pozycje i zwrócić je BehaviorSubject przykuty łańcuchem przy pomocy .distinctUntilChanged(). Na koniec planuję 1-sekundowe opóźnienie, aby ustawić innerObs = null, co oznacza, że ​​gdy pojawi się inna wartość, zwróci nowe Obserwowalne z nowym .distinctUntilChanged().

  4. Następnie filter() pozwoli mi zignorować wszystkie zwrócone wartości. Oznacza to, że nie będzie emitować nowego obserwatora więcej niż raz na sekundę.

  5. Teraz muszę pracować z tak zwanych wyższych rzędów obserwabli (obserwable emitujące obserwabli. Z tego powodu używam switch() operatora, że ​​zawsze subskrybuje tylko najnowszy Obserwowalne emitowanego przez źródło. W naszym przypadku możemy emitować obserwable tylko max - raz na sekundę (dzięki zastosowanemu powyżej filter()) i temu wewnętrznemu samemu Observable może emitować tyle wartości, ile chce i wszystkie zostaną przekazane przez distinctUntilChanged(), więc duplikaty są ignorowane.

Wyjście z tego demo będzie wyglądać następująco wyjścia:

Timestamp { value: 1, timestamp: 1484670434528 } 
Timestamp { value: 1, timestamp: 1484670436475 } 
Timestamp { value: 2, timestamp: 1484670436577 } 
complete 

Jak widać wartość 1 jest emitowany dwukrotnie cca 2s opóźnieniem. Jednak wartość 2 przeszła bez problemu po 100 ms dzięki distinctUntilChanged().

wiem, że to nie jest proste, ale mam nadzieję, że to ma sens do ciebie :)

+0

Nicea odpowiedź - lubiłem ten USECASE a także - może być nawet warto stworzyć operatora niestandardowy - mogę sobie wyobrazić, więcej ludzi będzie mieć ten problem. – olsn