8

Mam dystrybuowane/stowarzyszonej bazy danych posiada następującą strukturę:Optymalizacja przepustowość sieci nad rozproszonymi pracy agregacji bazodanowych

  1. Bazy danych są rozłożone w trzech lokalizacjach geograficznych („węzły”)
  2. wielu baz danych są skupione na siebie węzeł
  3. Relacyjne bazy danych to połączenie PostgreSQL, MySQL, Oracle i MS SQL Server; nie-relacyjnych baz danych albo MongoDB lub Cassandro
  4. Luźne sprzęgającą wewnątrz każdego węzła i przez Federacji węźle odbywa się za pomocą RabbitMQ, przy czym każdy węzeł działa maklera RabbitMQ

ja wykonawczego tylko do odczytu agregacji między węzłami system zadań dla zadań, które obejmują federację węzłów (tj. dla zadań, które nie są lokalne dla węzła). Te zadania wykonują tylko kwerendy "get" - nie modyfikują baz danych. (Jeśli wyniki zadań mają pochodzić z jednej lub więcej baz danych, to jest to realizowane przez oddzielne zadanie, które nie jest częścią między-węzłowego systemu zadań, który próbuję zoptymalizować.) Moim celem jest zminimalizowanie przepustowość sieci wymagana przez te zadania (najpierw w celu zminimalizowania szerokości pasma między węzłami/WAN, a następnie w celu zminimalizowania przepustowości wewnątrz węzła/sieci LAN); Zakładam jednakowy koszt dla każdego łącza WAN i inny jednolity koszt dla każdego łącza LAN. Miejsca pracy nie są szczególnie wrażliwe na czas. Wykonuję pewne równoważenie obciążenia procesora w węźle, ale nie między węzłami.

Ilość danych przesyłanych przez sieć WAN/LAN dla zadań agregacji jest niewielka w stosunku do liczby zapisów do bazy danych, które są lokalne dla klastra lub określonej bazy danych, więc nie byłoby praktycznie w pełni dystrybuować bazy danych przez federację.

Podstawowy algorytm używać do minimalizacji przepustowość sieci jest:

  1. Biorąc pod uwagę pracę, która działa na zbiorze danych, które są rozłożone w całej federacji, węzeł menedżer wysyła wiadomość do każdego z pozostałych węzłów zawiera odpowiednie zapytania do bazy danych.
  2. Każdy węzeł uruchamia zestaw zapytań, kompresuje je za pomocą programu gzip, buforuje je i wysyła ich skompresowane rozmiary do węzła menedżera.
  3. Menedżer przesuwa się do węzła zawierającego wiele danych (w szczególności do maszyny w klastrze, która ma najwięcej danych i która ma bezczynne rdzenie); żąda reszty danych z pozostałych dwóch węzłów i innych maszyn w klastrze, a następnie uruchamia zadanie.

Gdy jest to możliwe, zadania wykorzystują metodę dziel i rządź, aby zminimalizować ilość wymaganych kolokacji danych. Na przykład, jeśli zadanie musi obliczyć sumy wszystkich wartości sprzedaży w federacji, wówczas każdy węzeł lokalnie oblicza swoje sumy sprzedaży, które następnie są agregowane w węźle menedżera (zamiast kopiowania wszystkich nieprzetworzonych danych sprzedaży do węzła menedżera) . Czasami jednak (np. Podczas łączenia między dwiema tabelami, które znajdują się w różnych węzłach) potrzebne jest kolokacja danych.

Pierwszą rzeczą, którą zrobiłem, aby to zoptymalizować, było zsumowanie miejsc pracy i uruchomienie zagregowanych zadań w dziesięciominutowych epokach (wszystkie maszyny działają z NTP, więc mogę być w miarę pewna, że ​​"co dziesięć minut" oznacza to samo rzecz w każdym węźle). Celem jest udostępnienie tych samych danych dwóm pracownikom, co zmniejsza całkowity koszt transportu danych.

  1. Przy dwóch zadaniach, które wysyłają zapytania do tej samej tabeli, generuję zestaw wyników każdego zadania, a następnie przechodzę przez punkt przecięcia dwóch zestawów wyników.
  2. Jeśli oba zadania mają być uruchamiane w tym samym węźle, wówczas koszt transferu sieciowego jest obliczany jako suma dwóch zestawów wyników minus przecięcie dwóch zestawów wyników.
  3. Te dwa zestawy wyników są przechowywane w tymczasowych tabelach PostgreSQL (w przypadku danych relacyjnych) lub tymczasowo w kolekcjach Cassandra columnfamilies/MongoDB (w przypadku danych nosql) w węźle wybranym do uruchamiania zadań; oryginalne kwerendy są następnie wykonywane w odniesieniu do połączonych zestawów wyników, a dostarczane dane są do poszczególnych zadań. (Ten krok jest wykonywany tylko w połączonych zestawach wyników, poszczególne dane zestawu wyników są po prostu dostarczane do jego pracy bez wcześniejszego przechowywania na tymczasowych tabelach/rodzinach kolumn/kolekcjach.)

