2014-11-20 5 views
7

Buduję aplikację, która umożliwia dynamiczne dodawanie i usuwanie subskrypcji tematów kafka. Po dodaniu subskrypcji tematów chciałem co godzinę uruchomić zadanie wsadowe, które pobiera wszystkie nowe wiadomości i przesyła je do innego magazynu danych.Kafka - najprostszy sposób na uzyskanie ostatniej offsetu

Chcę zrozumieć, jak uzyskać aktualne przesunięcie tematu. Zaraz po dodaniu subskrypcji chcę, aby następne zadanie wsadowe otrzymywało wszystkie wiadomości od przybliżonego czasu subskrypcji.

Jako przykład wyobraź sobie, że mam temat o nazwie "TopicA", który stale odbiera wiadomości. Jeśli dodam subskrypcję o godzinie 19.15, gdy zadanie wsadowe uruchomi się o 20.00, chcę, aby wszystkie wiadomości od godziny 7.15 zostały uaktywnione. Cieszę się, że czas jest przybliżony - 7.10, 7.20 itd. 5 lub 10 minut z każdej strony nie wzbudza we mnie obaw.

Dlatego moim zamierzonym rozwiązaniem jest pobranie aktualnego przesunięcia tematu w momencie dodania subskrypcji. Spojrzałem na prostego konsumenta, ale nie chcę angażować się we wszystkie aspekty zarządzania klastrem w tym podstawowym przypadku użycia.

Przyjrzałem się także konsumentom wysokiego poziomu. Mógłbym coś takiego:

consumer.createMessageStreamsByFilter(new Whitelist(topicName)).head.head.offset 

Co mnie niepokoi z tym podejściem, wezwanie do "głowy" jest w rzeczywistości strumieniem. Sądzę więc, że zablokuje to czekanie na kolejną wiadomość. Blokowanie jest problematyczne, ponieważ może powodować, że inne subskrypcje będą umieszczane w kolejce do momentu pojawienia się następnego komunikatu.

Cieszę się, że mogę poświęcić trochę czasu na wdrożenie tego drugiego podejścia, ale jeśli jest prostszy sposób, który nie wymaga napisania kodu podatnego na błędy, wolałbym nie marnować czasu.

Potrzebuję również sposobu na uzyskanie wszystkich dzienników od tego offsetu.

Odpowiedz

2

Każda odpowiedź na żądanie pobrania zwraca "HighWaterMark", który reprezentuje ostatnie przesunięcie w dzienniku aktualnie używanej partycji. Tak więc teoretycznie można pobrać najwcześniejszą wiadomość lub wiadomość (zakładając, że istnieje) dla danego tematu i wyciągnąć HighWaterMark z odpowiedzi. Jest więcej szczegółów na HighWaterMark tutaj: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse

Oczywiście, jest w stanie pociągnąć HighWaterMarkOffset od odpowiedzi zależy od klienta czyni, że dane dostępne za pośrednictwem własnej Kafka API.

+0

Byłby to wysoki znak wodny dla danej partycji. Myślę, że pyta o informacje "ostatniej wiadomości" {partitionId, offsetId}. – arviman

+1

Myślę, że nie ma czegoś takiego jak globalna "najnowsza wiadomość". Kafka nie skalowałby się, gdyby miał jakiś globalny mechanizm synchronizacji ... –