2015-05-13 7 views
30

Mam wiele problemów ze zrozumieniem operatora zip w RxJava dla mojego projektu Android. Problem Muszę być w stanie wysłać żądanie sieciowe, aby przesłać wideo Następnie muszę wysłać żądanie sieciowe, aby przesłać zdjęcie, aby przejść z nim w końcu muszę dodać opis i użyć odpowiedzi z poprzednich dwóch prosi o przesłanie adresów URL obrazu i obrazu wraz z opisem do mojego serwera.Rxjava Android, jak korzystać z operatora Zip

Założę się, że operator zip byłby idealny do tego zadania, ponieważ zrozumiałam, że możemy podjąć reakcję dwóch obserwowalnych (żądania wideo i obrazów) i użyć ich do mojego ostatniego zadania. Ale nie wydaje mi się, aby do tego doszło, jak sobie to wyobrażam.

Szukam kogoś, kto odpowie na pytanie, jak można to zrobić koncepcyjnie z odrobiną kodu psuedo. Dziękuję operatorowi

Odpowiedz

40

Operator pocztowy ściśle paruje emitowane przedmioty z obserwowalnych. Czeka na oba (lub więcej) elementów, aby dotrzeć, a następnie łączy je. Więc tak, to byłoby odpowiednie dla twoich potrzeb.

Chciałbym użyć Func2 do połączenia wyników z dwóch pierwszych obserwowalnych. Zauważ, że takie podejście byłoby prostsze, jeśli użyjesz Retrofit, ponieważ jego interfejs API może zwrócić obserwowalne. W przeciwnym razie musisz stworzyć własne obserwowalne.

// assuming each observable returns response in the form of String 
Observable<String> movOb = Observable.create(...); 
// if you use Retrofit 
Observable<String> picOb = RetrofitApiManager.getService().uploadPic(...), 
Observable.zip(movOb, picOb, 

    new Func2<String, String, MyResult>() { 

     @Override 
     public MyResult call(String movieUploadResponse, 
      String picUploadResponse) { 
      // analyze both responses, upload them to another server 
      // and return this method with a MyResult type 
      return myResult; 
     } 
     } 
) 
// continue chaining this observable with subscriber 
// or use it for something else 
+0

Problem, który napotykam, polega na tym, że mam pogląd, że odpalam zadania obserwowalnego wideo, a inny, który pozwala obserwować obraz, i jeszcze jeden, który powinien wziąć oba te wyniki i wykorzystać je do ostatecznego obserwowalnego. Czy zip zwróci mi wyniki obserwowalnych już wykonanych? – feilong

+0

Nie sądzę, że można wykonać obserwowalne, ale myślę, że rozumiem twój punkt widzenia. Obserwowalny nie będzie działać, dopóki nie zostanie przyłączony do niego abonent. Więc cały obserwowalny układ musi zostać zrobiony przed przyłączeniem subskrybenta. – inmyth

+1

Zgadzam się z twoim punktem, że "operator zip pozwala ci skomponować wynik z wyników dwóch różnych obserwowalnych". Teraz chcę, aby obserwowalne 2 było zależne od obserwowalnego 1, więc obserwowalne 1 powinno zostać wykonane przed obserwowalnym 2, a następnie muszę połączyć wynik obydwu obserwowalnych. Czy mamy do tego jakiegokolwiek operatora? Zip może wykonać tę pracę. Nie chcę używać flatMap, ponieważ konwertuje jeden strumień do drugiego, ale tutaj potrzebuję ustawić zależność, a następnie skompresować wynik. Proszę odpowiedz. –

0

zip pozwala na komponowanie wyniku z dwóch różnych obserwowalnych.

Będziesz musiał podać am lambda, która stworzy wynik z danych emitowanych przez każde obserwowalne.

Observable<MovieResponse> movies = ... 
Observable<PictureResponse> picture = ... 

Observable<Response> response = movies.zipWith(picture, (movie, pic) -> { 
     return new Response("description", movie.getName(), pic.getUrl()); 

}); 
+0

