Buduję aplikację, która umożliwia dynamiczne dodawanie i usuwanie subskrypcji tematów kafka. Po dodaniu subskrypcji tematów chciałem co godzinę uruchomić zadanie wsadowe, które pobiera wszystkie nowe wiadomości i przesyła je do innego magazynu danych.Kafka - najprostszy sposób na uzyskanie ostatniej offsetu
Chcę zrozumieć, jak uzyskać aktualne przesunięcie tematu. Zaraz po dodaniu subskrypcji chcę, aby następne zadanie wsadowe otrzymywało wszystkie wiadomości od przybliżonego czasu subskrypcji.
Jako przykład wyobraź sobie, że mam temat o nazwie "TopicA", który stale odbiera wiadomości. Jeśli dodam subskrypcję o godzinie 19.15, gdy zadanie wsadowe uruchomi się o 20.00, chcę, aby wszystkie wiadomości od godziny 7.15 zostały uaktywnione. Cieszę się, że czas jest przybliżony - 7.10, 7.20 itd. 5 lub 10 minut z każdej strony nie wzbudza we mnie obaw.
Dlatego moim zamierzonym rozwiązaniem jest pobranie aktualnego przesunięcia tematu w momencie dodania subskrypcji. Spojrzałem na prostego konsumenta, ale nie chcę angażować się we wszystkie aspekty zarządzania klastrem w tym podstawowym przypadku użycia.
Przyjrzałem się także konsumentom wysokiego poziomu. Mógłbym coś takiego:
consumer.createMessageStreamsByFilter(new Whitelist(topicName)).head.head.offset
Co mnie niepokoi z tym podejściem, wezwanie do "głowy" jest w rzeczywistości strumieniem. Sądzę więc, że zablokuje to czekanie na kolejną wiadomość. Blokowanie jest problematyczne, ponieważ może powodować, że inne subskrypcje będą umieszczane w kolejce do momentu pojawienia się następnego komunikatu.
Cieszę się, że mogę poświęcić trochę czasu na wdrożenie tego drugiego podejścia, ale jeśli jest prostszy sposób, który nie wymaga napisania kodu podatnego na błędy, wolałbym nie marnować czasu.
Potrzebuję również sposobu na uzyskanie wszystkich dzienników od tego offsetu.
Byłby to wysoki znak wodny dla danej partycji. Myślę, że pyta o informacje "ostatniej wiadomości" {partitionId, offsetId}. – arviman
Myślę, że nie ma czegoś takiego jak globalna "najnowsza wiadomość". Kafka nie skalowałby się, gdyby miał jakiś globalny mechanizm synchronizacji ... –