2017-09-30 25 views
5

Powtarzam, porównuję RxJavę z Java 9 Flow. Domyślnie Flow jest asynchronicznie i zastanawiałem się, czy istnieje sposób, aby uruchomić go synchronicznie.Run Flow w głównym wątku

Czasami po prostu chcemy go użyć nie dla Nio, ale dla składni cukru i mieć bardziej jednorodny kod.

Domyślnie RxJava jest synchroniczny i można go uruchomić asynchronicznie za pomocą observerOn i subscribeOn w potoku.

Jest jakiś operator w Flow, aby uruchomić go w głównym wątku ?.

Pozdrawiam.

Odpowiedz

5

Można zdefiniować niestandardową wartość Publisher, zgodnie z dokumentacją w dokumencie Flow w celu korzystania z wykonania synchronicznego.

Bardzo prosty wydawca że tylko problemy (na żądanie) pojedynczy TRUE pozycja do pojedynczego abonenta. Ponieważ subskrybent otrzymuje tylko jeden element, ta klasa nie używa kontroli buforowania i porządkowania.

class OneShotPublisher implements Publisher<Boolean> { 
    private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based 
    private boolean subscribed; // true after first subscribe 
    public synchronized void subscribe(Subscriber<? super Boolean> subscriber) { 
    if (subscribed) 
     subscriber.onError(new IllegalStateException()); // only one allowed 
    else { 
     subscribed = true; 
     subscriber.onSubscribe(new OneShotSubscription(subscriber, executor)); 
    } 
    } 
    static class OneShotSubscription implements Subscription { 
    private final Subscriber<? super Boolean> subscriber; 
    private final ExecutorService executor; 
    private Future<?> future; // to allow cancellation 
    private boolean completed; 
    OneShotSubscription(Subscriber<? super Boolean> subscriber, 
         ExecutorService executor) { 
     this.subscriber = subscriber; 
     this.executor = executor; 
    } 
    public synchronized void request(long n) { 
     if (n != 0 && !completed) { 
     completed = true; 
     if (n < 0) { 
      IllegalArgumentException ex = new IllegalArgumentException(); 
      executor.execute(() -> subscriber.onError(ex)); 
     } else { 
      future = executor.submit(() -> { 
      subscriber.onNext(Boolean.TRUE); 
      subscriber.onComplete(); 
      }); 
     } 
     } 
    } 
    public synchronized void cancel() { 
     completed = true; 
     if (future != null) future.cancel(false); 
    } 
    } 
} 
+2

Dzięki za rozbudowany przykładowy kod. – paul

3

Nie ma operatora, aby to zrobić, ale API pozwala kontrolować sposób elementy są publikowane. Dlatego możesz po prostu wywołać metody subskrybenta bezpośrednio z bieżącego wątku.

class SynchronousPublisher implements Publisher<Data> { 
     public synchronized void subscribe(Subscriber<? super Data> subscriber) { 
      subscriber.onSubscribe(new SynchronousSubscription(subscriber)); 
     } 
} 
static class SynchronousSubscription implements Subscription { 
     private final Subscriber<? super Data> subscriber; 

     SynchronousSubscription(Subscriber<? super Data> subscriber) { 
      this.subscriber = subscriber; 
     } 
     public synchronized void request(long n) { 
      ... // prepare item    
      subscriber.onNext(someItem);  
     } 

     ... 
    } 
} 
+0

Dzięki, jest to również dobre rozwiązanie, nadal dość gadatliwy, ale hej! to jest Java :) – paul

+1

@paul Należy pamiętać, że Java zapewnia to jako specyfikację RS (brak implementacji). To nie jest alternatywa dla RxJavy. Tylko minimum potrzebne do działania jako specyfikacja. – manouti

2

To zależy od tego, co masz na myśli, uruchamiając główny wątek.

Jeśli chcesz wymusić wykonanie dowolnego przepływu w określonym wątku, nie ma standardowego sposobu, aby to zrobić, chyba że implementacja przepływu zostanie przeprowadzona w bibliotece, która pozwala ci zastąpić części zapewniające asynchronię. W kategoriach RxJava są to Scheduler s dostarczone przez klasę użyteczności Schedulers.

Jeśli chcesz obserwować przepływ w głównym wątku, musisz napisać konsument kolejki blokującej na końcu Flow.Subscriber, który blokuje wątek, dopóki kolejka nie ma pozycji. To może się skomplikować, więc skieruję cię do implementacji blockingSubscribe w Reactive4JavaFlow.

Jeśli chcesz używać głównego wątku Java jako Executor/Scheduler, jest to jeszcze bardziej skomplikowane i wymaga podobnego mechanizmu blokowania, a także niektórych pomysłów z executora wątków. Reactive4JavaFlow ma taki Scheduler, którego możesz użyć jako Executora przez: new SubmissionPublisher<>(128, blockingScheduler::schedule).