Jak poprawić wydajność Airflow 2.0 za pomocą Smart Sensors.

Mikolaj Klepacz
Mikolaj Klepacz
May 26, 2025
5 min read
Loading the Elevenlabs Text to Speech AudioNative Player...

Wprowadzenie do Smart Sensors

Wraz z nową wersją Airflow 2.0 do tego potężnego narzędzia od Apache dodano wiele nowych funkcji. Jedną z aktualizacji było wprowadzenie Smart Sensors, które mogą znacząco poprawić wydajność Airflow, jeśli używasz wielu sensors w swoich DAGs. W tym artykule porównamy zwykłe sensors i Smart sensors oraz pokażemy, jak skonfigurować swój pierwszy Smart Sensor.

Sensors w Airflow

Sensor to jeden z typów operatorów w Airflow, którego zadaniem jest zasadniczo czekanie na wystąpienie jakiegoś zdarzenia. Jeśli Twój proces jest event-driven, czyli workflow powinien się rozpocząć dopiero po zajściu określonego zdarzenia, powinieneś użyć Apache Airflow sensors. Typowe przypadki użycia sensors to: oczekiwanie na plik, opóźnianie wykonania DAG, sprawdzanie, czy pojawił się wpis w SQL, oczekiwanie na zakończenie innych DAGs.

Przykład użycia Sensors przedstawiono w poniższym DAGu:
Tutaj używamy FileSensor do utworzenia czterech tasks, które mają czekać na utworzenie określonych plików. Każdy task wait_for_file_{id} stale sprawdza, czy plik w określonej ścieżce został utworzony i kończy się sukcesem, gdy plik zostanie znaleziony. Domyślnie Airflow sensor sprawdza istnienie pliku co 60 sekund, ale można to skonfigurować za pomocą parametru poke_interval podczas tworzenia sensora. W przykładzie DAG, task process_files uruchamia się dopiero po znalezieniu wszystkich plików określonych w sensors.

Airflow 2.0 sensor - 1

Tutaj używamy FileSensor do tworzenia czterech zadań, które mają czekać na utworzenie określonych plików. Każde zadanie wait_for_file_ {id} stale sprawdza, czy plik w określonej ścieżce został utworzony i kończy się sukcesem, gdy znajduje plik. Domyślnie, Airflow dag sez airflow.models importuje DAG
z airflow.utils.dates import days_ago
z airflow.sensors.filesystem import FileSensor
z airflow.operators.python_operator import PythonOperator

rejestrowanie importu

default_args = {
 „start_date”: dni temu (1),
}

dir_path = „/usr/local/przepływ powietrza/dane testowe”
nazwa_plików = [
 „file1.txt „,
 „file2.txt „,
 „file3.txt „,
 „file4.txt „,
]

