Benutzerdefinierte Airflow-E-Mails

Marcin Okuniewski
Marcin Okuniewski
May 7, 2025
9 min read
Loading the Elevenlabs Text to Speech AudioNative Player...

Von der Standardverpackung bis zur kompletten Kundenanfertigung

In einem Projekt, in dem ich teilweise für die Integration von Airflow mit anderen Komponenten verantwortlich bin, wurde mir die auf den ersten Blick einfache Aufgabe „Senden von benutzerdefinierten E-Mails mit Aufgabenstatus“ zugewiesen. Es scheint ziemlich einfach zu sein, weil Luftstrom habe bereits einen Mechanismus für eine solche Aufgabe eingebaut:

Aus der Box

Sie müssen lediglich Ihren SMTP-Server im E-Mail-Bereich der Airflow-Konfigurationsdatei einrichten (Standard airflow.cfg):

[email]
email_backend = airflow.utils.email.send_email_smtp

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = <your smtp host for ex.smtp.gmail.com>
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
# smtp_user = airflow
# smtp_password = airflow
smtp_port = 25
smtp_mail_from = <ex. sender@fromdomain.com>

Wenn Sie nur allgemeine E-Mail-Informationen über Fehler oder Aufgabenwiederholungen in Ihrem Workflow erhalten möchten, fügen Sie einfach Ihrer DAG-Definition email_on_failure bzw. email_on_retry hinzu. Im folgenden Beispiel sendet Airflow im Falle eines Fehlers von 'hello_task' eine E-Mail mit einer Benachrichtigung.

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def print_hello():
    return 'Hello world!'

dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20))

dummy_operator = DummyOperator(task_id='dummy_task',
                              retries=3,
                              email=<'receiver@host.com' or list(adreses)>
                              email_on_retrie=True,
                              dag=dag)

hello_operator = PythonOperator(task_id='hello_task',
                                email=<'receiver@host.com' or list(adreses)>
                                email_on_failure=True,
                                python_callable=print_hello,
                                dag=dag)             

dummy_operator >> hello_operator

Wenn Sie E-Mails erhalten möchten, falls eine der Aufgaben fehlschlägt, können Sie die E-Mail-Parameter im defalut_arg-Wörterbuch wie folgt übergeben:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
args ={
'owner': 'Airflow',
'description': 'Simple tutorial DAG',
'start_date': datetime(2017, 3, 20)
'email': ['receiver@host.com', 'receiver2@host.com']
'email_on_failure':True
}

def print_hello():
    return 'Hello world!'

Die generische Nachricht könnte geändert werden. Wir kehren zur Airflow-Konfigurationsdatei zurück und fügen hinzu:

im E-Mail-Bereich:

[email]
email_backend = airflow.utils.email.send_email_smtp

subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file

wo die Beispielvorlage aussehen könnte (in Airflow ist Jinja die Standard-Template-Engine):

Try {{try_number}} out of {{max_tries + 1}}<br>
Exception:<br>{{exception_html}}<br>
Log: <a href="{{ti.log_url}}">Link</a><br>
Host: {{ti.hostname}}<br>
Log file: {{ti.log_filepath}}<br>
Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>

(alle dynamischen Felder stammen aus dem „Kontext“ -Wörterbuch)

Wie kann man erfolgreich sein

Nach ein paar weiteren lästigen Workflows könnte das Postfach etwas traurig und deprimierend aussehen. Was ist, wenn Informationen über erfolgreich abgeschlossene Aufgaben abgerufen werden müssen? Standardmäßig gibt es keinen Mechanismus für Erfolgs-E-Mails wie email_on_failure oder email_on_retry. Aber wir könnten on_success_callback und on_failure_callback verwenden

Wie kann Airflow Ihrem Unternehmen helfen? Besuchen Sie unsere Automatisierung der Datenpipeline Seite und finden Sie eine Lösung, die Ihren Bedürfnissen entspricht

Diese beiden Methoden lösen jede benutzerdefinierte Python-Funktion aus und übergeben das Kontextwörterbuch als Argument. Schauen wir uns das Beispiel an:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email

def notify_email(kwargs):
    """Send custom email alerts."""
    ti = kwargs['ti']
    dag_run = kwargs['dag_run']
    var = kwargs['var']['json']
    params = kwargs['params']
    recipient_emails=['receiver@host.com', 'receiver2@host.com']
   

    logs_link = '{}/log?task_id={}&dag_id={}&execution_date={}'.format(conf.get("webserver", "base_url"), ti.task_id, ti.dag_id, ti.execution_date)
   

    title = ''
    body = ''

    # email title.
    if if dag_run._state == State.SUCCESS:
        title = f"Airflow alert: {dag_run.dag_id} succeed"
        body = """
            Hi Everyone, <br>
            <br>
            Job {dag_id} is a great success.<br>
            <br>
            Forever yours,<br>
            Airflow bot <br>
            """.format(dag_id=dag_run.dag_id)
    else:
        if ti.state == State.FAILED:
            title = f"Airflow alert: {dag_run.dag_id} failed on {ti.task_id}"
            body = """
            Hi Everyone, <br>
            <br>
            Task {task_id} failed.<br>
            Check what goes wrong {log_link}<br>
            <br>
            Forever yours,<br>
            Airflow bot <br>
            """.format(task_id=ti.task_id,log_link=log_link )
        else: raise AirflowException('{} task state is not supported in email notifications'.format(ti.state))

    send_email(recipient_emails, title, body)

def print_hello():
    return 'Hello world!'

dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          on_success_callback=notify_email
          start_date=datetime(2017, 3, 20))


hello_operator = PythonOperator(task_id='hello_task',
                                on_failure_callback=notify_email,
                                python_callable=print_hello,
                                dag=dag)             

hello_operator

TODO-Beschreibung

Besuchen Sie unseren Blog für ausführlichere Artikel zu Airflow:

Leistungsstarke REST-API in Airflow 2.0 — was müssen Sie wissen?

Share this post
DevOps
Marcin Okuniewski
MORE POSTS BY THIS AUTHOR
Marcin Okuniewski

Curious how we can support your business?

TALK TO US