Powoduje to poprawę przepustowości sieci, ale ja zastanawiam się, czy istnieje framework/biblioteka/algorytm, który poprawiłby się w tym zakresie. Jedną z opcji, którą rozważałem, jest buforowanie zestawów wyników w węźle i uwzględnianie zbuforowanych zestawów wyników przy określaniu przepustowości sieci (tj. Próba ponownego użycia zestawów wyników między zadaniami, oprócz bieżącego zestawu wcześniej zaplanowanych, współlokowanych zadań, tak aby np. zadanie uruchomione w jednej 10-minutowej epoce może użyć zbuforowanego zestawu wyników z poprzedniego 10-minutowego zestawu wyników), ale jeśli zadania nie używają dokładnie tych samych zestawów wyników (tj. jeśli nie używają identycznych klauzul dotyczących miejsc), to nie znam ogólnego algorytm celu, który wypełniłby luki w zestawie wyników (na przykład, gdyby zestaw wyników używał klauzuli "gdzie N> 3" i innego zadania wymaga zestawu wyników z klauzulą ​​"gdzie N> 0", to jaki algorytm mógłbym użyć do określić, że muszę wziąć unię oryginalnego zestawu wyników, a zestaw wyników z klauzulą ​​"gdzie N> 0 I N < = 3") - Mógłbym spróbować napisać własny algorytm, aby to zrobić, ale wynik byłby buggy bezużyteczny bałagan. Chciałbym również określić, kiedy dane w pamięci podręcznej są nieaktualne - najprostszym sposobem jest porównanie datownika z pamięci podręcznej z ostatnio zmodyfikowanym znacznikiem czasu w tabeli źródłowej i zastąpienie wszystkich danych, jeśli znacznik czasu się zmienił, ale najlepiej Chciałbym móc aktualizować tylko wartości, które uległy zmianie w przypadku znaczników czasu dla poszczególnych wierszy lub porcji.

+0

Czy łatwiej byłoby w pełni rozdzielić tabele w pełni do każdej witryny, a nie próbować obsługiwać fragmentów z częściowych klauzul? Miejsce na dysku jest tanie, ale zależy to od tego, jak często zmieniają się dane, a jak wąskie są twoje predykaty, czy to zmniejszy ruch w sieci. – rlb

+0

@rlb Problem polega na tym, że w każdym klastrze jest dużo działań zapisu, a więc w pełni rozproszone tabele oznaczałyby, że ta aktywność zapisu musiałaby propagować do każdego klastra, nawet jeśli nie jest potrzebna. Na przykład jedna baza danych to finansowa baza danych z cenami akcji, co oznacza, że ​​istnieje ** lot ** zapisów bazy danych. Sfederowane zadania prawdopodobnie potrzebują tylko migawki tych danych co najwyżej co godzinę, co stanowi ułamek przepustowości sieci, która byłaby potrzebna do propagacji danych dla każdej aktualizacji zasobów do każdego klastra. –

+0

Dobrze rozumiem problem z głośnością. Czy masz kontrolę nad tym, co jest przesyłane przez przewód? Przełączyliśmy się z wiersza na kolumnę dla zestawów wyników i współczynniki kompresji poszły w górę, więc ta mighht to prosta wygrana, niskie ryzyko, ale nie to, o co dokładnie prosisz. Będziemy polować w biurze, szukając czegoś na twoje aktualne pytanie, ale głównie pracujemy nad optymalizacją jako rozproszone sprzężenie, które, jak wspomniałeś, może być flakowane, jeśli nie zostanie wykonane perfekcyjnie. – rlb

Odpowiedz

4

Zacząłem wdrażać moje rozwiązanie tego problemu.

Aby uprościć pamięć podręczną wewnątrz węzłów, a także uprościć równoważenie obciążenia procesora, używam bazy danych Cassandra w każdym klastrze bazy danych ("węzeł Cassandra") do uruchamiania zadań agregacji (wcześniej agregowałem lokalne zestawienia wyników bazy danych ręcznie) - Używam pojedynczej bazy danych Cassandra dla danych relacyjnych, Cassandry i MongoDB (wadą jest to, że niektóre zapytania relacyjne działają wolniej na Cassandrze, ale jest to spowodowane tym, że pojedyncza agregacja jest zjednoczona baza danych jest łatwiejsza do utrzymania niż oddzielne relacyjne i nierelacyjne bazy danych agregacji). Nie zbieram też zadań w dziesięciominutowych epokach, ponieważ pamięć podręczna czyni ten algorytm zbędnym.

