2015-10-02 22 views
8

Używamy burzy z wylewką Kafki. Gdy zawiadamiamy o błędach, chcielibyśmy je odtworzyć, ale w niektórych przypadkach złe dane lub błędy w kodzie spowodują, że wiadomości będą zawsze zawierane przez Bolta, więc przejdziemy do nieskończonego cyklu powtórzeń. Oczywiście naprawiamy błędy, kiedy je znajdziemy, ale chcielibyśmy, aby nasza topologia była ogólnie odporna na błędy. Jak możemy ack() krotkę po tym, jak została powtórzona więcej niż N razy?Maksymalna liczba powtórzeń krotki na wylocie Storm Kafka

Przeglądając kod dla Kafki wylewem, widzę, że został on zaprojektowany, aby ponowić próbę z wykładniczym zegarem ograniczania mocy i stanu comments on the PR:

„Dziobek nie zakończyć cykl restartu (to jest moje przekonanie, że nie powinien tego robić, ponieważ nie może raportować kontekstu o awarii, która miała miejsce, aby anulować żądanie), obsługuje tylko opóźnianie ponownych prób, ale w topologii wciąż oczekuje się, że w końcu wywoła ack() zamiast fail(), aby zatrzymać cykl. "

Widziałem odpowiedzi StackOverflow, które zalecają pisanie niestandardowego dziobka, ale wolałbym nie utknąć z utrzymaniem niestandardowej poprawki wewnętrznych części wylewki Kafki, jeśli istnieje zalecany sposób wykonania tego w Bolt.

Jak to zrobić w Bolt? Nie widzę żadnego stanu w krotce, która eksponuje ile razy została powtórzona.

+1

Jeśli masz jakieś sprawdzanie błędów w ryglu, gdzie możesz wywnioskować, że dana krotka jest "zła", jak na ciebie logiki biznesowej, możesz "ack" zamiast niepowodzenia .... więc nie będzie odtwarzany .. ... –

Odpowiedz

5

Sam Storm nie zapewnia żadnego wsparcia dla twojego problemu. W związku z tym jedynym rozwiązaniem jest indywidualne rozwiązanie. Nawet jeśli nie chcesz łatać KafkaSpout, myślę, że wprowadzenie najlepszego licznika i przerwanie cyklu powtórzeń byłoby najlepszym rozwiązaniem. Jako alternatywę możesz również dziedziczyć po KafkaSpout i umieścić licznik w swojej podklasie. Jest to oczywiście nieco podobne do poprawki, ale może być mniej uciążliwe i łatwiejsze do wdrożenia.

Jeśli chcesz użyć śruby, możesz wykonać następujące czynności (co również wymaga pewnych zmian w KafkaSpout lub jego podklasie).

  • przypisać unikalne identyfikatory jako dodatkowy atrybut dla każdej krotki (być może, nie jest już unikatowy identyfikator dostępne, w przeciwnym razie można wprowadzić „przeciw-ID” lub po prostu cały krotka, czyli wszystkie atrybuty, zidentyfikować każdą krotkę).
  • Włóż śrubę po KafkaSpout poprzez fieldsGrouping na identyfikatorze (aby upewnić się, że krotka odtwarzana jest przesyłana strumieniowo do tej samej instancji typu bolt).
  • Wewnątrz sworznia użyj HashMap<ID,Counter>, który buforuje wszystkie krotki i policzy liczbę ponownych prób. Jeśli licznik jest mniejszy niż twoja wartość progowa, przesyła krotkę wejściową tak, aby była przetwarzana przez faktyczną topologię, która następuje (oczywiście, musisz odpowiednio zakotwiczyć krotkę). Jeśli liczba jest większa niż twój próg, potnij krotkę, aby przerwać cykl i usuń jego wpis z HashMap (możesz również chcieć LOG wszystkie nieudane krotki).
  • Aby usunąć pomyślnie przetworzone krotki z HashMap, za każdym razem, gdy krotka jest potwierdzana w KafkaSpout, musisz przekazać identyfikator krotki do śruby, aby mógł usunąć krotkę z HashMap. Po prostu zadeklaruj drugi strumień wyjściowy dla podklasy KafkaSpout i nadpisaj Spout.ack(...) (oczywiście musisz zadzwonić pod numer super.ack(...), aby upewnić się, że KafkaSpout również otrzyma potwierdzenie).

Takie podejście może jednak pochłaniać dużo pamięci.Jako alternatywę dla wpisu dla każdej krotki w HashMap można również użyć trzeciego strumienia (który jest podłączony do śruby jako dwa pozostałe) i przekazać identyfikator krotki, jeśli krotka się nie powiedzie (tj. W Spout.fail(...)). Za każdym razem, gdy śruba otrzymuje komunikat "Fail" z tego trzeciego strumienia, licznik rośnie. Tak długo, jak brak wpisu nie zostanie osiągnięty (lub próg nie zostanie osiągnięty), śruba po prostu przekazuje dalej krotkę do przetworzenia. To powinno zmniejszyć zużytą pamięć, ale wymaga nieco więcej logiki, która zostanie wdrożona w twojej wylewce i śrubie.

