2013-08-02 25 views
5

Korzystając z przykładu kodu dostarczonego przez Twitter4j, chciałbym zatrzymać strumień po zgromadzeniu listy 1000 statusów i zwrócić tę listę. Jak mogę to zrobić?Zatrzymaj strumień Twitter i zwróć listę statusu za pomocą twitter4j

public class Stream { 
    public List<Status> execute throws TwitterException { 

    List<Status> statuses = new ArrayList(); 

    ConfigurationBuilder cb = new ConfigurationBuilder(); 
    cb.setDebugEnabled(true); 
    cb.setOAuthConsumerKey("bbb"); 
    cb.setOAuthConsumerSecret("bbb"); 
    cb.setOAuthAccessToken("bbb"); 
    cb.setOAuthAccessTokenSecret("bbb"); 

    TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); 

    StatusListener listener = new StatusListener() { 

     public void onStatus(Status status) { 
      statuses.add(status); 
      if (statuses.size>1000){ 
       //return statuses. Obviously that's not the correct place for a return statement... 
      } 
     } 

     public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) { 
      System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId()); 
     } 

     public void onTrackLimitationNotice(int numberOfLimitedStatuses) { 
      System.out.println("Got track limitation notice:" + numberOfLimitedStatuses); 
     } 

     public void onScrubGeo(long userId, long upToStatusId) { 
      System.out.println("Got scrub_geo event userId:" + userId + " upToStatusId:" + upToStatusId); 
     } 

     public void onException(Exception ex) { 
      ex.printStackTrace(); 
     } 
    }; 

    FilterQuery fq = new FilterQuery(); 
    String keywords[] = {"Keyword 1", "Keyword 2"}; 

    fq.track(keywords); 

    twitterStream.addListener(listener); 
    twitterStream.filter(fq); 

} 
+0

Czy u dostać jakieś rozwiązanie? – jackyesind

+0

Nie ... wciąż jestem zainteresowany odpowiedzią! – seinecle

Odpowiedz

4

Nie jest dobrym pomysłem wymuszanie asynchronicznego fragmentu kodu w trybie synchronicznym. Zobacz: https://stackoverflow.com/a/5934656/276263 Sprawdź, czy możesz przerobić swoją logikę.

Jednak poniższy kod działa zgodnie z wymaganiami.

public class Stream { 

    public static void main(String[] args) throws TwitterException { 
    Stream stream = new Stream(); 
    stream.execute(); 
    } 

    private final Object lock = new Object(); 
    public List<Status> execute() throws TwitterException { 

    final List<Status> statuses = new ArrayList(); 

    ConfigurationBuilder cb = new ConfigurationBuilder(); 
    cb.setDebugEnabled(true); 
    cb.setOAuthConsumerKey("bbb"); 
    cb.setOAuthConsumerSecret("bbb"); 
    cb.setOAuthAccessToken("bbb"); 
    cb.setOAuthAccessTokenSecret("bbb"); 

    TwitterStream twitterStream = new TwitterStreamFactory(cb.build()) 
     .getInstance(); 

    StatusListener listener = new StatusListener() { 

     public void onStatus(Status status) { 
     statuses.add(status); 
     System.out.println(statuses.size() + ":" + status.getText()); 
     if (statuses.size() > 100) { 
      synchronized (lock) { 
      lock.notify(); 
      } 
      System.out.println("unlocked"); 
     } 
     } 

     public void onDeletionNotice(
      StatusDeletionNotice statusDeletionNotice) { 
     System.out.println("Got a status deletion notice id:" 
      + statusDeletionNotice.getStatusId()); 
     } 

     public void onTrackLimitationNotice(int numberOfLimitedStatuses) { 
     System.out.println("Got track limitation notice:" 
      + numberOfLimitedStatuses); 
     } 

     public void onScrubGeo(long userId, long upToStatusId) { 
     System.out.println("Got scrub_geo event userId:" + userId 
      + " upToStatusId:" + upToStatusId); 
     } 

     public void onException(Exception ex) { 
     ex.printStackTrace(); 
     } 

     @Override 
     public void onStallWarning(StallWarning sw) { 
     System.out.println(sw.getMessage()); 

     } 
    }; 

    FilterQuery fq = new FilterQuery(); 
    String keywords[] = { "federer", "nadal", "#Salute" }; 

    fq.track(keywords); 


    twitterStream.addListener(listener); 
    twitterStream.filter(fq); 

    try { 
     synchronized (lock) { 
     lock.wait(); 
     } 
    } catch (InterruptedException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 
    System.out.println("returning statuses"); 
    twitterStream.shutdown(); 
    return statuses; 
    } 
} 
+0

W końcu akceptuję tę odpowiedź, ponieważ jest to ta, którą zaimplementowałem i działa dobrze. Dzięki. – seinecle

0

Istnieją dwa sposoby:

Jeśli wystarczy, aby zakończyć wątek, który pobiera strumień (jest to wątek, który nazywa swoją słuchacza) można użyć twitterStream.cleanUp();. To z wdziękiem zatrzyma wątek. Możesz użyć zmiennej boolean stopped w swoim detektorze statusu, aby zignorować wszystkie połączenia, które otrzymasz po tym wydarzeniu.

Możesz także zamknąć wątek pochłaniający strumień wraz z wątkiem rozsyłającym (to jest wątek demona), dzwoniąc pod numer twitterStream.shutdown();. Jest to bardziej brutalne podejście polegające na kończeniu komunikacji z API Twittera. Choć to działa, jeśli nazwać to od wewnątrz słuchacza, wolałbym podejście zaproponowane przez @krishnakumarp

6

Rozważ używanie BlockingQueue jako pośrednik, używając słuchacza dodać Status obiektów do niego.

Po rozpoczęciu transmisji możesz zacząć przyjmować Status es z kolejki, aż znajdziesz tysiąc, którego potrzebujesz.

Jako punkt wyjścia, by to wyglądać mniej więcej tak:

public class Stream { 
    private static final int TOTAL_TWEETS = 1000; 

    public List<Status> execute() throws TwitterException { 
     // skipped for brevity... 

     // TODO: You may have to tweak the capacity of the queue, depends on the filter query 
     final BlockingQueue<Status> statuses = new LinkedBlockingQueue<Status>(10000); 
     final StatusListener listener = new StatusListener() { 

      public void onStatus(Status status) { 
       statuses.offer(status); // Add received status to the queue 
      } 

      // etc... 
     }; 

     final FilterQuery fq = new FilterQuery(); 
     final String keywords[] = {"Keyword 1", "Keyword 2"}; 
     fq.track(keywords); 

     twitterStream.addListener(listener); 
     twitterStream.filter(fq); 

     // Collect the 1000 statues 
     final List<Status> collected = new ArrayList<Status>(TOTAL_TWEETS); 
     while (collected.size() < TOTAL_TWEETS) { 
      // TODO: Handle InterruptedException 
      final Status status = statuses.poll(10, TimeUnit.SECONDS); 

      if (status == null) { 
       // TODO: Consider hitting this too often could indicate no further Tweets 
       continue; 
      } 
      collected.add(status); 
     } 
     twitterStream.shutdown(); 

     return collected; 
    } 
} 
+0

Witam, próbuję zamknąć strumień twitter, ale zawsze otrzymuję wiązkę ostrzeżeń/erros. Mam metodę, która oczyszcza i zamyka. i otrzymuję: (proszę zobaczyć moją "odpowiedź poniżej"). Czy wiesz, dlaczego tak się dzieje? – Loebre