Rozglądam się, aby wymienić bibliotekę przetwarzania logów, która wygląda okropnie blisko ReactiveStreams z io.projectreactor
. Celem jest ograniczenie utrzymywanego przez nas kodu i korzystanie z nowych funkcji dodanych przez społeczność (eyeing operator fusion).Strumienie reaktywne - pakowanie z limitem czasu
Na początek muszę zużyć stdio i scalić wpisy dziennika wieloliniowego w obiektach tekstowych, które mogłyby przepłynąć w dół. Przypadek użycia jest szczegółowo wyjaśniony w rozdziale multiline log entries rozdziału Dokumenty Filebeat (z wyjątkiem tego, że chcemy, aby to nastąpiło).
Dotychczas kod mam jest:
BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
.subscribe();
ta dba o multi-line łączących się, gdy zostanie wykryty nowy nagłówek dziennika, ale w istniejącej biblioteki również wypłukać nagromadzone linie po timeout (jeśli żaden tekst nie zostanie odebrany w ciągu 5 sekund, przepłucz rekord).
Jaki byłby właściwy sposób modelowania tego w Reactor? Czy muszę napisać własny operator, czy mogę dostosować dowolny istniejący?
Wszelkie wskazówki odnoszące się do odpowiednich przykładów i dokumentów dla osiągnięcia tego zastosowania w projekcie Reactor lub RxJava byłyby bardzo doceniane.
Czy widzisz operatora 'buffer (długi okres czasu, jednostka TimeUnit)' (rxjava)? – zella
Bufor wygląda naprawdę blisko, ale żadne przeciążenie nie pasuje do tego, czego potrzebuję - potrzebuję kombinacji strategii "bufferClosingSelector" i "timespan" - w zależności od tego, co nastąpi wcześniej. – ddimitrov