2016-09-17 9 views
5

Jak mogę zbudować obserwowalny, który emituje w pewnym określonym przedziale czasu, ale może również być emitowany, gdy emituje drugie obserwowalne, w którym to momencie interwał zostanie "zresetowany", aby rozpocząć emisję ponownie w pierwotny interwał rozpoczynający się od punktu drugiego pingowania?Jak zresetować Observable.interval

Na przykład, powiedzmy, że przerwa to 10 minut. To, co można zaobserwować, wyemituje na poziomie 10, 20, 30 itd. Ale powiedzmy, że drugie może być obserwowane w czasie emisji 15. Wtedy ogólne obserwowalne powinno zadzwonić pod 10, 15, 25, 35 itd.

Odpowiedz

1

Oto moja próba. Robi to, co chcesz, ale nie jest szczególnie elegancki.

import * as Rx from "rxjs/Rx"; 

const resetter = new Rx.Subject(); 

const resettableInterval = Rx.Observable.of(0) 
    .concat(resetter) 
    .switchMap((value, index) => { 

     let interval = Rx.Observable.interval(1000); 
     if (index > 0) { 
      interval = Rx.Observable.of(-1).concat(interval).map((value) => value + 1); 
     } 
     return interval; 
    }); 

const since = Date.now(); 
resettableInterval.subscribe(
    (value) => { console.log(`${((Date.now() - since)/1000).toFixed(1)}: ${value}`); } 
); 
setTimeout(() => { resetter.next(0); }, 1500); 

Początkowy obserwowalne zawiera pojedynczą wartość, która zaczyna kopać interwał korzystając switchMap. Obserwowalny resetter jest łączony, więc za każdym razem, gdy emituje interwał, jest resetowany. Parametr index dostarczany przez operatora switchMap służy do określania, czy emitowana jest wartość początkowa przedziału. (Jeśli nie dbają o pierwotnych liczb, które są emitowane, można usunąć map - jest stosowany wyłącznie w celu zapewnienia numery emitujące są wyzerowane, etc.)

Wyjście powinno być:

1.0: 0 
1.5: 0 
2.5: 1 
3.5: 2 
4.5: 3 
5.5: 4 
... 
5

Możesz switchMap pierwszy strumień pod drugim.

//Outer timer fires once initially and then every 15 minutes 
Rx.Observable.timer(0, 15 * 60 * 1000 /*15 minutes*/) 
    //Each outer event cancels the previous inner one and starts a new one 
    .switchMap(outer => Rx.Observable.interval(10 * 60 * 1000 /*10 minutes*/)) 
    .subscribe(x => console.log(x)); 

Wynikiem powyższego byłoby Observable który emituje co dziesięć minut, ale dostaje zresetować kiedy zewnętrzna Observable pożarów.

3

W kątową 4 udało mi się przywrócić Interval z następującym

private ngUnSubscribe: Subject<void> = new Subject<void>(); 

ngOnDestroy() { 
     this.ngUnSubscribe.next(); 
     this.ngUnSubscribe.complete(); 
} 

ngOnInit() { 
    this.pillar_timer_interval = IntervalObservable.create(3500); 
    this.startInterval(); 
} 

startIntervale() { 
    this.pillar_timer_interval 
    .takeUntil(this.ngUnSubscribe) 
    .subscribe((value) => { 
     //whatever function you calling every 3.5s 
    }); 
} 
resetInterval() { 
    this.ngUnSubscribe.next(); 
    this.startInterval(); 
}