Zgadzam się z twoim punktem, że "operator zip pozwala ci skomponować wynik z wyników dwóch różnych obserwowalnych". Teraz chcę, aby obserwowalne 2 było zależne od obserwowalnego 1, więc obserwowalne 1 powinno zostać wykonane przed obserwowalnym 2, a następnie muszę połączyć wynik obydwu obserwowalnych. Czy mamy do tego jakiegokolwiek operatora? Zip może wykonać tę pracę. Nie chcę używać flatMap, ponieważ konwertuje jeden strumień do drugiego, ale tutaj potrzebuję ustawić zależność, a następnie skompresować wynik. Proszę odpowiedz. –

39

Krok po kroku: -> Niech najpierw określić nasz obiekt modernizacyjny dostępu do GitHub API, instalacja dwóch obserwable dla dwóch zapytań sieciowych powyżej:

Retrofit repo = new Retrofit.Builder() 
     .baseUrl("https://api.github.com") 
     .addConverterFactory(GsonConverterFactory.create()) 
     .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
     .build(); 

Observable<JsonObject> userObservable = repo 
     .create(GitHubUser.class) 
     .getUser(loginName) 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()); 

Observable<JsonArray> eventsObservable = repo 
     .create(GitHubEvents.class) 
     .listEvents(loginName) 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()); 

Interfejsy modernizacyjne są proste wystarczy:

public interface GitHubUser { 
    @GET("users/{user}") 
    Observable<JsonObject> getUser(@Path("user") String user); 
} 

public interface GitHubEvents { 
    @GET("users/{user}/events") 
    Observable<JsonArray> listEvents(@Path("user") String user); 
} 

Ostatnio używamy RxJava s metoda zip połączyć nasze dwie obserwabli i czekać na ich ukończyć przed utworzeniem nowej widoczne.

Observable<UserAndEvents> combined = Observable.zip(userObservable, eventsObservable, new Func2<JsonObject, JsonArray, UserAndEvents>() { 
    @Override 
    public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) { 
    return new UserAndEvents(jsonObject, jsonElements); 
    } 
}); 

Jaka jest UserAndEvents?To tylko proste POJO połączyć dwa obiekty:

public class UserAndEvents { 
    public UserAndEvents(JsonObject user, JsonArray events) { 
    this.events = events; 
    this.user = user; 
    } 

    public JsonArray events; 
    public JsonObject user; 
} 

Finally Nazwijmy subskrybować metodę na naszym nowym połączeniu Obserwowalne:

combined.subscribe(new Subscriber<UserAndEvents>() { 
      ... 
      @Override 
      public void onNext(UserAndEvents o) { 
      // You can access the results of the 
      // two observabes via the POJO now 
      } 
     }); 
+0

Czy jest jakaś możliwość, że zastąpiono procedury Body? Żądania body wysyłają pierwsze żądanie wysłane jako params drugiego żądania.Wydaje się, że w jednym z moich zipowych połączeń @ramkailash – Killer

2

Tutaj mam przykład, że zrobiłem używając Zip w sposób asynchroniczny, na wszelki wypadek you're ciekawy

 /** 
* Since every observable into the zip is created to subscribeOn a diferent thread, it´s means all of them will run in parallel. 
* By default Rx is not async, only if you explicitly use subscribeOn. 
    */ 
@Test 
public void testAsyncZip() { 
    scheduler = Schedulers.newThread(); 
    scheduler1 = Schedulers.newThread(); 
    scheduler2 = Schedulers.newThread(); 
    long start = System.currentTimeMillis(); 
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2) 
                         .concat(s3)) 
       .subscribe(result -> showResult("Async in:", start, result)); 
} 

/** 
* In this example the the three observables will be emitted sequentially and the three items will be passed to the pipeline 
*/ 
@Test 
public void testZip() { 
    long start = System.currentTimeMillis(); 
    Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2) 
                     .concat(s3)) 
       .subscribe(result -> showResult("Sync in:", start, result)); 
} 


public void showResult(String transactionType, long start, String result) { 
    System.out.println(result + " " + 
           transactionType + String.valueOf(System.currentTimeMillis() - start)); 
} 

public Observable<String> obString() { 
    return Observable.just("") 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "Hello"); 
} 

public Observable<String> obString1() { 
    return Observable.just("") 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> " World"); 
} 

public Observable<String> obString2() { 
    return Observable.just("") 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "!"); 
} 

public Observable<String> obAsyncString() { 
    return Observable.just("") 
        .observeOn(scheduler) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "Hello"); 
} 

