2016-11-09 37 views
7

Mam dane w następującym formacie,Strumień migotania: Jak zaimplementować okna zdefiniowane przez element początkowy i końcowy?

SIP | 2405463430 | 4115474257 | 8.205142580136622E12 | Wto 08 listopada 16:58:58 IST 2016 | INVITE RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Wto 08 listopada 16:58:58 IST 2016 | 0 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Wt Nov 08 16:58:58 IST 2016 | 1 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 2 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Wto 08 listopada 16:58:58 IST 2016 | 3 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Wto 08 listopada 16:58:58 IST 2016 | 4 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 5 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 6 RTP | 2405463430 | 4115474257 | 8.205142580136622 E12 | Wt Nov 08 16:58:58 IST 2016 | 7 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 8 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Wt. Listopad 08 16:58:58 IST 2016 | 9 SIP | 2405463430 | 4115474257 | 8.205142580136622E12 | Wto 08 listopad 16:58:58 IST 2016 | BYE

Chcę moje okno, aby rozpocząć, gdy spotyka się komunikat SIP-INVITE i wyzwala zdarzenie, gdyNapotkano wiadomość, wykonując pewne agregacje.

Jak to zrobić? Komunikat SIP-INVITE pojawia się w dowolnym momencie dla danego użytkownika, a ja mogę mieć wiele wiadomości od wielu użytkowników przychodzących w tym samym czasie.

Odpowiedz

2

Myślę, że możesz rozwiązać swój przypadek użycia z globalnymi oknami wpisanymi przez użytkownika. Globalne okna zbierają wszystkie dane z każdego klucza i przekazują odpowiedzialność za aktywację i czyszczenie okna do zdefiniowanej przez użytkownika funkcji Trigger.

Globalna okno jest zdefiniowane następująco:

val input: DataStream[(String, Int, String)] = ??? // (userId, value, marker) 
val agg = input 
    // one global window per user (handles overlapping SIP-INVITE events). 
    .keyBy(_._1) 
    // collect all data for each user until the trigger fires and purges the window. 
    .window(GlobalWindows.create()) 
    // you have to implement a custom trigger which reacts on the marker. 
    .trigger(new YourCustomTrigger()) 
    // the window function computes your aggregation. 
    .apply(new YourWindowFunction()) 

myślę wyzwalacz, który robi następujące powinny działać (zakładając, że jest zawsze wydarzeniem SIP-INVITE rozpoczęciem sesji). Metoda Trigger.onElement() powinna sprawdzać pole SIP-BYE i wywoływać ocenę okna i czyścić okno, tj. Zwracać TriggerResult.FIRE_AND_PURGE. Spowoduje to wywołanie funkcji oceny i usunięcie stanu okna.

Uwaga, szczególna uwaga jest wymagana, jeśli chcesz obsługiwać zdarzenia poza kolejnością (w takim przypadku powinieneś ustawić timer zdarzenia-czasu na znacznik czasu elementu zamykającego, aby zapewnić, że wszystkie dane poprzedzające znacznik czasu zostały odebrane). Jeśli istnieją dane, które należy odrzucić, ponieważ nie są one "pomiędzy" SIP-INVITE i SIP-BYE, musisz to również obsłużyć.

Aby uzyskać szczegółowe informacje, zapoznaj się z dokumentacją global windows i triggers, JavaDocs z [Trigger][3], i tym blog post.

+0

Dzięki :) To było bardzo pomocne! –