2016-03-26 18 views
7

Próbuję dowiedzieć się, jak błędy obsługi podczas mapowania elementów wewnątrz strumienia.Jak radzić sobie z błędami podczas wykonywania Flux.map()

Na przykład, jestem parsowania ciąg CSV do jednego z moich POJOs biznesowych:

myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock)); 

Niektóre z tych linii może zawierać błędy, więc co mam w dzienniku jest:

reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001)) 
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999)) 
reactor.core.publisher.FluxLog: onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo) 
reactor.core.publisher.FluxLog: java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo 

czytam w API pewne metody obsługi błędów, ale większość skieruję do zwrócenie „wartość błędu” lub za pomocą fallback Flux, jak ten:

Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff); 

Jednak użycie tego z moim myflux oznacza, że ​​cały strumień jest przetwarzany ponownie.

Czy istnieje sposób radzenia sobie z błędami podczas przetwarzania poszczególnych elementów (np. Ich ignorowanie/rejestrowanie) i kontynuowania przetwarzania pozostałej części strumienia?

UPDATE z @akarnokd obejście

public Flux<StockQuotation> getQuotes(List<String> tickers) 
{ 
    Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers) 
    // Get each set of quotes in a separate thread 
    .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s))) 
    // Convert each list of raw quotes string in a new Flux<String> 
    .flatMap(list -> Flux.fromIterable(list)) 
    // Convert the string to POJOs 
    .flatMap(x -> { 
      try { 
       return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));  
      } 
      catch (IllegalArgumentException ex){ 
       System.out.println("Error decoding stock quotation: " + x); 
       return Flux.empty(); 
      } 
    }); 

    return processingFlux; 
} 

To działa jak czar, jednak jak widać kod jest mniej elegancki niż dotychczas. Czy API Flux nie ma żadnej metody, aby zrobić to, co robi ten kod?

retry(...) 
retryWhen(...) 
onErrorResumeWith(...) 
onErrorReturn(...) 

Odpowiedz

5

Trzeba flatMap zamiast która pozwala ci zwróci pusty ciąg jeśli przetwarzanie nie powiodło się:

myflux.flatMap(v -> { 
    try { 
     return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock)); 
    } catch (IllegalArgumentException ex) { 
     return Flux.empty(); 
    } 
}); 
+0

Działa świetnie (zamiar przyjąć tę odpowiedź), ale chciałbym wiedzieć, czy można to zrobić z API. Jeśli nie, otworzę żądanie funkcji. Dzięki! – Victor

+0

To jest standardowy interfejs API służący do wykonywania takiego zachowania. Błędy są zdarzeniami terminalowymi i musisz je przekształcić w coś innego w lambdach, aby uniknąć zakończenia. – akarnokd

+0

Ok. Zaproponowałem stworzenie nowej metody radzenia sobie z poszczególnymi niepowodzeniami (być może publikowanie tych błędów jako strumienia "martwej litery"?). Może to może być pomocne ... – Victor