Oba podejścia mają tę wadę, że każda potwierdzona krotka powoduje dodatkowy komunikat do nowo wprowadzonego bolta (co zwiększa ruch sieciowy). W przypadku drugiego podejścia może się wydawać, że wystarczy wysłać wiadomość "ack" do śruby w przypadku krotek, które wcześniej zawiodły. Jednak nie wiesz, które krotki zawiodły, a które nie. Jeśli chcesz pozbyć się tego obciążenia sieci, możesz wprowadzić drugi HashMap w KafkaSpout, który buforuje identyfikatory nieudanych wiadomości. Tak więc możesz wysłać wiadomość "ack" tylko wtedy, gdy nieudana gra została pomyślnie odtworzona. Oczywiście to trzecie podejście sprawia, że ​​logika do wdrożenia jest jeszcze bardziej złożona.

W pewnym zakresie, nie modyfikując w żaden sposób KafkaSpout, nie widzę żadnego rozwiązania problemu. Ja osobiście załatałbym KafkaSpout lub użyłbym trzeciego podejścia z podklasą HashMap w (ponieważ zużywał on mało pamięci i nie nakładał dodatkowego obciążenia na sieć w porównaniu z dwoma pierwszymi rozwiązaniami).

+0

jest metodą fail() w wylocie wywoływaną przez pojedynczy wątek? Próbuję tylko ustalić, czy potrzebuję ConcurrentHashMap do śledzenia msgIds-> errorCnt, czy też prosta HashMap <>. dzięki – user3169330

+0

'nextTuple()', 'ack()' i 'fail()' są wywoływane przez pojedynczy wątek. Używanie 'HashMap' jest wystarczające. Zobacz tutaj, aby uzyskać więcej informacji: https://stackoverflow.com/questions/32547935/why-should-i-not-lub-lub-block-in-spout-nexttuple –

+0

Kolejna rzecz, jeśli mam N spouts, nie ma błędu() Metoda dla określonego msgId, uzyskać wywołania na serwerze SAME/wylewki? – user3169330

0

zasadzie to działa tak:

  1. Jeśli wdrożyć topologii powinny być klasy produkcyjnej (to jest to oczekiwana pewien poziom jakości, a liczba krotek niski).
  2. Jeśli krotka nie powiedzie się, sprawdź, czy krotka jest rzeczywiście ważna.
  3. Jeśli krotka jest poprawna (na przykład nie można jej wstawić, ponieważ nie można połączyć się z zewnętrzną bazą danych lub coś podobnego), odpowiedz.
  4. Jeśli krotka jest nietrafiona i nigdy nie można jej obsłużyć (na przykład identyfikator bazy danych, który jest tekstem, a baza danych oczekuje liczby całkowitej) to powinna być ack, nigdy nie będziesz w stanie naprawić takiej rzeczy lub wstawić jej do baza danych.
  5. Nowe rodzaje wyjątków powinny być rejestrowane (tak samo jak zawartość krotek). Powinieneś sprawdzić te logi i wygenerować regułę, aby zweryfikować krotki w przyszłości. I ostatecznie dodaj kod, aby poprawnie je przetwarzać (ETL) w przyszłości.
  6. Nie rejestruj wszystkiego, w przeciwnym razie pliki dziennika będą ogromne, bądź bardzo wybiórczy w stosunku do tego, co logujesz. Zawartość plików dziennika powinna być użyteczna, a nie kupa śmieci.
  7. Kontynuuj, a ostatecznie zajmiesz się tylko wszystkimi przypadkami.
0

Stoimy również w obliczu podobnych danych, w których pojawiają się błędne dane, powodujące nieskończoną awarię śruby.

Aby rozwiązać ten problem w środowisku wykonawczym, wprowadziliśmy jeszcze jeden rygiel, nazywający go "DebugBolt" w celach informacyjnych. Tak więc dziobek najpierw przesyła wiadomość do tego sworznia, a następnie ten rygiel dokonuje wymaganej korekty danych dla złych wiadomości, a następnie wysyła je do wymaganej śruby. W ten sposób można naprawić błędy danych w locie.

Ponadto, jeśli chcesz usunąć niektóre wiadomości, możesz przekazać ignoreFlag z DebugBolt do oryginalnego Bolta, a twój oryginalny rygiel powinien po prostu wysłać ack do spouta bez przetwarzania, jeśli ignoreFlag ma wartość True.

0

Po prostu nasz rygiel wypuścił błędną krotkę na strumień błędów i potwierdził. Kolejna śruba poradziła sobie z błędem, pisząc ją z powrotem do tematu Kafki specjalnie w poszukiwaniu błędów. Dzięki temu możemy łatwo sterować przepływem danych normalnych w porównaniu z błędami w topologii.

Jedynym przypadkiem, w którym zawodzimy krotce, jest to, że niektóre wymagane zasoby są niedostępne, takie jak połączenie sieciowe, DB, ... To są błędy do odzyskania. Cokolwiek innego jest kierowane do strumienia błędów, który ma zostać naprawiony lub obsłużony, jak jest to właściwe.

To wszystko zakłada oczywiście, że nie chcesz ponosić utraty danych. Jeśli chcesz tylko wykonać jak najlepszy wysiłek i zignorować po kilku próbach, to sprawdziłbym inne opcje.