2016-03-12 10 views
5

Chcę wygenerować Observable w czasie rzeczywistym z wyników listy Futures.Observable from Futures - onNastępnie z wielu wątków

W najprostszym przypadku, załóżmy, że mam listę kontraktów terminowych, które prowadzę z Future.sequence, i po prostu monitoruję ich postępy za pomocą Observable, która mówi mi za każdym razem, gdy się zakończy. Zasadniczo robię to tak:

def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = { 
    Observable[String](observer => { 
     val loudFutures: List[Future[Int]] = futs.map(f => { 
      f onComplete { 
       case Success(a) => observer.onNext(s"just did $a more") 
       case Failure(e) => observer.onError(e) 
      } 
      f 
     }) 
     Future.sequence(loudFutures) onComplete { 
      case Success(_) => observer.onCompleted() 
      case Failure(e) => observer.onError(e) 
     } 
    }) 
    } 

To działa dobrze w moim środowisku testowym. Ale właśnie przeczytałem, że onNext nie powinien być wywoływany z różnych wątków, przynajmniej bez bycia ostrożnym, że nie ma nakładających się połączeń. Jaki jest zalecany sposób rozwiązania tego problemu? Wydaje się, że wiele rzeczywistych zastosowań Observables wymagałoby wywołania onNext z takiego kodu asynchronicznego, ale nie mogę znaleźć podobnego przykładu w dokumentach.

+0

Nie jestem pewien, czy istnieje lepsze rozwiązanie, ale można mieć pewność, że 'onNext' rozmowy są prowadzone przez tego samego wątku, jeśli używasz na przykład jeden gwintowany kontekstu wykonania (' ExecutionContext.fromExecutor (eXecutors. newSingleThreadExecutor()) '), aby uruchomić te callbacki' onComplete's. – Kolmar

+0

Czy możesz wskazać, który artykuł dotyczy "onNext"? Ten przypadek użycia jest całkowicie w porządku, z mojego punktu widzenia. – mavarazy

+0

@mavarazy: Większość dokumentacji, którą znalazłem w tej sprawie jest dość niejasna, ale [to] (http://reactivex.io/documentation/operators/serialize.html) mówi o używaniu 'serialize()', aby uniknąć dwóch nakładających się 'onNext()' wywołania i [this] (https://github.com/ReactiveX/RxJava/wiki/Subject) ostrzega, aby nie wywoływać 'onNext()' z wielu wątków - przynajmniej jeśli używasz Przedmiot. Wszystkie oficjalne przykłady Rx, jakie mogłem znaleźć, są jednowątkowe. – thund

Odpowiedz

1

The Observable Contract

obserwable musi wydać powiadomienia obserwatorów seryjnie (nie w równolegle). Mogą wysyłać te powiadomienia z różnych wątków, , ale musi istnieć formalna, zdarzająca się wcześniej relacja między powiadomieniami .

Spójrz na Serialize

Jest to możliwe do zaobserwowania w celu wywołania metod swoich obserwatorów asynchronicznie, być może z różnych wątków. Może to spowodować, że obserwowalne naruszy kontrakt obserwowalny, ponieważ może próbować wysłać do powiadomienie OnCompleted lub OnError przed jednym z powiadomień OnNext lub może jednocześnie wysyłać powiadomienia OnNext z dwóch różnych wątków . Możesz wymusić takie obserwowanie, aby być dobrze zachowanym i synchronicznym, stosując do tego operator szeregowy.

+0

Dzięki. Kiedy szybko spojrzałem na stronę Serialize zanim zniechęciło mnie to, że sekcja Rx-scala mówi tylko o TBD i nie podaje właściwej składni. Nie byłem pewien, czy kiedykolwiek został wdrożony. Jednak poniższa składnia kompiluje: 'Observable [String] (observer => {...}). Serialize', i wydaje się działać dobrze. Nie mogę szczerze powiedzieć, czy '.serialize' robi cokolwiek - dla tego spróbuję zbudować mocniejsze testy wytrzymałościowe. Jestem również zaskoczony, że nie jest to domyślna implementacja 'Observable'. – thund