2016-06-16 22 views
16

W documentation for RetryWhen przykładzie nie idzie tak:błąd Złap jeśli retryWhen: s prób zabraknie

Observable.create((Subscriber<? super String> s) -> { 
    System.out.println("subscribing"); 
    s.onError(new RuntimeException("always fails")); 
}).retryWhen(attempts -> { 
    return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> { 
     System.out.println("delay retry by " + i + " second(s)"); 
     return Observable.timer(i, TimeUnit.SECONDS); 
    }); 
}).toBlocking().forEach(System.out::println); 

Ale w jaki sposób propagować błąd, jeśli zabraknie prób?

Dodanie .doOnError(System.out::println)po klauzuli retryWhen nie przechwytuje błędu. Czy to jest nawet emitowane?

Dodawanie .doOnError(System.out::println)przed retryWhen wyświetlacze always fails dla wszystkich prób.

Odpowiedz

1

Jedną z opcji jest użycie Observable.materialize() do konwersji elementów Observable.range() na powiadomienia. Potem raz onCompleted() jest wydawane, można propagować błąd w dół (w próbce poniżej Pair służy do zawijania Observable.range() powiadomień i wyjątek od Observable)

@Test 
    public void retryWhen() throws Exception { 

    Observable.create((Subscriber<? super String> s) -> { 
     System.out.println("subscribing"); 
     s.onError(new RuntimeException("always fails")); 
    }).retryWhen(attempts -> { 
     return attempts.zipWith(Observable.range(1, 3).materialize(), Pair::new) 
      .flatMap(notifAndEx -> { 
      System.out.println("delay retry by " + notifAndEx + " second(s)"); 
      return notifAndEx.getRight().isOnCompleted() 
        ? Observable.<Integer>error(notifAndEx.getLeft()) 
        : Observable.timer(notifAndEx.getRight().getValue(), TimeUnit.SECONDS); 
     }); 
    }).toBlocking().forEach(System.out::println); 
} 

    private static class Pair<L,R> { 
     private final L left; 
     private final R right; 

     public Pair(L left, R right) { 
      this.left = left; 
      this.right = right; 
     } 

     public L getLeft() { 
      return left; 
     } 

     public R getRight() { 
      return right; 
     } 
    } 
0

można uzyskać żądane zachowanie pomocą RetryWhen budowniczy w rxjava-extras który jest na Maven Central. Użyj najnowszej wersji.

Observable.create((Subscriber<? super String> s) -> { 
    System.out.println("subscribing"); 
    s.onError(new RuntimeException("always fails")); 
}) 
.retryWhen(RetryWhen 
    .delays(Observable.range(1, 3) 
       .map(n -> (long) n), 
      TimeUnit.SECONDS).build()) 
.doOnError(e -> e.printStackTrace()) 
.toBlocking().forEach(System.out::println); 
+0

Hmm, dlaczego głosowanie w dół? To jest biblioteka z testami jednostkowymi. –

7

W Javadoc dla retryWhen stwierdza, że:

If that Observable calls onComplete or onError then retry will call onCompleted or onError on the child subscription.

Mówiąc prościej, jeśli chcesz propagować wyjątek, trzeba rethrow oryginalny wyjątek raz miałeś wystarczająco dużo powtórzeń.

Łatwym sposobem jest ustawienie wartości o 1 razy większej niż liczba powtórzeń.

Następnie w funkcji zip przetestuj bieżącą liczbę ponownych prób. Jeśli jest równa NUMBER_OF_RETRIES + 1, zwróć Observable.error(throwable) lub ponownie wyrzuć wyjątek.

EG

Observable.create((Subscriber<? super String> s) -> { 
      System.out.println("subscribing"); 
      s.onError(new RuntimeException("always fails")); 
     }).retryWhen(attempts -> { 
      return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (throwable, attempt) -> { 
       if (attempt == NUMBER_OF_RETRIES + 1) { 
        throw Throwables.propagate(throwable); 
       } 
       else { 
        return attempt; 
       } 
      }).flatMap(i -> { 
       System.out.println("delaying retry by " + i + " second(s)"); 
       return Observable.timer(i, TimeUnit.SECONDS); 
      }); 
     }).toBlocking().forEach(System.out::println); 

Tak na marginesie doOnError nie wpływa obserwowalnym w jakikolwiek sposób - po prostu zapewnia z hakiem, aby wykonać pewne działania w przypadku wystąpienia błędu. Typowym przykładem jest rejestrowanie.

-1

Trzeba użyć onErrorResumeNext po retryWhen

W przykładzie

Observable.create((Subscriber<? super String> s) -> { 
     System.out.println("subscribing"); 
     s.onError(new RuntimeException("always fails")); 
    }).retryWhen(attempts -> { 
     return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (n, i) -> { 
      if (i == NUMBER_OF_RETRIES + 1) { 
       throw Throwables.propagate(n); 
      } 
      else { 
       return i; 
      } 
     }).flatMap(i -> { 
      System.out.println("delay retry by " + i + " second(s)"); 
      return Observable.timer(i, TimeUnit.SECONDS); 
     }); 
    }) 
    .onErrorResumeNext(t -> {System.out.println("Error after all retries:" + t.getMessage()); 
               return Observable.error(t); 
              }) 
    .toBlocking().forEach(System.out::println); 

Na dole tej klasie można zobaczyć praktyczny przykład, aby zrozumieć, jak działa. https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java

-1

Można użyć funkcji skanowania, która zwraca parę z nagromadzonych indeksu i zdecydować, czy przekazać błąd:

.retryWhen(attempts -> 
    return .scan(Pair.create(0, null), (index, value) -> Pair.create(index.first + 1, value)) 
      .flatMap(pair -> { 
       if(pair.first > MAX_RETRY_COUNT) { 
        throw new RuntimeException(pair.second); 
       } 
       return Observable.timer(pair.first, TimeUnit.SECONDS); 
      }); 

Lub można trzymać z zipWith operatora ale zwiększenia liczby w range Obserwować i zwracać parę, zamiast samego indeksu.W ten sposób nie stracisz informacji o poprzednim throwable.

attempts 
    .zipWith(Observable.range(1, MAX_RETRY_COUNT + 1), (throwable, i) -> Pair.create(i, throwable)) 
    .flatMap(pair -> { 
     if(pair.first > MAX_RETRY_COUNT) throw new RuntimeException(pair.second); 
     System.out.println("delay retry by " + pair.first + " second(s)"); 
     return Observable.timer(pair.first, TimeUnit.SECONDS); 
    }); 
9

Doc dla retryWhen mówi, że przechodzi onError zgłaszania swoich abonentów i kończy. Możesz więc zrobić coś takiego:

final int ATTEMPTS = 3; 

    Observable.create((Subscriber<? super String> s) -> { 
     System.out.println("subscribing"); 
     s.onError(new RuntimeException("always fails")); 
    }).retryWhen(attempts -> attempts 
      .zipWith(Observable.range(1, ATTEMPTS), (n, i) -> 
        i < ATTEMPTS ? 
          Observable.timer(i, SECONDS) : 
          Observable.error(n)) 
      .flatMap(x -> x)) 
      .toBlocking() 
      .forEach(System.out::println);