2015-12-23 15 views
5

Próbuję użyć Realm z RxJava i modernizowanych w sposób DanLew opisaną here concating wejście od królestwa i modernizacji, ale to utknie gdybym sferę dodanie do łańcuchaJak korzystać z Realm asObservable z operatorem concat() RxJavy?

Observable.concat(countryStorage.restoreAsObservable(), 
       networkService.api() 
        .getCountries() 
        .doOnNext(countryStorage::save)) 
       .first() 
       .observeOn(AndroidSchedulers.mainThread()) 
       .subscribe(//never reaching here) 

STORAGE

@Override public Observable<List<Country>> restoreAsObservable() { 
     Realm realm = realmProvider.get(); 
     return realm.where(Country.class) 
      .findAll() 
      .asObservable() 
      .map(countries -> return realm.copyFromRealm(countries)) 
      .first(countries -> return !countries.isEmpty()) 
      .doOnCompleted(realm::close()); 
     } 

Wydaje się, że to może się zdarzyć, że obserwowalne jest gorące z Królestwa, ale nic na ten temat w dokumentach i jak mam przypisać komponowanie Królestwa z innymi obserwowalnymi?

AKTUALIZACJA: Okazuje się, że działa dobrze w starym stylu. Pozostaje pytanie o nowe API.

return Observable.just(
     realm.copyFromRealm(realm.where(Country.class).findAll())) 
     .filter(countries -> !countries.isEmpty()) 
     .doOnCompleted(realm::close); 
+0

https://github.com/realm/realm-java/issues/1998 samo pytanie na github. Zobacz postęp tam. – beeender

+0

Mam dokładnie ten sam problem. Czy znalazłeś rozwiązanie tego problemu? – Eitan

+0

'doOnCompleted' nigdy nie będzie nazywane –

Odpowiedz

0

Dzieje się to dlatego, countryStorage.restoreAsObservable() nigdy nie zakończy, a jeśli czytasz concat doc, to wyraźnie stwierdza, że:

Concat czeka subskrybować każdy dodatkowy Obserwowalne które przechodzą do niej aż poprzednich obserwowalnych dopełnia .

Zamiast tego można po prostu zrobić coś takiego:

countryStorage.restoreAsObservable() 
      .doOnSubscribe(() -> { 
       networkService.api() 
        .getCountries() 
        .subscribe(countryStorage::save) 
      }) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(//do smth)