Jak poprawić wydajność przepływu powietrza 2.0 dzięki inteligentnym czujnikom?

Mikolaj Klepacz
Mikolaj Klepacz
May 8, 2025
5 min read
Loading the Elevenlabs Text to Speech AudioNative Player...

Wprowadzenie do inteligentnych czujników

Wraz z nową wersją Airflow 2.0 do tego potężnego narzędzia Apache dodano wiele nowych funkcji. Jedną z aktualizacji było wprowadzenie inteligentnych czujników, które mogą drastycznie poprawić wydajność przepływu powietrza, jeśli używasz wielu czujników w swoim Dag. W tym artykule porównamy zwykłe i inteligentne czujniki i pokażemy, jak skonfigurować pierwszy inteligentny czujnik.

Czujniki w przepływie powietrza

Czujnik jest jednym z typów operatorów przepływu powietrza, którego celem jest zasadniczo czekanie, aż coś się wydarzy. Jeśli proces jest oparty na zdarzeniach, co oznacza, że przepływ pracy musi zostać uruchomiony tylko wtedy, gdy wystąpi jakieś zdarzenie, należy użyć Przepływ powietrza Apache czujniki. Typowe przypadki użycia czujników: czekanie na plik, opóźnianie wykonania dag, sprawdzanie, czy pojawił się wpis SQL, czekanie na zakończenie innych dagów.Przykładowe użycie Czujników przedstawiono w dag poniżej:

Airflow 2.0 sensor - 1

Tutaj używamy FileSensor do tworzenia czterech zadań, które mają czekać na utworzenie określonych plików. Każde zadanie wait_for_file_ {id} stale sprawdza, czy plik w określonej ścieżce został utworzony i kończy się sukcesem, gdy znajduje plik. Domyślnie, Airflow dag sez airflow.models importuje DAG
z airflow.utils.dates import days_ago
z airflow.sensors.filesystem import FileSensor
z airflow.operators.python_operator import PythonOperator

rejestrowanie importu

default_args = {
„start_date”: dni temu (1),
}

dir_path = „/usr/local/przepływ powietrza/dane testowe”
nazwa_plików = [
„file1.txt „,
„file2.txt „,
„file3.txt „,
„file4.txt „,
]

def plików_procesów_plików ():
logging.info („Wykryto następujące pliki: „)

