Einführung in intelligente Sensoren
Mit der neuen Airflow-Version 2.0 wurden diesem leistungsstarken Tool von Apache viele neue Funktionen hinzugefügt. Eines der Updates war die Einführung von Smart Sensors, die die Airflow-Leistung drastisch verbessern können, wenn Sie mehrere Sensoren in Ihren Dags verwenden. In diesem Artikel vergleichen wir normale und intelligente Sensoren und zeigen dir, wie du deinen ersten Smart Sensor konfigurierst.
Sensoren im Luftstrom
Der Sensor gehört zu den Airflow-Operatoren, deren Zweck es ist, im Grunde darauf zu warten, dass etwas passiert. Wenn Ihr Prozess ereignisgesteuert ist, was bedeutet, dass der Workflow nur gestartet werden muss, wenn ein Ereignis eintritt, sollten Sie Folgendes verwenden Apache Airflow Sensoren. Typische Anwendungsfälle von Sensoren: Warten auf eine Datei, Verzögerung der Dag-Ausführung, Überprüfung, ob ein SQL-Eintrag erscheint, Warten, bis andere Tage fertig sind. Ein Beispiel für die Verwendung von Sensoren wird am folgenden Tag vorgestellt:

Hier verwenden wir FileSensor, um vier Aufgaben zu erstellen, die auf die Erstellung bestimmter Dateien warten sollen. Jede Aufgabe wait_for_file_ {id} überprüft ständig, ob eine Datei im angegebenen Pfad erstellt wurde und ist erfolgreich, wenn sie die Datei findet. Standardmäßig importiert der Airflow-Dag aus airflow.models DAG
von airflow.utils.dates vor Tagen importieren
aus airflow.sensors.filesystem FileSensor importieren
aus airflow.operators.python_operator importiere PythonOperator
Protokollierung importieren
Standardargumente = {
„start_date“: vor Tagen (1),
}
dir_path = „/usr/local/airflow/test_data“
dateinamen = [
"file1.txt „,
"file2.txt „,
"file3.txt „,
"file4.txt „,
]
def _process_files ():
logging.info („Folgende Dateien wurden erkannt: „)
für den Namen in file_names:
logging.info (f "{Verzeichnispfad}/{Name}“)
logging.info („Dateien verarbeiten...“)
def _save_data ():
logging.info („Daten speichern“)
mit DAG („sensor_example“, default_args=default_args, catchup=False) als dag:
sensoren = [
Dateisensor (
task_id=f"warte_auf_Datei_ {Sensor_ID}“,
Dateipfad=f "{Verzeichnispfad}/{Name}“,
fs_conn_id="fs_default“
) für sensor_id, Name in enumerate (file_names, start=1)
]
Prozess = PythonOperator (
task_id="Prozessdateien“,
python_callable=_prozessdateien
)
speichern = PythonOperator (
task_id="speichern“,
python_callable=_daten speichern
)
Sensoren >> verarbeiten >> speichern
nsor überprüft alle 60 Sekunden, ob die Datei existiert. Dies kann jedoch beim Erstellen des Sensors mit dem Parameter poke_interval konfiguriert werden. In der Beispiel-DAG wird die process_files-Aufgabe erst ausgelöst, nachdem alle in den Sensoren angegebenen Dateien gefunden wurden. Hier ist die Definition des genannten DAG-Beispiels: # dags/sensor.py
Probleme mit normalen Sensoren
Wie bereits erwähnt, ist der Sensoroperator sehr nützlich, wenn wir warten möchten, bis ein Ereignis in unserem Arbeitsablauf eintritt. Aber was ist mit dem Fall, wenn wir Dutzende oder sogar Hunderte von Sensoren in unserer Airflow-Instanz benötigen? Es stellt sich heraus, dass eine große Anzahl regulärer Sensoren zu folgenden Problemen führen kann:
- Gefahr eines Deadlocks — Sensoren sind in der Regel Aufgaben mit langer Laufzeit, und es gibt eine begrenzte Anzahl von Worker-Slots, die gleichzeitig ausgeführt werden können. Deshalb kann es sein, dass du am Ende alle Slots für den Betrieb der Sensoren nutzt, sodass keine anderen Aufgaben mehr ausgelöst werden können, sodass all deine Mitarbeiter stecken bleiben können.
- Unzulängliche Nutzung der Ressourcen — mit den regulären Sensoren verwendet jede Aufgabe einen Prozess, was ineffizient ist, da wir die Slots für Aufgaben reservieren, die nicht rechenintensiv sind. Infolgedessen warten unsere anderen Aufgaben möglicherweise immer noch darauf, geplant zu werden, obwohl die Auslastung unserer Maschine sehr gering ist.
Um den genannten Problemen zu begegnen, wurde in Airflow 2.0 eine neue Version von Sensor Operator eingeführt: Smart Sensor.
Was sind intelligente Sensoren und wann sollten wir sie verwenden?
Die Hauptidee des Smart Sensor-Dienstes besteht darin, nicht mehr für jede Sensoraufgabe einen Prozess zuzuweisen, da dieser Ansatz nicht effizient ist. Stattdessen werden zentralisierte Prozesse verwendet, um diese lang andauernden Aufgaben stapelweise auszuführen. Um dies zu erreichen, wird die Ausführung der Aufgabe in zwei Schritte aufgeteilt:
- Registrierung der Aufgabe im Smart Sensor-Dienst und Speichern der Aufgabeninformationen in Airflow Metastore DB. Wenn die Registrierung erfolgreich ist, gibt die Aufgabe den Worker-Slot frei.
- Verwendung einiger zentralisierter Prozesse zur Ausführung der serialisierten Aufgaben. Diese zentralisierten Aufgaben werden in den speziellen integrierten Tags (mit dem Namen smart_sensor_group_shard_xx) gespeichert
Die intelligenten Sensoren wurden entwickelt, um die Effizienz dieser lang andauernden Aufgaben drastisch zu verbessern, was zu großen Einsparungen bei den Infrastrukturkosten führen kann. Sie sollten damit beginnen, sie zu verwenden, wenn Ihre Sensoren Dutzende oder Hunderte von Sensoren enthalten und die Ressourcen dadurch erheblich nicht ausgelastet sind.

Registrierung von Aufgaben im Smart Sensor Service, Quelle: https://airflow.apache.org/docs/apache-airflow/stable/concepts/sensors.html
Konfiguration intelligenter Sensoren in Airflow 2.0
Um den Smart Sensor-Dienst in Airflow zu konfigurieren, müssen Sie ihn zunächst in der Konfiguration aktivieren. Hier ist der Beispielteil der Datei airflow.cfg, der die intelligenten Sensoren aktiviert.
[Smart_Sensor]
use_smart_sensor = wahr
Oberste Grenze des Shard-Codes = 10000
#Users kann die folgende Konfiguration je nach ihren Anforderungen ändern
Scherbe = 2
sensors_enabled = NamedHivePartitionSensor, SmartFileSensorErklärung der Konfigurationen:
- verwenden_smarten_sensor: Zeigt an, ob der Smart Sensor-Dienst aktiviert ist
- Scherben: Anzahl der gleichzeitig laufenden Smart-Sensor-Jobs für den Airflow-Cluster.
- sensoren_aktiviert: Liste der Sensorklassennamen, die den Smartsensor verwenden.
Der nächste Schritt der Konfiguration besteht darin, die Airflow-Datenbank zu aktualisieren, um die hinzuzufügen sensor_instanz Tabelle, die benötigt wird, damit intelligente Sensoren funktionieren. Führen Sie dazu einfach den folgenden Befehl aus: airflow db upgradeJetzt sollten nach dem Neustart von Airflow 2 neue DAGs automatisch erstellt werden:

Das sind die eigentlichen intelligenten Sensorarbeiter, die die Sensoraufgaben aufnehmen und ausführen. Damit intelligente Sensoren funktionieren, müssen sie manuell eingeschaltet werden. Endlich können wir unseren intelligenten Sensor einrichten. Es ist eine modifizierte Version des regulären Sensors, der oben vorgestellt wurde. Erstellen Sie zuerst eine neue Datei: $airflow_home/plugins/smart_file_sensor.py was die Definition von enthält Intelligenter Dateisensor. Es handelt sich um einen normalen Dateisensor, der jedoch über die erforderlichen Ergänzungen verfügt, damit er als intelligenter Sensor funktioniert. Der Dateiinhalt ist unten dargestellt: # plugins/smart_file_sensor.py
aus airflow.sensors.filesystem FileSensor importieren
importiere apply_defaults von airflow.utils.decorators
von der Eingabe import Any
Klasse SmartFileSensor (FileSensor):
poke_context_fields = ('Dateipfad', 'fs_conn_id')
@apply_defaults
def __init__ (self, **kwargs: Beliebig):
super (). __init__ (**kwargs)
def is_smart_sensor_compatible (selbst):
Ergebnis = (
nicht self.soft_fail
und super () .is_smart_sensor_compatible ()
)
Ergebnis zurückgebenBeachten Sie das poke_context_fields variabel. Sie enthält den Satz von Argumenten, die beim Erstellen des Sensors erwartet werden. Dann in der sensor.py (Beispiel dag mit normalem Sensor von oben) füge den folgenden Import hinzu: from smart_file_sensor import SmartFileSensor
und ändere die Dateisensor zu Intelligenter Dateisensor: sensoren = [
Intelligenter Dateisensor (
task_id=f"warte_auf_Datei_ {Sensor_ID}“,
Dateipfad=f "{Verzeichnispfad}/{Name}“,
fs_conn_id="fs_default“
) für sensor_id, Name in enumerate (file_names, start=1)
]
Jetzt sollten die definierten Sensoraufgaben als intelligente Sensoren funktionieren.
Beachten Sie, dass jetzt, wenn die DAG ausgelöst wird, der Status der Sensoraufgaben wie folgt festgelegt ist Erfassen bis die in den Sensoren definierten Bedingungen erfüllt sind, wohingegen die Aufgaben in smart_sensor_group_shard_xx dags sind im laufenden Zustand.

Weitere Informationen zu Airflow finden Sie in unserem Blog:
- Benutzerdefinierte Airflow-E-Mails
- Unterschiede zwischen Airflow 1.10.x und 2.0
- Leistungsstarke REST-API in Airflow 2.0 — was müssen Sie wissen?