2015-02-09 15 views
18

W this blog, podaje this (skopiować/wkleić następujący kod) przykład piekła oddzwonienia. Nie ma jednak wzmianki o tym, w jaki sposób można wyeliminować problem za pomocą rozszerzeń reaktywnych.Jak komponować Observables w celu uniknięcia podanych zagnieżdżonych i zależnych wywołań zwrotnych?

Tak więc tutaj F3 zależy od wypełnienia F1, a F4 i F5 zależą od zakończenia F2.

  1. Zastanawiasz się, jaki byłby funkcjonalny odpowiednik w Rx.
  2. Jak przedstawić w Rx, że F1, F2, F3, F4 i F5 powinny być ciągnione asynchronicznie?

UWAGA: Jestem obecnie próbuje owinąć głowę wokół Rx więc nie spróbować rozwiązać ten przykład przed tym pytaniem.

import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicReference; 

public class CallbackB { 

    /** 
    * Demonstration of nested callbacks which then need to composes their responses together. 
    * <p> 
    * Various different approaches for composition can be done but eventually they end up relying upon 
    * synchronization techniques such as the CountDownLatch used here or converge on callback design 
    * changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>. 
    */ 
    public static void run() throws Exception { 
     final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()); 
     /* the following are used to synchronize and compose the asynchronous callbacks */ 
     final CountDownLatch latch = new CountDownLatch(3); 
     final AtomicReference<String> f3Value = new AtomicReference<String>(); 
     final AtomicReference<Integer> f4Value = new AtomicReference<Integer>(); 
     final AtomicReference<Integer> f5Value = new AtomicReference<Integer>(); 

     try { 
      // get f3 with dependent result from f1 
      executor.execute(new CallToRemoteServiceA(new Callback<String>() { 

       @Override 
       public void call(String f1) { 
        executor.execute(new CallToRemoteServiceC(new Callback<String>() { 

         @Override 
         public void call(String f3) { 
          // we have f1 and f3 now need to compose with others 
          System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5")); 
          // set to thread-safe variable accessible by external scope 
          f3Value.set(f3); 
          latch.countDown(); 
         } 

        }, f1)); 
       } 

      })); 

      // get f4/f5 after dependency f2 completes 
      executor.execute(new CallToRemoteServiceB(new Callback<Integer>() { 

       @Override 
       public void call(Integer f2) { 
        executor.execute(new CallToRemoteServiceD(new Callback<Integer>() { 

         @Override 
         public void call(Integer f4) { 
          // we have f2 and f4 now need to compose with others 
          System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5")); 
          // set to thread-safe variable accessible by external scope 
          f4Value.set(f4); 
          latch.countDown(); 
         } 

        }, f2)); 
        executor.execute(new CallToRemoteServiceE(new Callback<Integer>() { 

         @Override 
         public void call(Integer f5) { 
          // we have f2 and f5 now need to compose with others 
          System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5)); 
          // set to thread-safe variable accessible by external scope 
          f5Value.set(f5); 
          latch.countDown(); 
         } 

        }, f2)); 
       } 

      })); 

      /* we must wait for all callbacks to complete */ 
      latch.await(); 
      System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get())); 
     } finally { 
      executor.shutdownNow(); 
     } 
    } 

    public static void main(String[] args) { 
     try { 
      run(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    private static final class CallToRemoteServiceA implements Runnable { 

     private final Callback<String> callback; 

     private CallToRemoteServiceA(Callback<String> callback) { 
      this.callback = callback; 
     } 

     @Override 
     public void run() { 
      // simulate fetching data from remote service 
      try { 
       Thread.sleep(100); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      callback.call("responseA"); 
     } 
    } 

    private static final class CallToRemoteServiceB implements Runnable { 

     private final Callback<Integer> callback; 

     private CallToRemoteServiceB(Callback<Integer> callback) { 
      this.callback = callback; 
     } 

     @Override 
     public void run() { 
      // simulate fetching data from remote service 
      try { 
       Thread.sleep(40); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      callback.call(100); 
     } 
    } 

    private static final class CallToRemoteServiceC implements Runnable { 

     private final Callback<String> callback; 
     private final String dependencyFromA; 

     private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) { 
      this.callback = callback; 
      this.dependencyFromA = dependencyFromA; 
     } 

     @Override 
     public void run() { 
      // simulate fetching data from remote service 
      try { 
       Thread.sleep(60); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      callback.call("responseB_" + dependencyFromA); 
     } 
    } 

    private static final class CallToRemoteServiceD implements Runnable { 

     private final Callback<Integer> callback; 
     private final Integer dependencyFromB; 

     private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) { 
      this.callback = callback; 
      this.dependencyFromB = dependencyFromB; 
     } 

     @Override 
     public void run() { 
      // simulate fetching data from remote service 
      try { 
       Thread.sleep(140); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      callback.call(40 + dependencyFromB); 
     } 
    } 

    private static final class CallToRemoteServiceE implements Runnable { 

     private final Callback<Integer> callback; 
     private final Integer dependencyFromB; 

     private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) { 
      this.callback = callback; 
      this.dependencyFromB = dependencyFromB; 
     } 

     @Override 
     public void run() { 
      // simulate fetching data from remote service 
      try { 
       Thread.sleep(55); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      callback.call(5000 + dependencyFromB); 
     } 
    } 

    private static interface Callback<T> { 
     public void call(T value); 
    } 
} 
+0

Nie jestem mistrzem Rx, ale z sesji, którą dał: możesz uniknąć piekła oddzwaniania, komponując Observables. Jeśli @benjchristensen jest w pobliżu - może być w stanie podać więcej szczegółów. – alfasin

+2

@alfasin Dowiedziałem się wystarczająco dużo o RxJavie, aby wiedzieć, że należy go rozwiązać, tworząc obserwowalne elementy. Pytanie brzmi: W JAKI SPOSÓB można to skomponować :-) –

+0

W takim przypadku zmieniłbym pytanie z "jak uniknąć piekła oddzwaniania?" do "jak komponować obserwowalne?";) – alfasin

Odpowiedz

27

Jestem oryginalny autor odwołuje blogu opublikuj informacje o callbackach i Java Futures. Oto przykład użycia flatMap, zip i scalania do asynchronicznego wykonywania kompozycji usług.

Pobiera obiekt użytkownika, a następnie jednocześnie pobiera dane Social i PersonalizedCatalog, a następnie dla każdego wideo z PersonalizedCatalogu jednocześnie pobiera Bookmark, Rating i Metadata, zamienia je razem i łączy wszystkie odpowiedzi w strumieniu progresywnym jako Zdarzenia wysłane przez serwer.

return getUser(userId).flatMap(user -> { 
    Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user) 
      .flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
        video -> { 
         Observable<Bookmark> bookmark = getBookmark(video); 
         Observable<Rating> rating = getRatings(video); 
         Observable<VideoMetadata> metadata = getVideoMetadata(video); 
         return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m)); 
        })); 

    Observable<Map<String, Object>> social = getSocial(user).map(s -> { 
     return s.getDataAsMap(); 
    }); 

    return Observable.merge(catalog, social); 
}).flatMap(data -> { 
    String json = SimpleJson.mapToJson(data); 
    return response.writeStringAndFlush("data: " + json + "\n"); 
}); 

