2016-04-13 14 views
8

Czy mogę używać Retrofit + RxJava do słuchania niekończącego się strumienia? Na przykład strumień Twittera. To, co mam, to:Android Retrofit 2 + RxJava: słuchaj nieskończonego strumienia

public interface MeetupAPI { 
    @GET("http://stream.meetup.com/2/rsvps/") 
    Observable<RSVP> getRSVPs(); 
} 

MeetupAPI api = new Retrofit.Builder() 
      .baseUrl(MeetupAPI.RSVP_API) 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .addConverterFactory(GsonConverterFactory.create()) 
      .build() 
      .create(MeetupAPI.class); 

api.getRSVPs() 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(rsvp -> Log.d(TAG, "got rsvp"), 
       error -> Log.d(TAG, "error: " + error), 
       () -> Log.d(TAG, "onComplete")); 

, ale "onComplete" jest wywoływany po przeanalizowaniu pierwszego obiektu. Czy istnieje sposób, aby powiedzieć Retrofit, aby pozostał otwarty do odwołania?

Odpowiedz

9

Oto moje rozwiązanie:

Można użyć adnotacji @Streaming:

public interface ITwitterAPI { 

    @GET("/2/rsvps") 
    @Streaming 
    Observable<ResponseBody> twitterStream(); 
} 

ITwitterAPI api = new Retrofit.Builder() 
      .baseUrl("http://stream.meetup.com") 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .build().create(ITwitterAPI.class); 

Z @Streaming możemy uzyskać wejście surowego Od ResponseBody.

Oto moja funkcja owinąć ciało podzielone liniami ze zdarzeń:

public static Observable<String> events(BufferedSource source) { 
    return Observable.create(new Observable.OnSubscribe<String>() { 
     @Override 
     public void call(Subscriber<? super String> subscriber) { 
      try { 
       while (!source.exhausted()) { 
        subscriber.onNext(source.readUtf8Line()); 
       } 
       subscriber.onCompleted(); 
      } catch (IOException e) { 
       e.printStackTrace(); 
       subscriber.onError(e); 
      } 
     } 
    }); 
} 

oraz wyniku wykorzystania:

api.twitterStream() 
    .flatMap(responseBody -> events(responseBody.source())) 
    .subscribe(System.out::println); 

UPD o wdzięcznie zatrzymanie

Kiedy wypisywania zamyka modernizacyjne strumień wejściowy. Ale niemożliwe jest wykrycie strumienia wejściowego zamkniętego lub nie z samego strumienia wejściowego, więc tylko droga - spróbuj odczytać ze strumienia - otrzymamy wyjątek z komunikatem Socket closed. Możemy interpretować ten wyjątek jako zamknięcia:

 @Override 
     public void call(Subscriber<? super String> subscriber) { 
      boolean isCompleted = false; 
      try { 
       while (!source.exhausted()) { 
        subscriber.onNext(source.readUtf8Line()); 
       } 
      } catch (IOException e) { 
       if (e.getMessage().equals("Socket closed")) { 
        isCompleted = true; 
        subscriber.onCompleted(); 
       } else { 
        throw new UncheckedIOException(e); 
       } 
      } 
      //if response end we get here 
      if (!isCompleted) { 
       subscriber.onCompleted(); 
      } 
     } 

A jeśli połączenie zamknięte ponieważ koniec reakcji, nie ma żadnych wyjątków. Tutaj sprawdź to. Daj mi znać, jeśli się mylę :)

+0

Dzięki! Jakieś wskazówki, jak z wdziękiem przestać słuchać strumienia? – ticofab