mysql
  • python-3.x
  • airflow
  • 2017-10-10 85 views 5 likes 
    5
    def mysql_operator_test(): 
        DEFAULT_DATE = datetime(2017, 10, 9) 
        t = MySqlOperator(
         task_id='basic_mysql', 
         sql="SELECT count(*) from table 1 where id>100;", 
         mysql_conn_id='mysql_default', 
         dag=dag) 
        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=False) 
    
    run_this = PythonOperator(
        task_id='getRecoReq', 
        python_callable=mysql_operator_test, 
        # xcom_push=True, 
        dag=dag) 
    
    task2 = PythonOperator(
        task_id= 'mysql_select', 
        provide_context=True, 
        python_callable = blah, 
        templates_dict = {'requests': "{{ ti.xcom_pull(task_ids='getReq') }}" }, 
        dag=dag) 
    
    run_this.set_downstream(task2) 
    

    Chcę przechwycić liczbę zwróconą przez MySqlOperator za pomocą xcomów. Czy ktoś może podać przewodnik dotyczący tego samego?Jak używać xcomów przepływu powietrza za pomocą MySqlOperator

    Odpowiedz

    1

    Jesteś bardzo blisko! Jednak sposób, w jaki zadajesz to pytanie, jest rodzajem anty-wzoru. Nie chcesz udostępniać danych pomiędzy zadaniami w przepływie powietrza. Ponadto nie chcesz używać operatora, tak jak w przypadku mysql_operator_test. To kuszące, robiłem to samo, gdy zaczynałem.

    Próbowałem czegoś bardzo podobnego do tego, ale z połączeniami SFTP. W końcu zrobiłem wszystko wewnątrz PythonOperator i użyłem leżących poniżej haczyków.

    Polecam użyć MySQLHook wewnątrz python_callable. Coś takiego:

    def count_mysql_and_then_use_the_count(): 
        """ 
        Returns an SFTP connection created using the SSHHook 
        """ 
        mysql_hook = MySQLHook(...) 
        cur = conn.cursor() 
        cur.execute("""SELECT count(*) from table 1 where id>100""") 
        for count in cur: 
         # Do something with the count... 
    

    Nie jestem pewien, czy to będzie działać jak jest, ale idea jest użyć haka wewnątrz Python wywoływalnym, nie używać MySQLHook często, ale zrobiłem to z SSHHook i działało świetnie.

     Powiązane problemy

    • Brak powiązanych problemów^_^