def plików_procesów_plików ():
 logging.info („Wykryto następujące pliki: „)

 dla nazwy w nazwach plików:
 logging.info (f "{dir_path}/{name}”)

 logging.info („Przetwarzaj pliki...”)

def _zapisz_dane ():
 logging.info („zapisz dane”)

z DAG („sensor_example”, default_args=default_args, catchUp = false) jako dag:

 czujniki = [
 Czujnik plików (
 task_id=f"czekaj_za_plik_ {identyfikator_sensor_}”,
 filepath=f "{dir_path}/{nazwa}”,
 fs_conn_id="fs_default”
 ) dla sensor_id, nazwa w wyliczeniu (nazwa_plików, start=1)
 ]

 proces = PythonOperator (
 task_id="pliki procesowe”,
 python_callable=_plików_procesów_plików
 )

 zapisz = PythonOperator (
 task_id="zapisz”,
 python_callable=Zapisz_dane
 )

 czujniki >> proces >> oszczędzanie
nsor sprawdza, czy plik istnieje co 60 sekund, ale można to skonfigurować za pomocą parametru poke_interval podczas tworzenia czujnika. W przykładzie DAG zadanie process_files jest uruchamiane dopiero po znalezieniu wszystkich plików określonych w czujnikach. Oto definicja wspomnianego przykładu dag: # dags/sensor.py



Problemy ze zwykłymi sensors

Jak wspomniano, sensor operator jest bardzo przydatny, jeśli chcemy poczekać na jakieś zdarzenie w workflow. Ale co, jeśli potrzebujemy dziesiątek lub setek sensors w naszej instancji Airflow? Okazuje się, że duża liczba zwykłych sensors może powodować następujące problemy:

  • Ryzyko deadlocka – sensors to zazwyczaj długotrwałe tasks, a liczba worker slots jest ograniczona. Możesz więc zająć wszystkie sloty sensorsami, przez co inne tasks nie będą mogły się uruchomić i wszystkie DAGs mogą się zablokować.
  • Niewykorzystanie zasobów – każdy zwykły sensor używa jednego procesu, co jest nieefektywne, bo rezerwujemy sloty dla tasks, które nie są obciążające obliczeniowo. W efekcie inne tasks mogą czekać na zaplanowanie, mimo że maszyna jest słabo wykorzystywana.

Aby rozwiązać te problemy, w Airflow 2.0 wprowadzono nową wersję sensor operatora: Smart Sensor.

Czym są Smart Sensors i kiedy ich używać?

Główną ideą Smart Sensor service jest odejście od przypisywania jednego procesu do każdego sensor taska, bo to nieefektywne. Zamiast tego, wykorzystywane są scentralizowane procesy, które wykonują te długotrwałe tasks w batchach.

Aby to osiągnąć, wykonanie taska dzieli się na dwa etapy:

  1. Rejestracja taska w Smart Sensor service i zapisanie informacji o tasku w Airflow Metastore DB. Po rejestracji task zwalnia worker slot.
  2. Wykorzystanie kilku scentralizowanych procesów do wykonania zserializowanych tasks. Te scentralizowane tasks są przechowywane w specjalnych wbudowanych DAGs (np. smart_sensor_group_shard_xx).

Smart sensors zostały zaprojektowane, by znacząco poprawić efektywność tych długotrwałych tasks, co może prowadzić do dużych oszczędności infrastrukturalnych. Powinieneś ich używać, jeśli Twoje DAGs zawierają dziesiątki lub setki sensors i powodują znaczne niewykorzystanie zasobów.

Airflow 2.0 sensor - 4

Rejestracja zadań w usłudze inteligentnych czujników, źródło: https://airflow.apache.org/docs/apache-airflow/stable/concepts/sensors.html

Konfigurowanie inteligentnych czujników w Airflow 2.0

Aby skonfigurować usługę inteligentnego czujnika w Airflow, musisz najpierw włączyć ją w konfiguracji. Oto przykładowa część pliku airflow.cfg, która umożliwia inteligentne czujniki.
[inteligentny czujnik]
use_smart_sensor = prawda
górny limit kod_shard_kod_= 10000

#Users może zmienić następującą konfigurację w zależności od ich wymagań
odłamek = 2

sensors_enabled = NazwaHivePartitionSensor, SmartFileSensorWyjaśnienie konfiguracji:

  • użyj czujnika inteligentnego: Wskazuje, czy usługa inteligentnego czujnika jest włączona
  • odłamki: Liczba jednoczesnych zadań inteligentnych czujników dla klastra przepływu powietrza.
  • czujniki_włączone: Lista nazw klas czujników używających inteligentnego czujnika.

Następnym krokiem konfiguracji jest uaktualnienie bazy danych Airflow w celu dodania instancja_czujnika stół, który jest potrzebny do pracy inteligentnych czujników. Aby to zrobić, wystarczy uruchomić następujące polecenie:airflow db UpgradeTeraz po ponownym uruchomieniu przepływu powietrza powinieneś zobaczyć 2 nowe DAG utworzone automatycznie:

Airflow 2.0 senseo - 6

Są to faktyczni pracownicy inteligentnych czujników, którzy odbierają i wykonują zadania czujników. Aby inteligentne czujniki działały, należy je włączyć ręcznie.Na koniec możemy skonfigurować nasz inteligentny czujnik. Jest to zmodyfikowana wersja zwykłego czujnika, który został przedstawiony wyżej Najpierw utwórz nowy plik: $AIRFLOW_HOME/Wtyczki/Smart_File_Sensor.py który zawiera definicję Czujnik SmartFileSensor. Jest to zwykły czujnik plików, ale z niezbędnymi dodatkami, które są potrzebne, aby działał jako inteligentny czujnik. Zawartość pliku przedstawiono poniżej: # plugins/smart_file_sensor.py

z airflow.sensors.filesystem import FileSensor
z airflow.utils.decorators importuj apply_defaults
od wpisania importu Dowolny

klasa SmartFileSensor (FileSensor):
poke_context_fields = ('ścieżka pliku', 'fs_conn_id')

@apply_defaults
def __init__ (self, **kwargs: Dowolny):
super (). __init__ (**kwargs)

def is_smart_sensor_compatible (self):
wynik = (
nie self.soft_fail
i super () .is_smart_sensor_compatible ()
)
zwróć wynikZwróć uwagę na pola kontekstowe_poke_ zmienna. Zawiera zestaw argumentów, których oczekuje się podczas tworzenia czujnika.Następnie w Plik sensor.py (przykład dag ze zwykłym czujnikiem z góry) dodaj następujący import:from smart_file_sensor import SmartFileSensor
i zmień FileSensor do SmartFileSensor: czujniki = [
SmartFileSensor (
task_id=f"czekaj_za_plik_ {identyfikator_sensor_}”,
filepath=f "{dir_path}/{nazwa}”,
fs_conn_id="fs_default”
) dla sensor_id, nazwa w wyliczeniu (nazwa_plików, start=1)
]

Teraz zdefiniowane zadania czujników powinny działać jako inteligentne czujniki.

Zauważ, że teraz, gdy DAG zostanie uruchomiony, stan zadań czujnika jest ustawiony jako detekcja  do czasu spełnienia warunków określonych w czujnikach, podczas gdy zadania smart_sensor_group_shard_xx  Dagi są w stanie bieżącym.

Airflow 2.0 sensor - 9

 Continuous delivery dla machine learning w fmcg

Architektura  stanowa vs. bezstanowa: co wybrać?

Nowoczesna  transformacja danych z dbt, re_data i Airflow.

Share this post
DevOps
Mikolaj Klepacz
MORE POSTS BY THIS AUTHOR
Mikolaj Klepacz

Curious how we can support your business?

TALK TO US