public Observable<String> obAsyncString1() { 
    return Observable.just("") 
        .observeOn(scheduler1) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> " World"); 
} 

public Observable<String> obAsyncString2() { 
    return Observable.just("") 
        .observeOn(scheduler2) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "!"); 
} 

można zobaczyć więcej przykładów tutaj https://github.com/politrons/reactive

6

Mała example:

Observable<String> stringObservable1 = Observable.just("Hello", "World"); 
Observable<String> stringObservable2 = Observable.just("Bye", "Friends"); 

Observable.zip(stringObservable1, stringObservable2, new BiFunction<String, String, String>() { 
    @Override 
    public String apply(@NonNull String s, @NonNull String s2) throws Exception { 
     return s + " - " + s2; 
    } 
}).subscribe(new Consumer<String>() { 
    @Override 
    public void accept(String s) throws Exception { 
     System.out.println(s); 
    } 
}); 

ten wypisze:

Hello - Bye 
World - Friends 
+0

jest fajnie, w "Func2 <...>", jakie są trzy ciągi? jest to dla 's',' -' i 's2'? – chornge

+1

Właśnie zaktualizowałem odpowiedź, aby obsłużyć metody RxJava2, pierwszy ciąg w BiFunction jest typem pierwszego obserwowalnego, drugi String jest typem drugiego obserwowalnego, a trzeci String jest typem obserwowalnego, że wygeneruje! –

1

I zostały poszukiwania prosta odpowiedź na temat korzystania operator pocztowy, a co zrobić z obserwabli Tworzę, aby przekazać je do niego, zastanawiałem się, czy powinienem zadzwonić subskrybować() dla każdego obserwowalne, czy nie, nie z tych odpowiedzi były łatwe do znalezienia, musiałem zrozumieć to przez siebie, więc tutaj jest prosty przykład dla za pomocą operatora Zip na 2 obiektach Observables:

@Test 
public void zipOperator() throws Exception { 

    List<Integer> indexes = Arrays.asList(0, 1, 2, 3, 4); 
    List<String> letters = Arrays.asList("a", "b", "c", "d", "e"); 

    Observable<Integer> indexesObservable = Observable.fromIterable(indexes); 

    Observable<String> lettersObservable = Observable.fromIterable(letters); 

    Observable.zip(indexesObservable, lettersObservable, mergeEmittedItems()) 
      .subscribe(printMergedItems()); 
} 

@NonNull 
private BiFunction<Integer, String, String> mergeEmittedItems() { 
    return new BiFunction<Integer, String, String>() { 
     @Override 
     public String apply(Integer index, String letter) throws Exception { 
      return "[" + index + "] " + letter; 
     } 
    }; 
} 

@NonNull 
private Consumer<String> printMergedItems() { 
    return new Consumer<String>() { 
     @Override 
     public void accept(String s) throws Exception { 
      System.out.println(s); 
     } 
    }; 
} 

wydrukowany wynik to:

[0] a 
[1] b 
[2] c 
[3] d 
[4] e 

ostateczne odpowiedzi na pytania, które gdzie w mojej głowie były następujące

obserwable przekazanych do zip() metoda wystarczy być utworzone tylko, nie muszą mieć żadnych subskrybentów do nich, wystarczy tylko je utworzyć ... jeśli chcesz, aby jakiekolwiek obserwowalne do uruchomienia w harmonogramie, możesz określić to dla tego Observable ... próbowałem również zip() operator na Observables, gdzie powinien czekać na wynik, a materiał eksploatacyjny zip() był t jest ustawiony tylko wtedy, gdy oba wyniki są gotowe (co jest oczekiwanym zachowaniem).

+0

Mam dwie listy obiektów Observables (żądania http). Pierwsza lista zawiera żądania utworzenia elementów, a druga zawiera żądania aktualizacji elementów. Chcę wykonać "coś", gdy wszystkie żądania zakończą się. Myślałem o używaniu zip, ale na twoim przykładzie widzę, że grupuję żądania według par, a ja po prostu chcę otrzymywać powiadomienia na końcu (listy mają różne rozmiary). Czy możesz mi dać jakiś pomysł? Dzięki. – JCarlos

+1

spójrz na to: http://reactivex.io/documentation/operators/combinelatest.html –