8

Rozglądam się, aby wymienić bibliotekę przetwarzania logów, która wygląda okropnie blisko ReactiveStreams z io.projectreactor. Celem jest ograniczenie utrzymywanego przez nas kodu i korzystanie z nowych funkcji dodanych przez społeczność (eyeing operator fusion).Strumienie reaktywne - pakowanie z limitem czasu

Na początek muszę zużyć stdio i scalić wpisy dziennika wieloliniowego w obiektach tekstowych, które mogłyby przepłynąć w dół. Przypadek użycia jest szczegółowo wyjaśniony w rozdziale multiline log entries rozdziału Dokumenty Filebeat (z wyjątkiem tego, że chcemy, aby to nastąpiło).

Dotychczas kod mam jest:

BufferedReader input = new BufferedReader(new InputStreamReader(System.in)); 
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); })); 
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner()); 
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper()); 
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length())) 
      .subscribe();   

ta dba o multi-line łączących się, gdy zostanie wykryty nowy nagłówek dziennika, ale w istniejącej biblioteki również wypłukać nagromadzone linie po timeout (jeśli żaden tekst nie zostanie odebrany w ciągu 5 sekund, przepłucz rekord).

Jaki byłby właściwy sposób modelowania tego w Reactor? Czy muszę napisać własny operator, czy mogę dostosować dowolny istniejący?

Wszelkie wskazówki odnoszące się do odpowiednich przykładów i dokumentów dla osiągnięcia tego zastosowania w projekcie Reactor lub RxJava byłyby bardzo doceniane.

+1

Czy widzisz operatora 'buffer (długi okres czasu, jednostka TimeUnit)' (rxjava)? – zella

+0

Bufor wygląda naprawdę blisko, ale żadne przeciążenie nie pasuje do tego, czego potrzebuję - potrzebuję kombinacji strategii "bufferClosingSelector" i "timespan" - w zależności od tego, co nastąpi wcześniej. – ddimitrov

Odpowiedz

3

To zależy od tego, w jaki sposób określić początek i koniec każdego bufora, więc następujące RxJava kod 2 przeznaczony jest jako podpowiedź o używaniu wartości głównego źródła, aby otworzyć i zamknąć bramę bufora:

TestScheduler scheduler = new TestScheduler(); 
PublishProcessor<String> pp = PublishProcessor.create(); 

Function<Flowable<String>, Flowable<List<String>>> f = o -> 
     o.buffer(o.filter(v -> v.contains("Start")), 
       v -> Flowable.merge(o.filter(w -> w.contains("End")), 
            Flowable.timer(5, TimeUnit.MINUTES, scheduler))); 

pp.publish(f) 
.subscribe(System.out::println); 

pp.onNext("Start"); 
pp.onNext("A"); 
pp.onNext("B"); 
pp.onNext("End"); 

pp.onNext("Start"); 
pp.onNext("C"); 

scheduler.advanceTimeBy(5, TimeUnit.MINUTES); 

pp.onNext("Start"); 
pp.onNext("D"); 
pp.onNext("End"); 
pp.onComplete(); 

Drukuje:

[Start, A, B, End] 
[Start, C] 
[Start, D, End] 

Działa poprzez udostępnienie źródła przez publish, co pozwala na ponowne użycie tej samej wartości z poziomu źródłowego bez jednoczesnego uruchamiania wielu kopii źródłowych. Otwarcie jest regulowane przez wykrycie łańcucha "Start" na linii. Zamknięcie jest regulowane przez wykrycie ciągu "Koniec" lub zwłoki czasomierza po okresie karencji.

Edit:

Jeśli „Start” jest również wskaźnik dla kolejnej partii, można zastąpić „End” sprawdzić z „start” i zmienić zawartość bufora, ponieważ będzie obejmować nowy nagłówek w poprzednim buforze inaczej:

pp.publish(f) 
.doOnNext(v -> { 
    int s = v.size(); 
    if (s > 1 && v.get(s - 1).contains("Start")) { 
     v.remove(s - 1); 
    } 
}) 
.subscribe(System.out::println); 
+0

Co powiesz na przypadek, w którym nie ma KOŃCA, ale bufor jest zamknięty, gdy widzimy następny START, lub wygasa Tumeout? Zaczynam wątpić w moją komunikację - czy jest coś, co nie jest jasne w pytaniu? – ddimitrov

1

buffer Operator wydaje mi się najbardziej odpowiednim i prostym rozwiązaniem.

Ma strategie oparte na wielkości i czasie. Masz dziennik, więc myślę, że możesz interpretować linie liczone jako rozmiar bufora.

Oto przykład - jak emitować pozycji zgrupowanych przez 4 lub 5 sekund przedziale czasu:

Observable<String> lineReader = Observable.<String>create(subscriber -> { 
     try { 
      BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); 
      for (String line = br.readLine(); line != null; line = br.readLine()) { 
       subscriber.onNext(line); 
      } 
     } catch (IOException e) { 
      throw new UncheckedIOException(e); 
     } 
    }).subscribeOn(Schedulers.newThread()); 

    lineReader 
     .buffer(5, TimeUnit.SECONDS,4) 
     .filter(lines -> !lines.isEmpty()) 
     .subscribe(System.out::println); 
+0

Potrzebuję zgrupowane przez nagłówek dziennika z Tumeout. To znaczy. jeśli zalogowałem wiadomość 2-wierszową, a następnie komunikat 1-wierszowy, a następnie stacktrace, a następnie kolejny 1 wiersz messshe, a następnie nic przez okres Tumeout.Oczekuję, że natychmiast dostanę 3 wiadomości, aż do stacktrace, i czwartą wiadomość po tumeout. – ddimitrov