Niestandardowe wiadomości e-mail Airflow

Marcin Okuniewski
Marcin Okuniewski
May 22, 2025
9 min read
Loading the Elevenlabs Text to Speech AudioNative Player...

Od gotowych rozwiązań do pełnej customizacji

W projekcie, w którym częściowo odpowiadam za integrację Airflow z innymi komponentami, przydzielono mi na pierwszy rzut oka proste zadanie: "Wysyłanie customowych maili ze statusem tasków". Wydaje się to dość łatwe, ponieważ Airflow ma już wbudowany mechanizm do tego typu zadań:

Gotowe rozwiązanie (Out of the box)

Skonfiguruj serwer smtp w sekcji e-mail pliku konfiguracyjnego Airflow (domyślnie airflow.cfg):

[email]
email_backend = airflow.utils.email.send_email_smtp

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = <your smtp host for ex.smtp.gmail.com>
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
# smtp_user = airflow
# smtp_password = airflow
smtp_port = 25
smtp_mail_from = <ex. sender@fromdomain.com>

Jeśli chcesz uzyskać tylko ogólne informacje e-mail o błędach lub próbach związanych z przepływem pracy, po prostu dodaj do definicji dnia email_on_failure lub email_on_retry. W przypadku awarii 'hello_task' strumień powietrza wyśle wiadomość e-mail z odpowiednim przykładem.

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def print_hello():
    return 'Hello world!'

dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20))

dummy_operator = DummyOperator(task_id='dummy_task',
                              retries=3,
                              email=<'receiver@host.com' or list(adreses)>
                              email_on_retrie=True,
                              dag=dag)

hello_operator = PythonOperator(task_id='hello_task',
                                email=<'receiver@host.com' or list(adreses)>
                                email_on_failure=True,
                                python_callable=print_hello,
                                dag=dag)             

dummy_operator >> hello_operator

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
args ={
'owner': 'Airflow',
'description': 'Simple tutorial DAG',
'start_date': datetime(2017, 3, 20)
'email': ['receiver@host.com', 'receiver2@host.com']
'email_on_failure':True
}

def print_hello():
    return 'Hello world!'

Ogólne komunikaty mogą być zmodyfikowane, wracamy do pliku konfiguracyjnego przepływu powietrza i dodajemy:

w sekcji e-mail:

[email]
email_backend = airflow.utils.email.send_email_smtp

subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file

Gdzie można wyświetlić przykładowy szablon (w przepływie powietrza jinja jest domyślnym silnikiem szablonów):

Try {{try_number}} out of {{max_tries + 1}}<br>
Exception:<br>{{exception_html}}<br>
Log: <a href="{{ti.log_url}}">Link</a><br>
Host: {{ti.hostname}}<br>
Log file: {{ti.log_filepath}}<br>
Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>

(wszystkie pola dynamiczne pochodzą ze słownika kontekstowego)

Jak odnieść sukces

Po to, by the more of the flow of the postal box of the postal box can be found in the most small and. Co jeśli jest potrzebna informacja o pomyślnie zakończonych zadaniach? Nie ma mechanizmu odnoszącego się do sukcesu wiadomości e-mail, takich jak email_on_failure lub email_on_retry. Ale możesz użyć on_success_callback i on_failure_callback

W jaki sposób Airflow może pomóc Twojemu firmie? Odwiedź nasze Automatyzacja rurociągu danych page and find

Te dwie metody wyzwalają dowolną niestandardową wersję Pythona i dostarczają słownika kontekstowego jako argument. Spójrzmy na przykład:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email

def notify_email(kwargs):
    """Send custom email alerts."""
    ti = kwargs['ti']
    dag_run = kwargs['dag_run']
    var = kwargs['var']['json']
    params = kwargs['params']
    recipient_emails=['receiver@host.com', 'receiver2@host.com']
   

    logs_link = '{}/log?task_id={}&dag_id={}&execution_date={}'.format(conf.get("webserver", "base_url"), ti.task_id, ti.dag_id, ti.execution_date)
   

    title = ''
    body = ''

    # email title.
    if if dag_run._state == State.SUCCESS:
        title = f"Airflow alert: {dag_run.dag_id} succeed"
        body = """
            Hi Everyone, <br>
            <br>
            Job {dag_id} is a great success.<br>
            <br>
            Forever yours,<br>
            Airflow bot <br>
            """.format(dag_id=dag_run.dag_id)
    else:
        if ti.state == State.FAILED:
            title = f"Airflow alert: {dag_run.dag_id} failed on {ti.task_id}"
            body = """
            Hi Everyone, <br>
            <br>
            Task {task_id} failed.<br>
            Check what goes wrong {log_link}<br>
            <br>
            Forever yours,<br>
            Airflow bot <br>
            """.format(task_id=ti.task_id,log_link=log_link )
        else: raise AirflowException('{} task state is not supported in email notifications'.format(ti.state))

    send_email(recipient_emails, title, body)

def print_hello():
    return 'Hello world!'

dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          on_success_callback=notify_email
          start_date=datetime(2017, 3, 20))


hello_operator = PythonOperator(task_id='hello_task',
                                on_failure_callback=notify_email,
                                python_callable=print_hello,
                                dag=dag)             

hello_operator

TODO - opis

Odwiedź nasz blog, aby znaleźć więcej artykułów na temat przepływu powietrza:

Interfejs API REST w Airflow 2.0 — co jest możliwe?

Share this post
DevOps
Marcin Okuniewski
MORE POSTS BY THIS AUTHOR
Marcin Okuniewski

Curious how we can support your business?

TALK TO US