Einführung in Smart Sensors
Mit der neuen Airflow Version 2.0 wurden viele neue Features zu diesem leistungsstarken Apache-Tool hinzugefügt. Eine der Neuerungen ist die Einführung von Smart Sensors, die die Airflow-Performance erheblich verbessern können, wenn du viele sensors in deinen DAGs verwendest. In diesem Artikel vergleichen wir reguläre und Smart sensors und zeigen, wie du deinen ersten Smart Sensor konfigurierst.
Sensors in Airflow
Ein Sensor ist ein Typ von Airflow operator, dessen Zweck es ist, auf ein bestimmtes Ereignis zu warten. Wenn dein Prozess event-driven ist, das heißt, der Workflow soll erst starten, wenn ein bestimmtes Ereignis eintritt, solltest du Apache Airflow sensors verwenden. Typische Anwendungsfälle für sensors: Warten auf eine Datei, Verzögern der DAG-Ausführung, Überprüfen, ob ein SQL-Eintrag vorhanden ist, Warten auf das Ende anderer DAGs.
Beispiel für die Verwendung von Sensors im folgenden DAG:
Hier wird FileSensor verwendet, um vier tasks zu erstellen, die auf die Erstellung bestimmter Dateien warten. Jeder Task wait_for_file_{id} prüft ständig, ob eine Datei im angegebenen Pfad erstellt wurde, und ist erfolgreich, wenn die Datei gefunden wird. Standardmäßig prüft der Airflow sensor alle 60 Sekunden, ob die Datei existiert, dies kann aber mit dem poke_interval Parameter angepasst werden. Im Beispiel-DAG wird der process_files Task erst ausgelöst, wenn alle in den sensors angegebenen Dateien gefunden wurden.

