6

Zgodnie z Amazon Kinesis Streams documentation, rekord może być dostarczany wiele razy.Jak radzisz sobie z duplikatami Amazon Kinesis Record?

Jedynym sposobem na to, aby przetworzyć każdy rekord, jest tymczasowe przechowywanie ich w bazie danych obsługującej sprawdzanie integralności (np. DynamoDB, Elasticache lub MySQL/PostgreSQL) lub po prostu punkt kontrolny RecordId dla każdego fragmentu Kinesis.

Czy znasz lepszy/bardziej skuteczny sposób obsługi duplikatów?

Odpowiedz

6

Mieliśmy dokładnie ten problem przy budowie systemu telemetrii dla aplikacji mobilnej . W naszym przypadku nie byliśmy również pewni, że producenci wysyłają każdą wiadomość dokładnie raz, dlatego dla każdego otrzymanego rekordu obliczyliśmy jej MD5 w locie i sprawdziliśmy, czy jest on prezentowany w pewnej formie trwałego magazynu, ale w rzeczywistości to, którego najtrudniejszy kawałek.

Po pierwsze, próbowaliśmy trywialnej, relacyjnej bazy danych, ale szybko stała się ona poważnym wąskim gardłem całego systemu, ponieważ nie jest to tylko ciężki odczyt, ale także pisanie ciężkich przypadków, ponieważ ilość danych przechodzących przez Kinesis była dość znacząca. .

W końcu otrzymaliśmy tabelę DynamoDB przechowującą MD5 dla każdej unikalnej wiadomości. Problem polegał na tym, że nie było tak łatwo usunąć wiadomości - mimo że nasza tabela zawierała klucze partycji i sortowania, DynamoDB nie pozwala na opuszczenie wszystkich rekordów za pomocą danego klucza partycji, musieliśmy zapytać wszystkich, aby uzyskać sortuj kluczowe wartości (które tracą czas i pojemność). Niestety, musieliśmy po prostu odrzucić cały stół raz na jakiś czas. Innym sposobem na nieoptymalne rozwiązanie jest regularne obracanie tabel DynamoDB, które przechowują identyfikatory wiadomości.

Jednak niedawno DynamoDB wprowadził bardzo przydatną funkcję - Time To Live, co oznacza, że ​​teraz możemy kontrolować wielkość tabeli, włączając automatyczne wygaśnięcie na podstawie każdego rekordu. W tym sensie DynamoDB wydaje się być dość podobny do ElastiCache, jednak ElastiCache (przynajmniej klaster Memcached) jest znacznie mniej trwały - nie ma tam nadmiarowości, a wszystkie dane rezydujące na zakończonych węzłach są tracone w przypadku działania wagi lub awarii.

+1

Witam Dmitriju, pracowałem nad kilkoma benchmarkami używając czegoś podobnego do infrastruktury JustGiving, wyjaśnionej tutaj: https://aws.amazon.com/blogs/compute/serverless-cross-account-stream-replication-using-aws-lambda -amazon-dynamodb-and-amazon-kinesis-firehose /. Dlaczego obliczono sumę kontrolną MD5 zamiast Shardid + SequenceNumber dla tabeli DDB? – Antonio

+2

Hi @Antonio W naszym przypadku możliwe było, że producent wysłałby tę samą wiadomość Gdyby tak było, to Kinesis i tak uważałaby je za inne wiadomości (po prostu dlatego, że były 2 lub więcej postów od producenta). Ponieważ wiedzieliśmy, że każda wiadomość musi być unikalna, po prostu zignorowaliśmy wiadomości, które md5 ma Md5 zostało również wyliczone przez producentów, oszczędzając trochę czasu obliczeniowego dla klientów (biorąc pod uwagę stosunkowo dużą ilość danych przechodzących przez Kinesis) –

+0

Po prostu chciałem się tam wyrzucić - AWS zauważa, że ​​różne producenci mogą naturalnie wytwarzać ten sam rekord wiele razy ze względu na błędy, a także, częściej, wielu konsumentów może ciągnąć ten sam zestaw rekordów. Mam teraz do czynienia z tym w naszym systemie. Korzystamy z elastycznego przeszukiwania, a na razie plan polega na zastosowaniu elastycznych elementów wbudowanych w wersjach, aby zapewnić, że ten sam rekord nie jest aktualizowany w tym samym czasie, a następnie zapamiętać listę ostatnich zdarzeń zastosowanych do rekordu samego rekordu. – genexp

7

To, o czym wspomniałeś, to ogólny problem wszystkich systemów kolejek z podejściem "co najmniej raz". Ponadto, nie tylko systemy kolejek, producenci i konsumenci mogą przetwarzać tę samą wiadomość wiele razy (z powodu błędów ReadTimeout itp.). Kinesis i Kafka używają tego paradygmatu. Niestety nie ma na to łatwej odpowiedzi.

Możesz także spróbować użyć kolejki komunikatów "dokładnie raz", z bardziej rygorystycznym podejściem do transakcji. Na przykład AWS SQS robi to: https://aws.amazon.com/about-aws/whats-new/2016/11/amazon-sqs-introduces-fifo-queues-with-exactly-once-processing-and-lower-prices-for-standard-queues/. Należy pamiętać, że przepustowość SQS jest znacznie mniejsza niż Kinesis.

Aby rozwiązać problem, należy zdawać sobie sprawę z domeny aplikacji i spróbować rozwiązać ją wewnętrznie, tak jak sugerowałeś (sprawdzanie bazy danych). Szczególnie, gdy komunikujesz się z usługą zewnętrzną (na przykład z serwerem poczty e-mail), powinieneś być w stanie odzyskać stan operacji, aby zapobiec podwójnemu przetwarzaniu (ponieważ podwójne wysłanie w przykładzie serwera poczty e-mail może spowodować powstanie wielu kopii ten sam wpis w skrzynce odbiorcy).

Zobacz także następujące pojęcia;

  1. At-najmniej raz dostawy: http://www.cloudcomputingpatterns.org/at_least_once_delivery/
  2. Dokładnie jednokrotnego Dostawa: http://www.cloudcomputingpatterns.org/exactly_once_delivery/
  3. idempotent Procesor: http://www.cloudcomputingpatterns.org/idempotent_processor/
+0

Dziękuję za odpowiedź. Nie mogę używać SQS ze względu na wysoką przepustowość. Wysoka przepustowość jest również powodem, dla którego testuję kilka rozwiązań z różnymi trwałymi magazynami (Mysql/PgSQL/Aurora/ElasticSearch/DynamoDB). Najlepszym sposobem tymczasowego przechowywania identyfikatorów zdarzeń jest Redis, ale ElastiCache nie może zagwarantować trwałości danych. Dlatego szukałem alternatywnych sposobów robienia tego. – Antonio

+1

Redis przyznaje ci ścisłe śledzenie tx, ale jest to pojedynczy węzeł, a RDS jest zbyt wolny, masz rację. DynamoDB wydaje się być jedynym rozwiązaniem PaaS. Jeśli jednak chcesz zarządzać instancjami EC2, możesz wypróbować rozwiązania klastrowe w pamięci, takie jak Hazelcast lub VoltDB (w wielu węzłach r3)? – az3

+0

Bazy danych w pamięci nie są trwałe. Jeśli Twój klastra Hazelcast nie powiedzie się, nie będziesz w stanie zrozumieć, które wiadomości zostały już przetworzone. :( – Antonio