2015-09-09 7 views
11

Dołączam dwa obserwowalne, aby wyświetlić dane z pamięci podręcznej, a po pierwsze rozpocząć ładowanie danych z sieci i pokazywać zaktualizowane dane.ReactXX concat nie produkuje OnNext z pierwszego obserwowalnego, jeśli sekunda nie powiedzie się natychmiast

Observable.concat(
    getContentFromCache.subscribeOn(dbScheduler), 
    getContentFromNetwork.subscibeOn(networkScheduler) 
).observeOn(AndroidSchedulers.mainThread()) 
.subscribe(subscriber); 

Jeśli nie ma połączenia sieciowego, drugie obserwowalne zawiedzie natychmiast po wywołaniu funkcji OnSubscribe.

W przypadku, gdy druga obserwowalna natychmiast ulegnie awarii, dane z pierwszego obserwowalnego zostaną utracone. Metoda onNext nigdy nie jest wywoływana w subskrybencie.

myślę, że to może być z powodu następującego kodu w OperatorConcat.ConcatSubscriber

@Override 
    public void onNext(Observable<? extends T> t) { 
     queue.add(nl.next(t)); 
     if (WIP_UPDATER.getAndIncrement(this) == 0) { 
      subscribeNext(); 
     } 
    } 

    @Override 
    public void onError(Throwable e) { 
     child.onError(e); 
     unsubscribe(); 
    } 

wygląda po otrzymywał błędu jest to unsubscribes, a wszystko w oczekiwaniu onNext zostaną utracone.

Jaki jest najlepszy sposób na rozwiązanie mojego problemu?

Aktualizacja

Wygląda na to, znalazłem rozwiązanie, zamiast ustawiania observOn dla łączonych obserwowalne ustawić observOn dla każdego widoczne.

+0

Czy kolejność pozycji jest dla ciebie ważna? – marwinXXII

+0

Tak, to ważne. –

Odpowiedz

3

domyślne zachowanie observeOn że onError zdarzenia mogą skakać z przodu kolejki, oto cytat z docs:

Zauważ, że onError powiadomienia odetnie przed onNext powiadomień na gwincie emisji, jeżeli Scheduler jest naprawdę asynchroniczny.

Oto Mały test w celu zilustrowania rzecz:

Scheduler newThreadScheduler = Schedulers.newThread(); 

Observable<Integer> stream = Observable.create(integerEmitter -> { 
    integerEmitter.onNext(1); 
    integerEmitter.onNext(2); 
    integerEmitter.onNext(3); 
    integerEmitter.onNext(4); 
    integerEmitter.onNext(5); 
    integerEmitter.onError(new RuntimeException()); 
}, Emitter.BackpressureMode.NONE); 

TestSubscriber<Integer> subscriber = new TestSubscriber<>(); 
stream.subscribeOn(Schedulers.computation()) 
     .observeOn(newThreadScheduler).subscribe(subscriber); 

subscriber.awaitTerminalEvent(); 

subscriber.assertValues(1, 2, 3, 4, 5); 
subscriber.assertError(RuntimeException.class); 

normalnie konsument może oczekiwać na następującej sekwencji: 1> 2> 3> 4> 5>Error. Ale użycie tylko observeOn może spowodować błąd i test zakończy się niepowodzeniem.

To zachowanie zostało zaimplementowane dawno temu tutaj https://github.com/ReactiveX/RxJava/issues/1680, sprawdź motywację, dlaczego tak zostało zrobione.Aby uniknąć takiego zachowania można użyć przeciążony observeOn z delayError parametru:

wskazuje, czy zgłoszenie onError nie może wyciąć przed onNext powiadomienia na drugiej stronie granicy planowania. Jeśli true sekwencja kończy się onError będą odtwarzane w tej samej kolejności, jak został przyjęty z upstream

To jest to, czego normalnie spodziewać, więc zmieniając observeOn(newThreadScheduler) do observeOn(newThreadScheduler, true) rozwiąże test.

Następnie na pytanie @Neil: dlaczego rozwiązanie zaproponowane przez @Rostyslav działa? Działa, ponieważ nie ma przełącznika wątku dla końcowej sekwencji.

W proponowanym rozwiązaniu końcowa sekwencja jest tworzona z dwóch sekwencji na tym samym wątku: pierwsza sekwencja to dane z pamięci podręcznej, druga sekwencja to po prostu błąd z sieci. Są one tworzone razem na tym samym wątku, a po wyłączeniu przełącznika wątku - subskrybent obserwuje na urządzeniu AndroidSchedulers.mainThread(). Jeśli spróbujesz zmienić ostatnią Scheduler na inną, to się to nie uda.

+0

Dzięki, zaoszczędziłem mój dzień. Możemy również dodać .onErrorResumeNext (Observable.emtpy()) do sieci obserwowalnej, ale wtedy nie zwróci ona błędu. – SohailAziz

2

Operatory w RxJava są zaprojektowane do generowania ogólnych powiadomień o wystąpieniu błędów. Ponieważ łączone obserwowalne są asynchroniczne źródła, to występuje zwarcie. Jeśli nie chcesz, aby zwarcie wtedy można zrobić concat na zmaterializowanych obserwabli a następnie wykonać obróbkę pragnienie:

Observable.concat(
    getContentFromCache.materialize().subscribeOn(dbScheduler), 
    getContentFromNetwork.materialize().subscribeOn(networkScheduler) 
) 

Innym rozwiązaniem byłoby użyć onErrorResumeNext:

Observable.concat(
    getContentFromCache.subscribeOn(dbScheduler), 
    getContentFromNetwork.onErrorResumeNext(something) 
     .subscibeOn(networkScheduler) 
) 
+1

Nie chcę spożywać błędów, ponieważ subskrybent wie, jak poprawnie przetwarzać. –

+0

Myślę, że materializowanie może mi pomóc. Istnieje jednak problem, po zdematerializowaniu obserwowalnego konkludu, pojawienie się pierwszego i drugiego obserwowalnego obiektu. A po zdematerializowaniu pierwszego abonenta powiadamiania OnCompleted nie otrzymają następujących pozycji. –

1

@ Rostyslav, twoja aktualizacja z pewnością załatwia sprawę. Walczyłem ze scenariuszem EXACT, który opisałeś. Czy możesz wyjaśnić, dlaczego działa to jednak? Nie do końca rozumiem, jak poruszanie obserwacją do wewnętrznych obserwowalnych działało, gdy nie miało to miejsca w obserwowalnym sprzęcie.