6

Próbuję zaimplementować architekturę sterowaną zdarzeniami do obsługi transakcji rozproszonych. Każda usługa ma własną bazę danych i wykorzystuje Kafkę do wysyłania wiadomości w celu informowania innych mikroserwisów o operacjach.Jak zaimplementować mikroserwisową architekturę opartą na zdarzeniach ze Spring Cloud Stream Kafka i baza danych dla usługi

Przykład:

Order service -------> | Kafka |------->Payment Service 
     |          | 
Orders MariaDB DB     Payment MariaDB Database 

Zamówienie otrzymuje żądanie zamówienia. Musi przechowywać nowe zamówienie w swojej bazie danych i opublikować komunikat, aby usługa płatności zorientowała się, że musi pobierać opłatę za przedmiot:

prywatne zamówienieZlecenie biznesoweBiznes;

@PostMapping 
public Order createOrder(@RequestBody Order order){ 
    logger.debug("createOrder()"); 
    //a.- Save the order in the DB 
    orderBusiness.createOrder(order); 
    //b. Publish in the topic so that Payment Service charges for the item. 
    try{ 
     orderSource.output().send(MessageBuilder.withPayload(order).build()); 
    }catch(Exception e){ 
     logger.error("{}", e); 
    } 
    return order; 
} 

Są to moje wątpliwości:

  1. Kroki A.- (zapisać w celu DB) i b.- (publikować wiadomości) powinny być wykonywane w ramach transakcji, niepodzielnie. Jak mogę to osiągnąć?
  2. Jest to związane z poprzednim: Wysyłam komunikat za pomocą: orderSource.output(). Send (MessageBuilder.withPayload (order) .build()); Ta operacja jest asynchroniczna i ZAWSZE zwraca true, bez względu na to, czy broker Kafki jest wyłączony. Skąd mogę wiedzieć, że wiadomość dotarła do brokera Kafka?

Odpowiedz

8

Kroki A.- (SAVE w celu DB) i b.- (publikować wiadomości) powinny być wykonywane w transakcji, niepodzielnie. Jak mogę to osiągnąć?

Kafka obecnie nie obsługuje transakcji (a zatem również nie wycofuje ani nie zatwierdza), które trzeba zsynchronizować w podobny sposób. Krótko mówiąc: nie możesz robić tego, co chcesz. To zmieni się w najbliższą przyszłość, kiedy KIP-98 zostanie połączone, ale to może jeszcze trochę potrwać. Ponadto, nawet przy transakcjach w Kafce, transakcja atomowa pomiędzy dwoma systemami jest bardzo trudna do zrobienia, wszystko, co nastąpi, zostanie poprawione tylko dzięki obsłudze transakcyjnej w Kafce, to jednak nie rozwiąże to problemu. W tym celu należy sprawdzić implementację jakiejś formy two phase commit w swoich systemach.

można dostać nieco blisko konfigurując właściwości producentów, ale w końcu trzeba będzie wybierać pomiędzy przynajmniej raz lub co najwyżej raz dla jednego ze swoich systemów (MariaDB lub Kafki).

Zacznijmy od tego, co możesz zrobić w Kafce, aby zapewnić dostarczenie wiadomości, a następnie przyjrzymy się możliwościom ogólnego przepływu procesów i ich konsekwencji.

gwarantowana dostawa

Można skonfigurować ile pośrednicy muszą potwierdzić odbiór wiadomości, zanim wniosek zostanie zwrócone z parametrem ACK: ustawienie to wszystko Ci powiedzieć broker, aby poczekać, aż wszystkie repliki potwierdzą twoją wiadomość, zanim zwrócą ci odpowiedź. To wciąż nie jest 100% gwarancji, że twoja wiadomość nie zostanie utracona, ponieważ została zapisana tylko w pamięci podręcznej strony i istnieją teoretyczne scenariusze, w których broker zawodzi, zanim zostanie utrwalony na dysku, gdzie wiadomość może nadal zostać utracona. Ale to jest tak dobra gwarancja, jak masz zamiar dostać. Możesz dodatkowo zmniejszyć ryzyko utraty danych, obniżając interwale, w którym brokerzy wymuszają fsync na dysk (podkreślony tekst i/lub flush.ms), ale należy pamiętać, że wartości te mogą przynieść im dużą wydajność kary.

Oprócz tych ustawień należy poczekać, aż producent Kafka zwróci odpowiedź na żądanie użytkownika i sprawdzić, czy wystąpił wyjątek. Ten rodzaj więzi z drugą częścią twojego pytania, więc pójdę w to dalej. Jeśli odpowiedź jest czysta, możesz być pewny, że twoje dane dotarły do ​​Kafki i zacząć martwić się o MariaDB.

