Chcę wygenerować Observable
w czasie rzeczywistym z wyników listy Futures
.Observable from Futures - onNastępnie z wielu wątków
W najprostszym przypadku, załóżmy, że mam listę kontraktów terminowych, które prowadzę z Future.sequence
, i po prostu monitoruję ich postępy za pomocą Observable
, która mówi mi za każdym razem, gdy się zakończy. Zasadniczo robię to tak:
def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = {
Observable[String](observer => {
val loudFutures: List[Future[Int]] = futs.map(f => {
f onComplete {
case Success(a) => observer.onNext(s"just did $a more")
case Failure(e) => observer.onError(e)
}
f
})
Future.sequence(loudFutures) onComplete {
case Success(_) => observer.onCompleted()
case Failure(e) => observer.onError(e)
}
})
}
To działa dobrze w moim środowisku testowym. Ale właśnie przeczytałem, że onNext
nie powinien być wywoływany z różnych wątków, przynajmniej bez bycia ostrożnym, że nie ma nakładających się połączeń. Jaki jest zalecany sposób rozwiązania tego problemu? Wydaje się, że wiele rzeczywistych zastosowań Observables
wymagałoby wywołania onNext
z takiego kodu asynchronicznego, ale nie mogę znaleźć podobnego przykładu w dokumentach.
Nie jestem pewien, czy istnieje lepsze rozwiązanie, ale można mieć pewność, że 'onNext' rozmowy są prowadzone przez tego samego wątku, jeśli używasz na przykład jeden gwintowany kontekstu wykonania (' ExecutionContext.fromExecutor (eXecutors. newSingleThreadExecutor()) '), aby uruchomić te callbacki' onComplete's. – Kolmar
Czy możesz wskazać, który artykuł dotyczy "onNext"? Ten przypadek użycia jest całkowicie w porządku, z mojego punktu widzenia. – mavarazy
@mavarazy: Większość dokumentacji, którą znalazłem w tej sprawie jest dość niejasna, ale [to] (http://reactivex.io/documentation/operators/serialize.html) mówi o używaniu 'serialize()', aby uniknąć dwóch nakładających się 'onNext()' wywołania i [this] (https://github.com/ReactiveX/RxJava/wiki/Subject) ostrzega, aby nie wywoływać 'onNext()' z wielu wątków - przynajmniej jeśli używasz Przedmiot. Wszystkie oficjalne przykłady Rx, jakie mogłem znaleźć, są jednowątkowe. – thund