Von der Standardverpackung bis zur kompletten Kundenanfertigung
In einem Projekt, in dem ich teilweise für die Integration von Airflow mit anderen Komponenten verantwortlich bin, wurde mir die auf den ersten Blick einfache Aufgabe „Senden von benutzerdefinierten E-Mails mit Aufgabenstatus“ zugewiesen. Es scheint ziemlich einfach zu sein, weil Luftstrom habe bereits einen Mechanismus für eine solche Aufgabe eingebaut:
Aus der Box
Sie müssen lediglich Ihren SMTP-Server im E-Mail-Bereich der Airflow-Konfigurationsdatei einrichten (Standard airflow.cfg):
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = <your smtp host for ex.smtp.gmail.com>
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
# smtp_user = airflow
# smtp_password = airflow
smtp_port = 25
smtp_mail_from = <ex. sender@fromdomain.com>
Wenn Sie nur allgemeine E-Mail-Informationen über Fehler oder Aufgabenwiederholungen in Ihrem Workflow erhalten möchten, fügen Sie einfach Ihrer DAG-Definition email_on_failure bzw. email_on_retry hinzu. Im folgenden Beispiel sendet Airflow im Falle eines Fehlers von 'hello_task' eine E-Mail mit einer Benachrichtigung.
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
def print_hello():
return 'Hello world!'
dag = DAG('hello_world', description='Simple tutorial DAG',
schedule_interval='0 12 * * *',
start_date=datetime(2017, 3, 20))
dummy_operator = DummyOperator(task_id='dummy_task',
retries=3,
email=<'receiver@host.com' or list(adreses)>
email_on_retrie=True,
dag=dag)
hello_operator = PythonOperator(task_id='hello_task',
email=<'receiver@host.com' or list(adreses)>
email_on_failure=True,
python_callable=print_hello,
dag=dag)
dummy_operator >> hello_operator
Wenn Sie E-Mails erhalten möchten, falls eine der Aufgaben fehlschlägt, können Sie die E-Mail-Parameter im defalut_arg-Wörterbuch wie folgt übergeben:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
args ={
'owner': 'Airflow',
'description': 'Simple tutorial DAG',
'start_date': datetime(2017, 3, 20)
'email': ['receiver@host.com', 'receiver2@host.com']
'email_on_failure':True
}
def print_hello():
return 'Hello world!'
Die generische Nachricht könnte geändert werden. Wir kehren zur Airflow-Konfigurationsdatei zurück und fügen hinzu:
im E-Mail-Bereich:
[email]
email_backend = airflow.utils.email.send_email_smtp
subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file
wo die Beispielvorlage aussehen könnte (in Airflow ist Jinja die Standard-Template-Engine):
Try {{try_number}} out of {{max_tries + 1}}<br>
Exception:<br>{{exception_html}}<br>
Log: <a href="{{ti.log_url}}">Link</a><br>
Host: {{ti.hostname}}<br>
Log file: {{ti.log_filepath}}<br>
Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>
(alle dynamischen Felder stammen aus dem „Kontext“ -Wörterbuch)
Wie kann man erfolgreich sein
Nach ein paar weiteren lästigen Workflows könnte das Postfach etwas traurig und deprimierend aussehen. Was ist, wenn Informationen über erfolgreich abgeschlossene Aufgaben abgerufen werden müssen? Standardmäßig gibt es keinen Mechanismus für Erfolgs-E-Mails wie email_on_failure oder email_on_retry. Aber wir könnten on_success_callback und on_failure_callback verwenden
Wie kann Airflow Ihrem Unternehmen helfen? Besuchen Sie unsere Automatisierung der Datenpipeline Seite und finden Sie eine Lösung, die Ihren Bedürfnissen entspricht
Diese beiden Methoden lösen jede benutzerdefinierte Python-Funktion aus und übergeben das Kontextwörterbuch als Argument. Schauen wir uns das Beispiel an:
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email
def notify_email(kwargs):
"""Send custom email alerts."""
ti = kwargs['ti']
dag_run = kwargs['dag_run']
var = kwargs['var']['json']
params = kwargs['params']
recipient_emails=['receiver@host.com', 'receiver2@host.com']
logs_link = '{}/log?task_id={}&dag_id={}&execution_date={}'.format(conf.get("webserver", "base_url"), ti.task_id, ti.dag_id, ti.execution_date)
title = ''
body = ''
# email title.
if if dag_run._state == State.SUCCESS:
title = f"Airflow alert: {dag_run.dag_id} succeed"
body = """
Hi Everyone, <br>
<br>
Job {dag_id} is a great success.<br>
<br>
Forever yours,<br>
Airflow bot <br>
""".format(dag_id=dag_run.dag_id)
else:
if ti.state == State.FAILED:
title = f"Airflow alert: {dag_run.dag_id} failed on {ti.task_id}"
body = """
Hi Everyone, <br>
<br>
Task {task_id} failed.<br>
Check what goes wrong {log_link}<br>
<br>
Forever yours,<br>
Airflow bot <br>
""".format(task_id=ti.task_id,log_link=log_link )
else: raise AirflowException('{} task state is not supported in email notifications'.format(ti.state))
send_email(recipient_emails, title, body)
def print_hello():
return 'Hello world!'
dag = DAG('hello_world', description='Simple tutorial DAG',
schedule_interval='0 12 * * *',
on_success_callback=notify_email
start_date=datetime(2017, 3, 20))
hello_operator = PythonOperator(task_id='hello_task',
on_failure_callback=notify_email,
python_callable=print_hello,
dag=dag)
hello_operator
TODO-Beschreibung
Besuchen Sie unseren Blog für ausführlichere Artikel zu Airflow:
Leistungsstarke REST-API in Airflow 2.0 — was müssen Sie wissen?
- Unterschiede zwischen Airflow 1.10.x und 2.0
- Leistungsstarke REST-API in Airflow 2.0 — was müssen Sie wissen?
- Wie kann die Airflow 2.0-Leistung mit intelligenten Sensoren verbessert werden?