2015-10-18 18 views
7

możemy korzystać z pamięci podręcznej (operator), aby uniknąć wykonywania długiego zadania (żądania HTTP) wiele razy, i ponowne jej wynik:Obserwowalne, ponownie w przypadku błędu i pamięci podręcznej tylko wtedy, gdy zakończone

Observable apiCall = createApiCallObservable().cache(); // notice the .cache() 

--------------------------------------------- 
// the first time we need it 
apiCall.andSomeOtherStuff() 
       .subscribe(subscriberA); 

--------------------------------------------- 
//in the future when we need it again 
apiCall.andSomeDifferentStuff() 
       .subscribe(subscriberB); 

po raz pierwszy, Żądanie http jest wykonywane, ale po raz drugi, ponieważ użyliśmy operatora cache() , to żądanie nie zostanie wykonane, ale będziemy mogli ponownie użyć pierwszego wyniku.

Działa to poprawnie, gdy pierwsze żądanie zakończy się pomyślnie. Ale jeśli onError zostanie wywołany przy pierwszej próbie, to następnym razem, gdy nowy subskrybent zarejestruje się w ten sam sposób, onError zostanie wywołany ponownie bez próby ponownego wysłania żądania http.

Staramy się, aby jeśli onError został wywołany po raz pierwszy, to następnym razem, gdy ktoś subskrybuje to samo, co obserwowalne, żądanie http zostanie wykonane od podstaw. Oznacza to, że obserwowalne będą buforować wyłącznie skuteczne wywołania api, tj. te, dla których wywołano funkcję onCompleted.

Wszelkie pomysły na temat postępowania? Próbowaliśmy używać operatorów retry() i cache() bez większego szczęścia.

Odpowiedz

4

To rozwiązanie skończyło się, po rozbudowie rozwiązanie akarnokd za:

public class OnErrorRetryCache<T> { 

    public static <T> Observable<T> from(Observable<T> source) { 
     return new OnErrorRetryCache<>(source).deferred; 
    } 

    private final Observable<T> deferred; 
    private final Semaphore singlePermit = new Semaphore(1); 

    private Observable<T> cache = null; 
    private Observable<T> inProgress = null; 

    private OnErrorRetryCache(Observable<T> source) { 
     deferred = Observable.defer(() -> createWhenObserverSubscribes(source)); 
    } 

    private Observable<T> createWhenObserverSubscribes(Observable<T> source) 
    { 
     singlePermit.acquireUninterruptibly(); 

     Observable<T> cached = cache; 
     if (cached != null) { 
      singlePermit.release(); 
      return cached; 
     } 

     inProgress = source 
       .doOnCompleted(this::onSuccess) 
       .doOnTerminate(this::onTermination) 
       .replay() 
       .autoConnect(); 

     return inProgress; 
    } 

    private void onSuccess() { 
     cache = inProgress; 
    } 

    private void onTermination() { 
     inProgress = null; 
     singlePermit.release(); 
    } 
} 

Musieliśmy buforować wynik żądania HTTP z modernizacji. Zostało to stworzone, z obserwowalnym, które emituje jeden element na uwadze.

Jeżeli obserwator subskrybowanych natomiast żądanie http był stracony, chcieliśmy poczekać i nie realizuje wniosek dwukrotnie, chyba że ktoś zawiódł w toku. W tym celu semafor umożliwia pojedynczy dostęp do bloku, który tworzy lub zwraca obserwowalną pamięć podręczną, a jeśli zostanie utworzona nowa obserwowalna, czekamy, aż ta się zakończy. Testy dla powyższego można znaleźć here

3

Musisz wykonać pewne czynności związane z obsługą państwa. Oto, jak to zrobić:

public class CachedRetry { 

    public static final class OnErrorRetryCache<T> { 
     final AtomicReference<Observable<T>> cached = 
       new AtomicReference<>(); 

     final Observable<T> result; 

     public OnErrorRetryCache(Observable<T> source) { 
      result = Observable.defer(() -> { 
       for (;;) { 
        Observable<T> conn = cached.get(); 
        if (conn != null) { 
         return conn; 
        } 
        Observable<T> next = source 
          .doOnError(e -> cached.set(null)) 
          .replay() 
          .autoConnect(); 

        if (cached.compareAndSet(null, next)) { 
         return next; 
        } 
       } 
      }); 
     } 

     public Observable<T> get() { 
      return result; 
     } 
    } 