Wszystko, co omówiliśmy do tej pory, dotyczy tylko tego, jak zapewnić, że Kafka ma twoje wiadomości, ale musisz również zapisywać dane w MariaDB, a to również może się nie udać, co wymagałoby przypomnienia wiadomości, którą potencjalnie już znasz. wysłany do Kafki - a tego nie możesz zrobić.

Więc w zasadzie trzeba wybrać jeden system, w którym jesteś w stanie lepiej radzić sobie z duplikatów/wartości brakujących (w zależności od tego, czy ponowne częściowe awarie) i że będzie wpływać na kolejność robisz rzeczy.

Wariant 1

Kafka first

W tej opcji możesz zainicjować transakcję w MariaDB, a następnie wysłać wiadomość do Kafki, czekać na odpowiedź i jeśli wyślij powiodła popełnienia transakcję w MariaDB. Jeśli wysyłanie do Kafki nie powiedzie się, możesz wycofać transakcję w MariaDB i wszystko jest eleganckie. Jeśli jednak wysłanie do Kafki zakończy się sukcesem, a twoje zobowiązanie do MariaDB z jakiegoś powodu nie powiedzie się, nie ma możliwości odzyskania wiadomości od Kafki. Będziesz więc albo przegapić wiadomość w MariaDB, albo mieć duplikat wiadomości w Kafce, jeśli ponownie wyślesz wszystko później.

Wariant 2

MariaDB first

Jest to dość dużo właśnie na odwrót, ale prawdopodobnie jesteś w stanie lepiej usunąć wiadomość, że został napisany w MariaDB, w zależności od modelu danych.

Oczywiście można złagodzić oba podejścia, śledząc nieudane wysyłanie i ponawiając je później, ale wszystko to jest bardziej opasłe w przypadku większego problemu.

Osobiście podchodzę do podejścia 1, ponieważ szansa niepowodzenia popełnienia błędu powinna być nieco mniejsza niż sama wysyłka i wykonać jakąś kontrolę dupek po drugiej stronie Kafki.


Jest to związane z poprzednim: I wysłać wiadomość z. orderSource.output() send (MessageBuilder.withPayload (kolejności) .build()); Ta operacja jest asynchroniczna i ZAWSZE zwraca wartość true, niezależnie od tego, czy broker Kafka jest wyłączony. Skąd mogę wiedzieć, że wiadomość dotarła do brokera Kafki ?

Teraz, po pierwsze, przyznaję, że nie jestem zaznajomiony ze sprężyną, więc może ci się to nie przydać, ale poniższy fragment kodu ilustruje jeden ze sposobów sprawdzania odpowiedzi na wyjątki. Przez wywołanie opróżnij blokuj do momentu zakończenia wszystkich wysyłek (i albo się nie powiodło lub się powiodło), a następnie sprawdź wyniki.

Producer<String, String> producer = new KafkaProducer<>(myConfig); 
final ArrayList<Exception> exceptionList = new ArrayList<>(); 

for(MessageType message : messages){ 
    producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() { 
    @Override 
    public void onCompletion(RecordMetadata metadata, Exception exception) { 
     if (exception != null) { 
     exceptionList.add(exception); 
     } 
    } 
    }); 
} 

producer.flush(); 

if (!exceptionList.isEmpty()) { 
    // do stuff 
} 
+0

edytowany pytanie z nowym podejściem śledzę, jednak biorąc pod uwagę, że odpowiedź jest bardzo wyjaśniając mi powracanie go do oryginału i uruchom nowy z edytowaną wersją. Wrócę z opinią o Twojej odpowiedzi. Dzięki! – codependent

+0

Sönke, wszystko jasne, doceniam dokładne wyjaśnienie. Dla zainteresowanych, w jaki sposób zapewnić dostarczenie wiadomości ze Spring Cloud Stream: https://github.com/spring-cloud/spring-cloud-stream/issues/795 – codependent

+0

@codependent Jak o posiadaniu bramy api lub cokolwiek wysłać najpierw kafka, a potem dwie mikroserwisy subskrybują wiadomości kafka .. czy to nie jest opłacalne? lub nie robisz tego, ponieważ chcesz mieć spójne dane zamiast ostatecznie spójne? –

2

Myślę, że właściwą drogą do realizacji Event Sourcing jest poprzez Kafka być wypełnione bezpośrednio ze zdarzeń pchane przez plugin, który odczytuje z RDBMS binlog np użyciu confluent Woda butelkowana (https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/) lub bardziej aktywną Debezium (http://debezium.io/). Następnie konsumowanie Microservices może wysłuchać tych zdarzeń, skonsumować je i działać zgodnie z ich bazami danych, które są ostatecznie zgodne z bazą danych RDBMS.

Sprawdźcie tutaj do mojej pełnej odpowiedzi na wytyczne: https://stackoverflow.com/a/43607887/986160