2017-10-19 54 views
5

że mam API, które, na podstawie pewnych kryteriów zapytania, znajdzie lub budowę widżet:asynchroniczny, composable zwracana wartość dla danych wektorowych/potok w Javie 9

Widget getMatchingWidget(WidgetCriteria c) throws Throwable 

The (synchroniczne) wygląda kod klienta jak:

try { 
    Widget w = getMatchingWidget(criteria); 
    processWidget(w); 
} catch (Throwable t) { 
    handleError(t); 
} 

teraz powiedzieć odkrycie lub konstruowania widżet jest nieprzewidywalnie drogie, i nie chcę, aby zablokować klientów czekając na niego. Więc go zmienić na:

CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c) 

Klienci mogą następnie napisać albo:

CompletableFuture<Widget> f = getMatchingWidget(criteria); 
f.thenAccept(this::processWidget) 
f.exceptionally(t -> { handleError(t); return null; }) 

lub:

getMatchingWidget(criteria).whenComplete((t, w) -> { 
    if (t != null) { handleError(t); } 
    else { processWidget(t); } 
}); 

Teraz, powiedzmy, że zamiast synchroniczny API może powrócić 0 do n widżetów :

Stream<Widget> getMatchingWidgets(WidgetCriteria c) 

Naiwnie, mogę napisać:

CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c) 

to jednak faktycznie nie uczynić kod nieblokujące, to po prostu wypycha blokujące wokół - albo Future bloki aż wszystkie Widgets są dostępne, lub kod iterujący po blokach Stream czekających na każdy Widget. Co chcę to coś, co pozwoli mi przetworzyć każdy widżet po ich otrzymaniu, coś jak:

void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor) 

Ale to nie oferuje obsługę błędów, a nawet gdybym dodatkową Consumer<Throwable> errorHandler, nie pozwól mi na przykład skompiluj moje pobieranie widgetów z innymi zapytaniami lub przekształć wyniki.

Poszukuję jakiejś rzeczy, która łączy w sobie cechy Stream (iterowalność, transformowalność) z cechami CompletableFuture (asynchroniczny wynik i obsługa błędów). (I chociaż jesteśmy przy tym, ciśnienie zwrotne może być miłe.)

Czy to jest java.util.concurrent.Flow.Publisher? An io.reactivex.Observable? Coś bardziej skomplikowanego? Coś prostszego?

+2

Byłbym skłonny popychać rzeczy do kolejki i wyciągać rzeczy z kolejki. –

Odpowiedz

6

Twój przypadek użycia w naturalny sposób wkracza w świat, do którego adresuje RxJava. Jeśli mamy do zaobserwowania:

Observable<Widget> getMatchingWidgets(wc); 

która produkuje zero lub więcej widżety na podstawie kryteriów, a następnie można przetworzyć każdy widżet jak wydaje się na podstawie:

getMatchingWidgets(wc) 
    .subscribeOn(backgroundScheduler) 
    .subscribe(w -> processWidget(w), 
       error -> handleError(error)); 

obserwowalnym łańcuch będzie działać na backgroundScheduler , który jest często opakowaniem dla usługi executora puli wątków.Jeśli trzeba zrobić ostateczną obróbkę każdego widgetu w interfejsie użytkownika, można użyć operatora observeOn(), aby przełączyć się do terminarza UI przed obróbką:

getMatchingWidgets(wc) 
    .subscribeOn(backgroundScheduler) 
    .observeOn(uiScheduler) 
    .subscribe(w -> processWidget(w), 
       error -> handleError(error)); 

Dla mnie elegancja podejścia RxJava to chodzi w płynny sposób obsługuje tak wiele nakrętek i śrub zarządzania rurociągiem. Patrząc na ten łańcuch obserwatorów, wiesz dokładnie, co się dzieje i gdzie.