Ten przykład może być postrzegane w kontekście aplikacji działającej na https://github.com/Netflix/ReactiveLab/blob/952362b89a4d4115ae0eecf0e73f273ecb27ba98/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/routes/RouteForDeviceHome.java#L33

Ponieważ nie mogę ewentualnie dostarczyć wszystkich informacji, tutaj można również znaleźć wyjaśnienie w formie prezentacji (z linkiem do filmu) o https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014?slide=32.

+0

Dziękuję. Zakładam, że implementacje getBookmark (...) itp. Używają wewnętrznie subscribeOn (Schedulers.io()) w prawo? Ponadto operacja zip działa na wątku wywołującym. –

+1

Tak, mogą używać subscribeOn z programem planującym IO do blokowania IO asynchronizacji. W Netflix zazwyczaj korzystamy z Hystrix, aby zająć się tym dla nas i zapewnić masowe nagłówki, limity czasowe, awarie, dane itp. Jeśli IO nie blokuje, np. Przez Netty, subskrybowanie jest zbędne. – benjchristensen

5

Zgodnie z kodem. Załóżmy, że zdalne wywołanie jest wykonywane przy użyciu Observable.

Observable<Integer> callRemoveServiceA() { /* async call */ } 

/* .... */ 

Observable<Integer> callRemoveServiceE(Integer f2) { /* async call */ } 

co chcesz:

  • połączenia serviceA następnie wywołać serviceB z wynikiem serviceA
  • rozmowy serviceC następnie wywołać serviceD i serviceE z wynikiem serviceC
  • z wynikiem serviceE i serviceD, zbuduj nową wartość
  • wyświetlić nową wartość wyniku serviceB

Z RxJava, można to osiągnąć z tym kodem:

Observable<Integer> f3 = callRemoveServiceA() // call serviceA 
      // call serviceB with the result of serviceA 
      .flatMap((f1) -> callRemoveServiceB(f1)); 


Observable<Integer> f4Andf5 = callRemoveServiceC() // call serviceC 
        // call serviceD and serviceE then build a new value 
        .flatMap((f2) -> callRemoveServiceD(f2).zipWith(callRemoveServiceE(f2), (f4, f5) -> f4 * f5)); 

// compute the string to display from f3, and the f4, f5 pair 
f3.zipWith(f4Andf5, (childF3, childF4Andf5) -> childF3 + " => " + childF4Andf5) 
      // display the value 
      .subscribe(System.out::println); 

ważnym elementem jest tu zastosowanie flapMapzip i (lub zipWith)

można uzyskać więcej informacji na temat flapMap tutaj: When do you use map vs flatMap in RxJava?

+0

RW tym przypadku mogę założyć, że po zasubskrybowaniu f4Andf5 może wykonać szybciej niż f3. Czy usługa A i B trwa dłużej? Pytam, ponieważ w przypadku przyszłości obie usługi A i B powinny powrócić, zanim inni będą mogli działać. –

+0

Tak, jeśli każde wywołanie usługi zdalnej jest asynchroniczne. – dwursteisen