Od gotowych rozwiązań do pełnej customizacji
W projekcie, w którym częściowo odpowiadam za integrację Airflow z innymi komponentami, przydzielono mi na pierwszy rzut oka proste zadanie: "Wysyłanie customowych maili ze statusem tasków". Wydaje się to dość łatwe, ponieważ Airflow ma już wbudowany mechanizm do tego typu zadań:
Gotowe rozwiązanie (Out of the box)
Skonfiguruj serwer smtp w sekcji e-mail pliku konfiguracyjnego Airflow (domyślnie airflow.cfg):
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = <your smtp host for ex.smtp.gmail.com>
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
# smtp_user = airflow
# smtp_password = airflow
smtp_port = 25
smtp_mail_from = <ex. sender@fromdomain.com>
Jeśli chcesz uzyskać tylko ogólne informacje e-mail o błędach lub próbach związanych z przepływem pracy, po prostu dodaj do definicji dnia email_on_failure lub email_on_retry. W przypadku awarii 'hello_task' strumień powietrza wyśle wiadomość e-mail z odpowiednim przykładem.
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
def print_hello():
return 'Hello world!'
dag = DAG('hello_world', description='Simple tutorial DAG',
schedule_interval='0 12 * * *',
start_date=datetime(2017, 3, 20))
dummy_operator = DummyOperator(task_id='dummy_task',
retries=3,
email=<'receiver@host.com' or list(adreses)>
email_on_retrie=True,
dag=dag)
hello_operator = PythonOperator(task_id='hello_task',
email=<'receiver@host.com' or list(adreses)>
email_on_failure=True,
python_callable=print_hello,
dag=dag)
dummy_operator >> hello_operator
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
args ={
'owner': 'Airflow',
'description': 'Simple tutorial DAG',
'start_date': datetime(2017, 3, 20)
'email': ['receiver@host.com', 'receiver2@host.com']
'email_on_failure':True
}
def print_hello():
return 'Hello world!'
Ogólne komunikaty mogą być zmodyfikowane, wracamy do pliku konfiguracyjnego przepływu powietrza i dodajemy:
w sekcji e-mail:
[email]
email_backend = airflow.utils.email.send_email_smtp
subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file
Gdzie można wyświetlić przykładowy szablon (w przepływie powietrza jinja jest domyślnym silnikiem szablonów):
Try {{try_number}} out of {{max_tries + 1}}<br>
Exception:<br>{{exception_html}}<br>
Log: <a href="{{ti.log_url}}">Link</a><br>
Host: {{ti.hostname}}<br>
Log file: {{ti.log_filepath}}<br>
Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>
(wszystkie pola dynamiczne pochodzą ze słownika kontekstowego)
Jak odnieść sukces
Po to, by the more of the flow of the postal box of the postal box can be found in the most small and. Co jeśli jest potrzebna informacja o pomyślnie zakończonych zadaniach? Nie ma mechanizmu odnoszącego się do sukcesu wiadomości e-mail, takich jak email_on_failure lub email_on_retry. Ale możesz użyć on_success_callback i on_failure_callback
W jaki sposób Airflow może pomóc Twojemu firmie? Odwiedź nasze Automatyzacja rurociągu danych page and find
Te dwie metody wyzwalają dowolną niestandardową wersję Pythona i dostarczają słownika kontekstowego jako argument. Spójrzmy na przykład:
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email
def notify_email(kwargs):
"""Send custom email alerts."""
ti = kwargs['ti']
dag_run = kwargs['dag_run']
var = kwargs['var']['json']
params = kwargs['params']
recipient_emails=['receiver@host.com', 'receiver2@host.com']
logs_link = '{}/log?task_id={}&dag_id={}&execution_date={}'.format(conf.get("webserver", "base_url"), ti.task_id, ti.dag_id, ti.execution_date)
title = ''
body = ''
# email title.
if if dag_run._state == State.SUCCESS:
title = f"Airflow alert: {dag_run.dag_id} succeed"
body = """
Hi Everyone, <br>
<br>
Job {dag_id} is a great success.<br>
<br>
Forever yours,<br>
Airflow bot <br>
""".format(dag_id=dag_run.dag_id)
else:
if ti.state == State.FAILED:
title = f"Airflow alert: {dag_run.dag_id} failed on {ti.task_id}"
body = """
Hi Everyone, <br>
<br>
Task {task_id} failed.<br>
Check what goes wrong {log_link}<br>
<br>
Forever yours,<br>
Airflow bot <br>
""".format(task_id=ti.task_id,log_link=log_link )
else: raise AirflowException('{} task state is not supported in email notifications'.format(ti.state))
send_email(recipient_emails, title, body)
def print_hello():
return 'Hello world!'
dag = DAG('hello_world', description='Simple tutorial DAG',
schedule_interval='0 12 * * *',
on_success_callback=notify_email
start_date=datetime(2017, 3, 20))
hello_operator = PythonOperator(task_id='hello_task',
on_failure_callback=notify_email,
python_callable=print_hello,
dag=dag)
hello_operator
TODO - opis
Odwiedź nasz blog, aby znaleźć więcej artykułów na temat przepływu powietrza:
Interfejs API REST w Airflow 2.0 — co jest możliwe?
- Różnice między przepływem powietrza 1.10.x i 2.0
- Interfejs API REST w Airflow 2.0 — co jest możliwe?
- Jak zwiększyć wydajność Airlfow 2.0 dzięki inteligentnemu czujnikowi?