2016-08-03 34 views
25

Załóżmy, że masz DAG-a z przepływem powietrza, który nie ma sensu zasypywania, co oznacza, że ​​po jego uruchomieniu raz, jego kolejne uruchomienie szybko byłoby zupełnie bezcelowe.Jak zapobiec przepływowi powietrza z zasypywania działek?

Na przykład, jeśli ładujesz dane z jakiegoś źródła, które jest aktualizowane tylko co godzinę w bazie danych, zasypywanie, które następuje szybko po sobie, będzie po prostu importować te same dane raz za razem.

Jest to szczególnie denerwujące, gdy tworzysz nowe, godzinne zadanie, i uruchamia ono N ilość razy dla każdej godziny, którą przegapił, wykonując nadmiarową pracę, zanim zacznie działać w podanym przedziale.

Jedynym rozwiązaniem można myślę, że to coś specjalnie odradzane w FAQ of the docs

Zalecamy przed użyciem wartości dynamiczne jak datą_początkową, zwłaszcza datetime.now() jak to może być dość kłopotliwe.

Czy istnieje sposób, aby wyłączyć zasypywanie w przypadku DAG, czy powinienem wykonać powyższe czynności?

Odpowiedz

17

Uaktualnij do wersji przepływu powietrza 1.8 i użyj catchup_by_default = False w airflow.cfg lub zastosuj catchup = False do każdego ze swoich dagów.

https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#catchup_by_default

+0

Dzięki. Jest to o wiele lepsze niż narzędzie LatestOnlyOperator. – m0meni

+2

Ustawiłem catchup_by_default = False, ale Airflow wciąż wypełnia zadania. Każdy pomysł, dlaczego? Używam wersji 1.8 –

+0

@OllieGlass Czy na pewno zastosowałeś ją do właściwego pojemnika, nie wiem dokładnie, co to jest twoja konfiguracja, ale to na pewno ma znaczenie. Możesz także spróbować zastosować go do określonych DAG, jeśli nie masz pewności. – sage88

10

To wydaje się być nierozwiązanym problemem z przepływem powietrza. Wiem, że bardzo chciałbym mieć dokładnie tę samą funkcję. Oto tyle, ile dostałem; może być użyteczny dla innych.

Są funkcje interfejsu użytkownika (przynajmniej w 1.7.1.3), które mogą pomóc w rozwiązaniu tego problemu. Jeśli przejdziesz do widoku drzewa i klikniesz na określone zadanie (kwadratowe pola), pojawi się przycisk dialogowy z przyciskiem "Oznacz sukces". Kliknięcie przycisku "przeszłość", a następnie kliknięcie przycisku "Oznacz sukces" spowoduje oznaczenie wszystkich wystąpień tego zadania w DAG jako udanych i nie zostaną one uruchomione. Najwyższy poziom DAG (kółka na górze) może być również oznaczony jako udany w podobny sposób, ale wydaje się, że nie ma sposobu na etykietowanie wielu instancji DAG.

Nie zajrzałem jeszcze dostatecznie głęboko, ale możliwe jest użycie podkomendy "trigger_dag" do oznaczania stanów DAG. zobacz tutaj: https://github.com/apache/incubator-airflow/pull/644/commits/4d30d4d79f1a18b071b585500474248e5f46d67d

CLI wyposażone oznaczyć DAG jest w pracach: http://mail-archives.apache.org/mod_mbox/airflow-commits/201606.mbox/%[email protected]%3E https://github.com/apache/incubator-airflow/pull/1590

UPDATE (28.09.2016) został dodany nowy operator 'LatestOnlyOperator' (https://github.com/apache/incubator-airflow/pull/1752), który uruchomi tylko najnowszą wersję zadań niższego rzędu. Brzmi bardzo użytecznie i mam nadzieję, że niedługo pojawi się w wydaniach.

AKTUALIZACJA 2: Od czasu przepływu powietrza 1.8 został zwolniony LatestOnlyOperator.

+0

Aktualizacja wygląda naprawdę obiecująco! Dziękuję za odpowiedź. – m0meni

+1

Należy zauważyć, że funkcja Ostatnia operacja wyłącza zadania po przejściu do stanu "pominiętego". W przypadku dokumentów pomijane stany propagują tak, że wszystkie bezpośrednie zadania upstream są również pomijane. To sprawia, że ​​podejście nie nadaje się, gdy chcesz (lubisz), aby poprzednie zadania działały poprawnie z nieaktualnymi danymi. W takim przypadku najlepszym rozwiązaniem jest dodanie wczesnego operatora w kodzie, który ucieka do sukcesu, jeśli zadanie jest uruchamiane szczególnie późno. – russellpierce

+1

Polecenie wypełnienia dla cli wygląda na dostępne i prawdopodobnie jest to najlepszy sposób na teraz. https://airflow.incubator.apache.org/cli.html Strumień przepływu powietrza -h [nazwa hosta tutaj] -m = Prawda -s [data początkowa] -e $ (data + "% Y-% m-% dT:% H:% M:% S ") – sage88