dla nazwy w nazwach plików:
logging.info (f "{dir_path}/{name}”)

logging.info („Przetwarzaj pliki...”)

def _zapisz_dane ():
logging.info („zapisz dane”)

z DAG („sensor_example”, default_args=default_args, catchUp = false) jako dag:

czujniki = [
Czujnik plików (
task_id=f"czekaj_za_plik_ {identyfikator_sensor_}”,
filepath=f "{dir_path}/{nazwa}”,
fs_conn_id="fs_default”
) dla sensor_id, nazwa w wyliczeniu (nazwa_plików, start=1)
]

proces = PythonOperator (
task_id="pliki procesowe”,
python_callable=_plików_procesów_plików
)

zapisz = PythonOperator (
task_id="zapisz”,
python_callable=Zapisz_dane
)

czujniki >> proces >> oszczędzanie
nsor sprawdza, czy plik istnieje co 60 sekund, ale można to skonfigurować za pomocą parametru poke_interval podczas tworzenia czujnika. W przykładzie DAG zadanie process_files jest uruchamiane dopiero po znalezieniu wszystkich plików określonych w czujnikach. Oto definicja wspomnianego przykładu dag: # dags/sensor.py



Problemy ze zwykłymi czujnikami

Jak wspomniano wcześniej, operator czujnika jest bardzo przydatny, jeśli chcemy poczekać, aż wydarzy się jakieś zdarzenie w naszym przepływie pracy. Ale co z przypadkiem, gdy potrzebujemy dziesiątek, a nawet setek czujników w naszym przypadku przepływu powietrza? Okazuje się, że posiadanie dużej liczby zwykłych czujników może powodować następujące problemy:

  • Ryzyko impasu - czujniki są zwykle zadaniami długotrwałymi i istnieje ograniczona liczba miejsc pracy, które można uruchomić jednocześnie. Dlatego możesz użyć wszystkich gniazd do uruchamiania czujników, dlatego żadne inne zadania nie mogą być już uruchamiane, więc wszystkie twoje dagy mogą utknąć.
  • Niedostateczne wykorzystanie zasobów - przy użyciu zwykłych czujników każde zadanie wykorzystuje jeden proces, który jest nieefektywny, ponieważ rezerwujemy miejsca na zadania, które nie są obciążone obliczeniowo. W rezultacie inne nasze zadania mogą nadal czekać na zaplanowanie, mimo że wykorzystanie naszej maszyny jest bardzo niskie.

Aby stawić czoła wspomnianym problemom, wprowadzono nową wersję operatora czujnika w Airflow 2.0: Smart Sensor.

Czym są inteligentne czujniki i kiedy powinniśmy z nich korzystać?

Główną ideą usługi Smart Sensor jest zaprzestanie przypisywania jednego procesu do każdego zadania czujnika, ponieważ takie podejście nie jest skuteczne. Zamiast tego scentralizowane procesy są wykorzystywane do wykonywania tych długotrwałych zadań partiami. Aby to osiągnąć, wykonanie zadania jest podzielone na dwa etapy:

  1. Rejestracja zadania w usłudze Smart Sensor i przechowywanie informacji o zadaniu w Airflow Metastore DB. Gdy rejestracja się powiedzie, zadanie zwalnia miejsce pracownika.
  2. Wykorzystanie kilku scentralizowanych procesów do wykonywania zadań serializowanych. Te scentralizowane zadania są przechowywane w specjalnych wbudowanych numerach (o nazwie smart_sensor_group_shard_xx)

Inteligentne czujniki zostały zaprojektowane w celu drastycznej poprawy wydajności tych długotrwałych zadań, co może prowadzić do dużych oszczędności kosztów infrastruktury. Powinieneś zacząć z nich korzystać, jeśli Twoje DAG zawierają dziesiątki lub setki czujników i powodują znaczne niedostateczne wykorzystanie zasobów.

Airflow 2.0 sensor - 4

Rejestracja zadań w usłudze inteligentnych czujników, źródło: https://airflow.apache.org/docs/apache-airflow/stable/concepts/sensors.html

Konfigurowanie inteligentnych czujników w Airflow 2.0

Aby skonfigurować usługę inteligentnego czujnika w Airflow, musisz najpierw włączyć ją w konfiguracji. Oto przykładowa część pliku airflow.cfg, która umożliwia inteligentne czujniki.
[inteligentny czujnik]
use_smart_sensor = prawda
górny limit kod_shard_kod_= 10000

#Users może zmienić następującą konfigurację w zależności od ich wymagań
odłamek = 2

sensors_enabled = NazwaHivePartitionSensor, SmartFileSensorWyjaśnienie konfiguracji:

  • użyj czujnika inteligentnego: Wskazuje, czy usługa inteligentnego czujnika jest włączona
  • odłamki: Liczba jednoczesnych zadań inteligentnych czujników dla klastra przepływu powietrza.
  • czujniki_włączone: Lista nazw klas czujników używających inteligentnego czujnika.

Następnym krokiem konfiguracji jest uaktualnienie bazy danych Airflow w celu dodania instancja_czujnika stół, który jest potrzebny do pracy inteligentnych czujników. Aby to zrobić, wystarczy uruchomić następujące polecenie:airflow db UpgradeTeraz po ponownym uruchomieniu przepływu powietrza powinieneś zobaczyć 2 nowe DAG utworzone automatycznie:

Airflow 2.0 senseo - 6

Są to faktyczni pracownicy inteligentnych czujników, którzy odbierają i wykonują zadania czujników. Aby inteligentne czujniki działały, należy je włączyć ręcznie.Na koniec możemy skonfigurować nasz inteligentny czujnik. Jest to zmodyfikowana wersja zwykłego czujnika, który został przedstawiony wyżej Najpierw utwórz nowy plik: $AIRFLOW_HOME/Wtyczki/Smart_File_Sensor.py który zawiera definicję Czujnik SmartFileSensor. Jest to zwykły czujnik plików, ale z niezbędnymi dodatkami, które są potrzebne, aby działał jako inteligentny czujnik. Zawartość pliku przedstawiono poniżej: # plugins/smart_file_sensor.py

z airflow.sensors.filesystem import FileSensor
z airflow.utils.decorators importuj apply_defaults
od wpisania importu Dowolny

klasa SmartFileSensor (FileSensor):
poke_context_fields = ('ścieżka pliku', 'fs_conn_id')

@apply_defaults
def __init__ (self, **kwargs: Dowolny):
super (). __init__ (**kwargs)

def is_smart_sensor_compatible (self):
wynik = (
nie self.soft_fail
i super () .is_smart_sensor_compatible ()
)
zwróć wynikZwróć uwagę na pola kontekstowe_poke_ zmienna. Zawiera zestaw argumentów, których oczekuje się podczas tworzenia czujnika.Następnie w Plik sensor.py (przykład dag ze zwykłym czujnikiem z góry) dodaj następujący import:from smart_file_sensor import SmartFileSensor
i zmień FileSensor do SmartFileSensor: czujniki = [
SmartFileSensor (
task_id=f"czekaj_za_plik_ {identyfikator_sensor_}”,
filepath=f "{dir_path}/{nazwa}”,
fs_conn_id="fs_default”
) dla sensor_id, nazwa w wyliczeniu (nazwa_plików, start=1)
]

Teraz zdefiniowane zadania czujników powinny działać jako inteligentne czujniki.

Zauważ, że teraz, gdy DAG zostanie uruchomiony, stan zadań czujnika jest ustawiony jako detekcja do czasu spełnienia warunków określonych w czujnikach, podczas gdy zadania smart_sensor_group_shard_xx Dagi są w stanie bieżącym.

Airflow 2.0 sensor - 9

Sprawdź nasz blog, aby uzyskać więcej informacji na temat przepływu powietrza:

Share this post
DevOps
Mikolaj Klepacz
MORE POSTS BY THIS AUTHOR
Mikolaj Klepacz

Curious how we can support your business?

TALK TO US