    public static void main(String[] args) { 
     AtomicInteger calls = new AtomicInteger(); 
     Observable<Integer> source = Observable 
       .just(1) 
       .doOnSubscribe(() -> 
        System.out.println("Subscriptions: " + (1 + calls.get()))) 
       .flatMap(v -> { 
        if (calls.getAndIncrement() == 0) { 
         return Observable.error(new RuntimeException()); 
        } 
        return Observable.just(42); 
       }); 

     Observable<Integer> o = new OnErrorRetryCache<>(source).get(); 

     o.subscribe(System.out::println, 
       Throwable::printStackTrace, 
       () -> System.out.println("Done")); 

     o.subscribe(System.out::println, 
       Throwable::printStackTrace, 
       () -> System.out.println("Done")); 

     o.subscribe(System.out::println, 
       Throwable::printStackTrace, 
       () -> System.out.println("Done")); 
    } 
} 

Działa poprzez buforowanie w pełni pomyślnego źródła i zwraca je wszystkim. W przeciwnym razie (częściowo) nie powiodło się źródło crear pamięci podręcznej, a następny obserwator wywołania wywoła resubscription.

+0

Dzięki akarnokd, to wygląda dobrze. Mam tylko pewne problemy, gdy źródłem jest długo działające żądanie http (kilka sekund), a drugie, trzecie subskrybowanie subskrybuje, podczas gdy pierwsze są w toku. W takim przypadku wszystkie zawodzą, a próba nie jest podejmowana więcej niż jeden raz. Operator cache() zachowuje się inaczej. Przyjrzę się temu bardziej i spróbuję powtórzyć problem, o którym wspomniałem, korzystając z przykładu, a wkrótce skontaktuję się z Tobą. – Plato

0

Czy rozważałeś użycie AsyncSubject do wdrożenia pamięci podręcznej dla żądania sieci? Zrobiłem przykładową aplikację RxApp, aby sprawdzić, jak to może działać. Używam modelu singleton, aby uzyskać odpowiedź z sieci. Dzięki temu można buforować odpowiedzi, uzyskiwać dostęp do danych z wielu fragmentów, subskrybować oczekujące żądania, a także dostarczać fałszywych danych do zautomatyzowanych testów interfejsu użytkownika.

+0

Jeśli użyliśmy modułu AsyncSubject, a pierwsze żądanie HTTP nie powiodło się, wszyscy przyszli subskrybenci otrzymają powiadomienie o błędzie, zamiast próby ponownej próby połączenia HTTP, prawda? – Plato

+0

To prawda, ale model powinien dostarczyć np. metoda reset(), którą interfejs użytkownika może wywołać po obsłudze błędu, pokazując błąd użytkownikowi. – pmellaaho

5

Dla każdego, kto wciąż jest zainteresowany, myślę, że mam lepszy sposób na osiągnięcie tego przy pomocy rx.

Kluczową informacją jest użycie metody onErrorResumeNext, która umożliwi zastąpienie obserwowalnego w przypadku błędu. tak powinno to wyglądać mniej więcej tak:

Observable<Object> apiCall = createApiCallObservable().cache(1); 
//future call 
apiCall.onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() { 
    public Observable<? extends Object> call(Throwable throwable) { 
     return createApiCallObservable(); 
     } 
    }); 

ten sposób, jeśli pierwsze połączenie nie powiodło się w przyszłości połączenie będzie tylko przywołać go (tylko jeden raz).

, ale każdy inny rozmówca, który spróbuje użyć pierwszego obserwowalnego, nie spełni swojej prośby.

Podałeś odniesienie do oryginalnego obserwowalnego, zaktualizujmy go.

tak leniwy Getter:

Observable<Object> apiCall; 
private Observable<Object> getCachedApiCall() { 
    if (apiCall == null){ 
     apiCall = createApiCallObservable().cache(1); 
    } 
    return apiCall; 
} 

Teraz, przebojowa, że ​​ponowi próbę, jeśli poprzedni został failed:

private Observable<Object> getRetryableCachedApiCall() { 
    return getCachedApiCall().onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() { 
     public Observable<? extends Object> call(Throwable throwable) { 
      apiCall = null; 
      return getCachedApiCall(); 
     } 
    }); 
} 

Należy pamiętać, że będzie to ponownie tylko raz za każdym razem jest to nazywa.

Teraz Twój kod będzie wyglądać mniej więcej tak:

--------------------------------------------- 
// the first time we need it - this will be without a retry if you want.. 
getCachedApiCall().andSomeOtherStuff() 
       .subscribe(subscriberA); 

--------------------------------------------- 
//in the future when we need it again - for any other call so we will have a retry 
getRetryableCachedApiCall().andSomeDifferentStuff() 
       .subscribe(subscriberB);