Każda maszyna w węźle odnosi się do rodziny kolumn Cassandra o nazwie Cassandra_Cache_ [MachineID], która służy do przechowywania identyfikatorów key_ids i column_ids, które wysłała do węzła Cassandra. Rodzina kolumn Cassandra_Cache składa się z kolumny Tabela, kolumny Primary_Key, kolumny Column_ID, kolumny Last_Modified_Timestamp, kolumny Last_Used_Timestamp i klucza złożonego składającego się z Table | Primary_Key | Column_ID. Kolumna Last_Modified_Timestamp oznacza datownik o nazwie last_modified timestamp ze źródłowej bazy danych, a kolumna Last_Used_Timestamp oznacza znacznik czasu, w którym dane były ostatnio używane/odczytywane przez zadanie agregacji. Kiedy węzeł Cassandra żąda danych z komputera, maszyna oblicza zestaw wyników, a następnie pobiera ustawioną różnicę zestawu wyników i tabeli | klucz | kolumny, które znajdują się w jej Cassandra_Cache i które mają ten sam Last_Modified_Timestamp jako wiersze w jego Cassandra_Cache (jeśli znaczniki czasu nie są zgodne, a dane w pamięci podręcznej są nieaktualne i są aktualizowane wraz z nowym Last_Modified_Timestamp).Następnie lokalny komputer wysyła ustawioną różnicę do węzła Cassandra i aktualizuje swój Cassandra_Cache z ustawioną różnicą i aktualizuje Last_Used_Timestamp dla każdego buforowanego punktu odniesienia, który został użyty do skompilowania zestawu wyników. (Prostszą alternatywą dla zachowania oddzielnego znacznika czasu dla każdej tabeli | klucz | kolumna jest zachowanie znacznika czasu dla każdego klawisza |, ale jest to mniej precyzyjne, a znacznik czasu tabeli | znacznik | kolumny nie jest nadmiernie skomplikowany.) Zachowanie Last_Used_Timestamps w Synchronizacja między Cassandra_Caches wymaga tylko, aby lokalne maszyny i zdalne węzły wysyłały Last_Used_Timestamp związane z każdym zadaniem, ponieważ wszystkie dane w zadaniu używają tego samego Last_Used_Timestamp.

Węzeł Cassandra aktualizuje zestaw wyników o nowe dane, które otrzymuje z węzła, a także z danymi, które otrzymuje z innych węzłów. Węzeł Cassandra utrzymuje również rodzinę kolumn przechowującą te same dane, które znajdują się w Cassandra_Cache każdego urządzenia (z wyjątkiem Last_Modified_Timestamp, który jest wymagany tylko na komputerze lokalnym, aby określić, kiedy dane są nieaktualne), wraz z identyfikatorem źródła wskazującym, czy dane pochodzą z wewnątrz węzła lub z innego węzła - id rozróżnia różne węzły, ale nie rozróżnia różnych maszyn w węźle lokalnym. (Inną opcją jest użycie zunifikowanego Cassandra_Cache zamiast używania jednego Cassandra_Cache na maszynę i innego Cassandra_Cache dla węzła, ale zdecydowałem, że dodatkowa złożoność nie była warta oszczędności miejsca.)

Każdy węzeł Cassandra także utrzymuje Federated_Cassandra_Cache, który składa się z {Kody baz danych, tabeli, klucza podstawowego, identyfikatora kolumny, ostatniego_użytego_znacznika} krotek, które zostały wysłane z węzła lokalnego do jednego z dwóch pozostałych węzłów.

Gdy zadanie przechodzi przez potok, każdy węzeł Cassandra aktualizuje swoją pamięć podręczną wewnątrz węzła za pomocą lokalnych zestawów wyników, a także wykonuje zadania podrzędne, które mogą być wykonywane lokalnie (np. W zadaniu, aby zsumować dane między wieloma węzłami, każdy węzeł sumuje dane wewnątrz węzła, aby zminimalizować ilość danych, które muszą być umieszczone w federacji między-węzłowej) - pod-zadanie może być wykonywane lokalnie, jeśli wykorzystuje tylko dane wewnątrz węzłów. Węzeł menedżera następnie określa, który węzeł wykona resztę zadania: każdy węzeł Cassandra może lokalnie obliczyć koszt wysłania zestawu wyników do innego węzła, przyjmując ustawioną różnicę jego zestawu wyników i podzestawu zestawu wyników, który został buforowany zgodnie do jego Federated_Cassandra_Cache, a węzeł menedżera minimalizuje równanie kosztów ["koszt transportu zestaw wyników z NodeX" + "koszt transportu zestaw wyników z NodeY"]. Na przykład, koszt węzła 1 {3, 5} przesyła swój zestaw wyników do {Węzeł2, Węzeł3}, kosztuje węzeł2 {2, 2}, aby przetransportować jego zestaw wyników do węzła {Węzeł1, Węzeł3}, i kosztuje Węzeł3 {4, 3} aby przenieść jego zestaw wyników do węzła {Węzeł1, Węzeł2}, dlatego zadanie jest uruchamiane w węźle 1 z kosztem "6".