Hier verwenden wir FileSensor, um vier Aufgaben zu erstellen, die auf die Erstellung bestimmter Dateien warten sollen. Jede Aufgabe wait_for_file_ {id} überprüft ständig, ob eine Datei im angegebenen Pfad erstellt wurde und ist erfolgreich, wenn sie die Datei findet. Standardmäßig importiert der Airflow-Dag aus airflow.models DAG
von airflow.utils.dates vor Tagen importieren
aus airflow.sensors.filesystem FileSensor importieren
aus airflow.operators.python_operator importiere PythonOperator
Protokollierung importieren
Standardargumente = {
„start_date“: vor Tagen (1),
}
dir_path = „/usr/local/airflow/test_data“
dateinamen = [
"file1.txt „,
"file2.txt „,
"file3.txt „,
"file4.txt „,
]
def _process_files ():
logging.info („Folgende Dateien wurden erkannt: „)
für den Namen in file_names:
logging.info (f "{Verzeichnispfad}/{Name}“)
logging.info („Dateien verarbeiten...“)
def _save_data ():
logging.info („Daten speichern“)
mit DAG („sensor_example“, default_args=default_args, catchup=False) als dag:
sensoren = [
Dateisensor (
task_id=f"warte_auf_Datei_ {Sensor_ID}“,
Dateipfad=f "{Verzeichnispfad}/{Name}“,
fs_conn_id="fs_default“
) für sensor_id, Name in enumerate (file_names, start=1)
]
Prozess = PythonOperator (
task_id="Prozessdateien“,
python_callable=_prozessdateien
)
speichern = PythonOperator (
task_id="speichern“,
python_callable=_daten speichern
)
Sensoren >> verarbeiten >> speichern
nsor überprüft alle 60 Sekunden, ob die Datei existiert. Dies kann jedoch beim Erstellen des Sensors mit dem Parameter poke_interval konfiguriert werden. In der Beispiel-DAG wird die process_files-Aufgabe erst ausgelöst, nachdem alle in den Sensoren angegebenen Dateien gefunden wurden. Hier ist die Definition des genannten DAG-Beispiels: # dags/sensor.py
Probleme mit regulären sensors
Wie bereits erwähnt, ist der sensor operator sehr nützlich, wenn man auf ein Ereignis im Workflow warten möchte. Aber was, wenn man Dutzende oder Hunderte von sensors in seiner Airflow-Instanz benötigt? Viele reguläre sensors können folgende Probleme verursachen:
- Deadlock-Risiko – sensors sind meist langlaufende tasks und es gibt nur eine begrenzte Anzahl von worker slots. Es kann passieren, dass alle Slots von sensors belegt sind und keine anderen tasks mehr ausgeführt werden können, sodass alle DAGs blockiert werden.
- Ressourcenunterauslastung – jeder reguläre sensor verwendet einen eigenen Prozess, was ineffizient ist, da Slots für wenig rechenintensive tasks reserviert werden. Andere tasks warten auf die Planung, obwohl die Maschine kaum ausgelastet ist.
Um diese Probleme zu lösen, wurde in Airflow 2.0 eine neue Version des sensor operators eingeführt: Smart Sensor.
Was sind Smart Sensors und wann sollte man sie verwenden?
Die Hauptidee des Smart Sensor service ist, nicht mehr für jeden sensor task einen eigenen Prozess zu verwenden, da dies ineffizient ist. Stattdessen werden zentrale Prozesse genutzt, die diese langlaufenden tasks in Batches ausführen.
Dazu wird die Ausführung des tasks in zwei Schritte unterteilt:
- Registrierung des tasks im Smart Sensor service und Speichern der task-Informationen in der Airflow Metastore DB. Nach erfolgreicher Registrierung gibt der task den worker slot frei.
- Verwendung einiger zentraler Prozesse zur Ausführung der serialisierten tasks. Diese zentralen tasks werden in speziellen eingebauten DAGs (z. B. smart_sensor_group_shard_xx) gespeichert.
Smart sensors wurden entwickelt, um die Effizienz dieser langlaufenden tasks drastisch zu verbessern, was zu erheblichen Einsparungen bei den Infrastrukturkosten führen kann. Du solltest sie verwenden, wenn deine DAGs Dutzende oder Hunderte von sensors enthalten und erhebliche Ressourcenunterauslastung verursachen.

Registrierung von Aufgaben im Smart Sensor Service, Quelle: https://airflow.apache.org/docs/apache-airflow/stable/concepts/sensors.html
Konfiguration intelligenter Sensoren in Airflow 2.0
Um den Smart Sensor-Dienst in Airflow zu konfigurieren, müssen Sie ihn zunächst in der Konfiguration aktivieren. Hier ist der Beispielteil der Datei airflow.cfg, der die intelligenten Sensoren aktiviert.
[Smart_Sensor]
use_smart_sensor = wahr
Oberste Grenze des Shard-Codes = 10000
#Users kann die folgende Konfiguration je nach ihren Anforderungen ändern
Scherbe = 2
sensors_enabled = NamedHivePartitionSensor, SmartFileSensorErklärung der Konfigurationen:
- verwenden_smarten_sensor: Zeigt an, ob der Smart Sensor-Dienst aktiviert ist
- Scherben: Anzahl der gleichzeitig laufenden Smart-Sensor-Jobs für den Airflow-Cluster.
- sensoren_aktiviert: Liste der Sensorklassennamen, die den Smartsensor verwenden.
Der nächste Schritt der Konfiguration besteht darin, die Airflow-Datenbank zu aktualisieren, um die hinzuzufügen sensor_instanz Tabelle, die benötigt wird, damit intelligente Sensoren funktionieren. Führen Sie dazu einfach den folgenden Befehl aus: airflow db upgradeJetzt sollten nach dem Neustart von Airflow 2 neue DAGs automatisch erstellt werden:

Das sind die eigentlichen intelligenten Sensorarbeiter, die die Sensoraufgaben aufnehmen und ausführen. Damit intelligente Sensoren funktionieren, müssen sie manuell eingeschaltet werden. Endlich können wir unseren intelligenten Sensor einrichten. Es ist eine modifizierte Version des regulären Sensors, der oben vorgestellt wurde. Erstellen Sie zuerst eine neue Datei: $airflow_home/plugins/smart_file_sensor.py was die Definition von enthält Intelligenter Dateisensor. Es handelt sich um einen normalen Dateisensor, der jedoch über die erforderlichen Ergänzungen verfügt, damit er als intelligenter Sensor funktioniert. Der Dateiinhalt ist unten dargestellt: # plugins/smart_file_sensor.py
aus airflow.sensors.filesystem FileSensor importieren
importiere apply_defaults von airflow.utils.decorators
von der Eingabe import Any
Klasse SmartFileSensor (FileSensor):
poke_context_fields = ('Dateipfad', 'fs_conn_id')
@apply_defaults
def __init__ (self, **kwargs: Beliebig):
super (). __init__ (**kwargs)
def is_smart_sensor_compatible (selbst):
Ergebnis = (
nicht self.soft_fail
und super () .is_smart_sensor_compatible ()
)
Ergebnis zurückgebenBeachten Sie das poke_context_fields variabel. Sie enthält den Satz von Argumenten, die beim Erstellen des Sensors erwartet werden. Dann in der sensor.py (Beispiel dag mit normalem Sensor von oben) füge den folgenden Import hinzu: from smart_file_sensor import SmartFileSensor
und ändere die Dateisensor zu Intelligenter Dateisensor: sensoren = [
Intelligenter Dateisensor (
task_id=f"warte_auf_Datei_ {Sensor_ID}“,
Dateipfad=f "{Verzeichnispfad}/{Name}“,
fs_conn_id="fs_default“
) für sensor_id, Name in enumerate (file_names, start=1)
]
Jetzt sollten die definierten Sensoraufgaben als intelligente Sensoren funktionieren.
Beachten Sie, dass jetzt, wenn die DAG ausgelöst wird, der Status der Sensoraufgaben wie folgt festgelegt ist Erfassen bis die in den Sensoren definierten Bedingungen erfüllt sind, wohingegen die Aufgaben in smart_sensor_group_shard_xx dags sind im laufenden Zustand.

Moderne Datentransformation mit dbt, re_data und Airflow.