Próbuję dowiedzieć się, jak błędy obsługi podczas mapowania elementów wewnątrz strumienia.Jak radzić sobie z błędami podczas wykonywania Flux.map()
Na przykład, jestem parsowania ciąg CSV do jednego z moich POJOs biznesowych:
myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));
Niektóre z tych linii może zawierać błędy, więc co mam w dzienniku jest:
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
reactor.core.publisher.FluxLog: onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
reactor.core.publisher.FluxLog: java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo
czytam w API pewne metody obsługi błędów, ale większość skieruję do zwrócenie „wartość błędu” lub za pomocą fallback Flux, jak ten:
Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);
Jednak użycie tego z moim myflux
oznacza, że cały strumień jest przetwarzany ponownie.
Czy istnieje sposób radzenia sobie z błędami podczas przetwarzania poszczególnych elementów (np. Ich ignorowanie/rejestrowanie) i kontynuowania przetwarzania pozostałej części strumienia?
UPDATE z @akarnokd obejście
public Flux<StockQuotation> getQuotes(List<String> tickers)
{
Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
// Get each set of quotes in a separate thread
.flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
// Convert each list of raw quotes string in a new Flux<String>
.flatMap(list -> Flux.fromIterable(list))
// Convert the string to POJOs
.flatMap(x -> {
try {
return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));
}
catch (IllegalArgumentException ex){
System.out.println("Error decoding stock quotation: " + x);
return Flux.empty();
}
});
return processingFlux;
}
To działa jak czar, jednak jak widać kod jest mniej elegancki niż dotychczas. Czy API Flux nie ma żadnej metody, aby zrobić to, co robi ten kod?
retry(...)
retryWhen(...)
onErrorResumeWith(...)
onErrorReturn(...)
Działa świetnie (zamiar przyjąć tę odpowiedź), ale chciałbym wiedzieć, czy można to zrobić z API. Jeśli nie, otworzę żądanie funkcji. Dzięki! – Victor
To jest standardowy interfejs API służący do wykonywania takiego zachowania. Błędy są zdarzeniami terminalowymi i musisz je przekształcić w coś innego w lambdach, aby uniknąć zakończenia. – akarnokd
Ok. Zaproponowałem stworzenie nowej metody radzenia sobie z poszczególnymi niepowodzeniami (być może publikowanie tych błędów jako strumienia "martwej litery"?). Może to może być pomocne ... – Victor