Używam zasad eksmisji LRU dla każdego węzła Cassandra; Początkowo korzystałem z najstarszej zasady eksmisji, ponieważ jest ona prostsza w implementacji i wymaga mniejszej liczby zapisów do kolumny Last_Used_Timestamp (raz na aktualizację bazy danych zamiast raz na odczyt danych), ale wdrożenie zasady LRU okazało się nie być przesadnie złożone i Last_Used_Timestamp pisze nie tworzyły wąskiego gardła. Kiedy węzeł Cassandra osiąga 20% wolnej przestrzeni, eksmituje dane, aż osiągnie 30% wolnej przestrzeni, stąd każda eksmisja jest w przybliżeniu równa 10% całkowitej dostępnej przestrzeni. Węzeł utrzymuje dwa znaczniki czasu: znacznik czasu ostatnich eksmitowanych danych wewnątrz węzłów i znacznik czasu ostatnich wyeksploatowanych danych między węzłami/federacjami; ze względu na zwiększoną latencję komunikacji między węzłami w porównaniu z komunikacją wewnątrz węzła, celem polityki eksmisji jest 75% danych buforowanych danych między węzłami, a 25% danych w pamięci podręcznej to dane wewnątrz węzłów , które można szybko uzyskać w przybliżeniu, ponieważ 25% każdej eksmisji jest danymi między-węzłami, a 75% każdej eksmisji jest danymi wewnątrz węzłów. Eksmisja działa następująco:

while(evicted_local_data_size < 7.5% of total space available) { 
    evict local data with Last_Modified_Timestamp < 
     (last_evicted_local_timestamp += 1 hour) 
    update evicted_local_data_size with evicted data 
} 

while(evicted_federated_data_size < 2.5% of total space available) { 
    evict federated data with Last_Modified_Timestamp < 
     (last_evicted_federated_timestamp += 1 hour) 
    update evicted_federated_data_size with evicted data 
} 

eksmitowany dane nie zostaną trwale usunięte aż potwierdzenia eksmisji zostały otrzymane od maszyn w obrębie węzła i od innych węzłów.

Węzeł Cassandra wysyła powiadomienie do komputerów w swoim węźle, wskazując, jaki jest nowy ostatni_wybrany_local_timestamp. Lokalne komputery aktualizują swoje Cassandra_Cache w celu odzwierciedlenia nowego znacznika czasu i wysyłają powiadomienie do węzła Cassandra, gdy jest to kompletne; kiedy węzeł Cassandra otrzymał powiadomienia od wszystkich lokalnych maszyn, to trwale usuwa eksmitowane dane lokalne. Węzeł Cassandra wysyła również powiadomienie do odległych węzłów z nowym last_evicted_federated_timestamp; pozostałe węzły aktualizują swoje Federated_Cassandra_Caches w celu odzwierciedlenia nowego znacznika czasu, a węzeł Cassandra trwale usuwa eksmitowane dane federacyjne po otrzymaniu powiadomień z każdego węzła (węzeł Cassandra śledzi, z którego węzła pochodzi dane, więc po otrzymaniu eksmisji potwierdzenie z NodeX, węzeł może trwale usunąć eksmitowane dane NodeX przed otrzymaniem potwierdzenia eksmisji od NodeY). Dopóki wszystkie komputery/węzły nie wysłały swoich powiadomień, węzeł Cassandra wykorzystuje buforowane eksmitowane dane w swoich kwerendach, jeśli otrzymuje zestaw wyników z komputera/węzła, który nie wyeksmitował swoich starych danych. Na przykład węzeł Cassandra ma lokalny wzorzec Tabela | Primary_Key | Column_ID, który został eksmitowany, a w międzyczasie lokalny komputer (który nie przetworzył żądania eksmisji) nie uwzględnił podstawowego zestawu danych Tabela | Primary_Key | Column_ID w swoim zestawie wyników, ponieważ uważa, że że węzeł Cassandra ma już dane w pamięci podręcznej; węzeł Cassandra odbiera zestaw wyników z maszyny lokalnej, a ponieważ lokalny komputer nie potwierdził żądania eksmisji, węzeł Cassandra zawiera buforowany eksmitowany punkt odniesienia w swoim własnym zestawie wyników.