Databricks-Tests mit GitHub-Aktionen
An diejenigen, die es inspiriert haben und es niemals lesen werden. Und für Marcin!
Einführung
Databricks Connect (mehr Infos) hier) bietet eine gute Möglichkeit, mit den Azure Databricks-Clustern auf Ihrem lokalen Computer zu interagieren (entweder mithilfe der IDE oder einer benutzerdefinierten Anwendung). Die Einrichtung ist einfach: Sie geben die URL und das persönliche Token ein, die aus dem Databricks Workspace generiert wurden, und Sie können beginnen, mit dem DBFS, den Clustern usw. zu arbeiten. Dies funktioniert einwandfrei, bis Sie Ihren Anwendungscode lokal ausführen möchten, z. B. wenn Sie dbutils verwenden müssen. Dazu müssen Sie ein weiteres persönliches Token mit erhöhten Berechtigungen generieren (das ist nur in einem Databricks-Notebook möglich), das nur 48 Stunden gültig ist. Für einige grundlegende Prüfungen ist dies mehr als ausreichend, aber wenn Sie es in einer stärker automatisierten Lösung verwenden möchten, müssen Sie dieses Token alle zwei Tage neu generieren, damit es funktioniert. Da ich mich darauf gefreut hatte, einige Integrationstests für meine Databricks-Anwendung durchzuführen, musste ich eine robustere Lösung finden, da sie jedes Mal, wenn ein Push erkannt wurde, für meinen Entwicklungszweig ausgeführt wurde.
Benötigen Sie Hilfe mit Databricks? Schauen Sie sich unsere an datentechnische Dienstleistungen und sehen Sie, wie wir Ihnen helfen können.
Die Idee des Integrationstests für meine App
Da meine Anwendung über mehr als 50 Datenfeeds verfügte, die jeweils in einer separaten Python-Klasse organisiert waren, die die gesamte Verarbeitung abdeckte, vom Lesen aus der Quelle über Transformationen bis hin zum Speichern bis hin zur letzten Ebene in den BLOB-Speicher und die Azure Synapse-Tabelle. Da es einige Geschäftsanwender (eigentlich Power-User) gab, die die DEV Databricks-Umgebung nutzten, wollte ich eine Möglichkeit haben, sicherzustellen, dass jede Änderung, die aus einem Feature-Branch mit dem Develop-Branch zusammengeführt wurde, ordnungsgemäß funktioniert, bevor sie tatsächlich in den DEV Databricks installiert wurde.
Ursprünglich veröffentlichte mein GitHub-Workflow den Entwicklungszweig jedes Mal, wenn ein Push in ein Anwendungsverzeichnis erkannt wurde. Die Idee war, den Entwicklungszweig zu testen, bevor er im Anwendungsverzeichnis veröffentlicht wurde, und auf der Grundlage des Testergebnisses nur dann, wenn er erfolgreich war, in das Zielverzeichnis zu veröffentlichen.
Der neue Arbeitsablauf sollte wie folgt aussehen:
- Voraussetzungen installieren
- Im Test-Verzeichnis von Databricks bereitstellen
- Testen Sie den Dummy-Load mithilfe des TEST-Verzeichnisses
- Im Databricks-Zielverzeichnis bereitstellen
Das einzige Problem war, dass ich dieses Token für erhöhte Berechtigungen immer noch benötigte, um meine Klassen lokal auf dem bereitgestellten Computer ausführen zu können, der den Workflow ausführte.
Lösung - Databricks Jobs API
Die Databricks Job API bietet die Möglichkeit, Jobs im Databricks Workspace auszuführen. Es kann sich um einen benutzerdefinierten Job handeln oder um einen Link zu einem vorhandenen Notizbuch. Sobald wir in der Lage sind, ein Notebook einzurichten, das alle Operationen ausführt, die wir innerhalb der Databricks benötigen, benötigen wir das Token für erhöhte Berechtigungen nicht mehr und können eine automatisierte Methode zum Testen der Feeds einrichten.
Betrachten wir diese einfache Feed-Klasse unten. Ich habe sie nur für diesen Artikel erstellt, sodass ihr viele Funktionen fehlen und im Grunde genommen alles, was sie tut, ist, aus der Quelle zu lesen und sie im BLOB-Speicher zu speichern (beide Speicherorte sind im DBFS gemountet):
aus pyspark.sql importiere SparkSession
Klasse testDataFeed (Objekt):
„"“ Dummy-Klasse für Integrationstests „"“
def __init__ (selbst, dbutils):
selbst. _df = Keine
selbst. _ctx = sparksession.builder.getOrCreate ()
self.dbutils = dbutils
self.source_dir = '/mnt/test_in/integration_testing/srce_dir'
self.destination_dir = '/mnt/test_out/integration_testing/dest_dir'
auf jeden Fall gelesen (selbst, Pfad=None):
path = path oder self.source_dir
Funke = Selbst. _ctx
opts = dict (format='parkett', compression='schnappy')
selbst. _df = spark.read.load (Pfad, **Optionen)
selbst zurückgeben
def transform (selbst):
Pass
def save2blob (selbst, Pfad=Keiner):
path = path oder self.destination_dir
opts = dict (modus='überschreiben', format='parkett', komprimierung='schnappy')
selbst. _df.write.save (Pfad, **Optionen)
auf jeden Fall ausführen (selbst):
selbstgelesen ()
selbst.transformieren ()
selbst.save2blob ()
Ein einfacher Test für diese Klasse würde nur aus dem Quellverzeichnis lesen und die Anzahl der abgerufenen Datensätze zählen. Die Methode wird wie folgt aussehen:
def test_TestDataFeed ():
o = TestDataFeed (dbutils)
o.lesen ()
o.transform ()
y = o._df.count ()
assert y>0, „TestDataFeed dummy pull“
Im Allgemeinen können Tests gründlicher sein und die Ergebnisse der Transformationen überprüfen und überprüfen, ob das Speichern in das Zielverzeichnis und/oder die ASDW-Tabelle erfolgreich war, aber in diesem Fall werden wir den Test nur mit dieser Überprüfung abschließen.
Normalerweise würde die Testmethode im Modul im Testverzeichnis neben dem getesteten Modul platziert:
• module_tested.py
— Klasse TestDataFeed
• Tests/
— test_module_tested.py
• test_TestDataFeed ()
Aber um dbutils verwenden zu können, müssen wir es ein wenig modifizieren und wir müssen es in ein Databricks-Notizbuch packen, damit es dbutils verwenden kann.
Anstatt für jede Testfunktion ein Notizbuch zu erstellen, ist das Notizbuch generisch und kann den als Parameter übergebenen Klassennamen lesen. Basierend auf dem Klassennamen importiert es ihn dynamisch und führt die Testfunktion aus. Es kann wie das folgende Beispiel aussehen:
sys importieren
sys.path.insert (0, '/dbfs/dein_app_verzeichnis_test')
Datum/Uhrzeit importieren
importlib importieren
cls_name = getArgument („Klassenname“)
pkg_name = getArgument („pkg_name“)
versuche:
wenn nicht cls_name oder nicht pkg_name:
raise ImportError ('Klassenname oder Paketname dürfen nicht leer sein! ')
d=datetime.datetime.now ()
ex = Keine
Klasse = Keine
o = Keine
Modul = importlib.import_module (pkg_name)
class = getattr (Modul, Klassenname)
o = Klasse ()
o.lesen ()
o.transform ()
y = o._df.count ()
assert y>0, cls_name + "Dummy-Pull“
außer Ausnahme wie e:
ex = e
endlich:
dbutils.notebook.exit (ex)
Um dieses Notebook auszuführen, benötigen wir ein kleines Util, das die Databricks Jobs API aufrufen kann. Das kann so aussehen:
def run_notebook ()
Notebook-Pfad,
Cluster-ID,
dbr_credentials,
basis_url,
Klassenname,
pkg_name,
):
def_timeout_sec = 180
def_check_timeout_sec = 5
job_name = 'Dummytest zur Integration'
base_parameters = {'Klassenname': Klassenname, 'Paketname': Paketname}
Auftragsdaten einreichen = {
'run_name': Jobname,
'existing_cluster_id': Cluster_ID,
'timeout_seconds': def_timeout_sec,
'notebook_task': {'notebook_path': Notebook_Pfad,
'Basisparameter': Basisparameter},
}
run_cmd = requests.post (base_url + 'api/2.0/jobs/runs/submit',
daten=json.dumps (job_submit_data),
auth=dbr_credentials)
runjson = run_cmd.text
d = json.loads (runjson)
job_run_id = d ['Lauf-ID']
print 'Job eingereicht (run_ID: '+ str (job_run_id) +') '
status_cmd = requests.get (base_url + 'api/2.0/jobs/runs/get? run_id='
+ str (job_run_id),
daten=json.dumps (job_submit_data),
auth=dbr_credentials)
jobjson = status_cmd.text
j = json.loads (jobjson)
print 'URL der Startseite: '+ j [' run_page_url ']
i = 0
job_timed_out = Falsch
obwohl nicht job_timed_out:
time.sleep (def_check_timeout_sec)
status_cmd = requests.get (base_url)
+ 'api/2.0/jobs/runs/get? run_id='
+ str (job_run_id),
daten=json.dumps (job_submit_data),
auth=dbr_credentials)
jobjson = status_cmd.text
j = json.loads (jobjson)
job_current_state = j ['Zustand'] ['Lebenszyklusstatus']
job_run_id = j ['Lauf-ID']
wenn job_current_state in ['TERMINATED', 'INTERNAL_ERROR',
'ÜBERSPRUNGEN']:
'Jobstatus: 'drucken + job_current_state
Pause
# maximal 30 Minuten prüfen
wenn ich >= def_timeout_sec:
job_timed_out = Wahr
status_cmd = requests.get (base_url)
+ 'api/2.0/jobs/runs/get-output? run_id='
+ str (job_run_id),
daten=json.dumps (job_submit_data),
auth=dbr_credentials)
jobjson = status_cmd.text
j = json.loads (jobjson)
'Auftragsstatus: TIMED OUT' drucken
print 'Der Auftrag dauert länger als erwartet. Weitere Informationen finden Sie im Auftragsprotokoll: '
drucke j ['metadata'] ['run_page_url']
Pause
i = i + 1
wenn nicht job_timed_out:
status_cmd = requests.get (base_url)
+ 'api/2.0/jobs/runs/get-output? run_id='
+ str (job_run_id),
daten=json.dumps (job_submit_data),
auth=dbr_credentials)
jobjson = status_cmd.text
j = json.loads (jobjson)
job_output = j ['Notebook_Ausgabe'] ['Ergebnis']
assert job_output == 'Keine', job_output
'Auftrag ERFOLGREICH AUSGEDRUCKT'
Da dieses Skript lokal ohne spezielles Token ausgeführt werden kann, habe ich die Methode test_TestDataFeed ersetzt, um run_notebook anstelle des eigentlichen Tests auszuführen. Es geht zu Databricks und führt für mich das Notebook aus, das meine Tests ausführen kann.
Die Ausnahmeregelung wird jetzt wie folgt aussehen. Denken Sie daran, dass ich das Databricks-Token als Umgebungsvariable übergebe, da dies in den GitHub-Geheimnissen aufbewahrt wird.
<base_url_of_databricks>BASIS-URL = ''
<notebook_path_of_your_test_notebook>NOTIZBUCHPFAD = ''
DBR_TOKEN = os.environ.get ('DBR_TOKEN')
DBR_CREDENTIALS = ('Token', DBR_TOKEN)
def test_TestDataFeed ():
Notizbuch ausführen (
cluster_id=Cluster_ID,
DBR_CREDENTIALS=DBR_CREDENTIALS,
base_url=Basis_URL,
notebook_path=Notizbuch_Pfad,
cls_name = 'Testdateneingabe',
pkg_name='modules.data_feed_test',
)
Der Workflow für die GitHub-Aktion führt dann pytest aus, um die Tests auszuführen. Der Workflow sollte in den folgenden Schritten abgeschlossen werden:
Name: Kontinuierlicher Einsatz - Databricks
auf:
drücken:
Zweige: [entwickeln, beherrschen]
Jobs:
bauen:
läuft auf: Ubuntu-latest
Schritte:
- verwendet: actions/checkout @v2
- Name: Python 3.7 einrichten
verwendet: Aktionen/Setup-Python @v2
mit:
Python-Version: 3.7
- Name: Voraussetzungen installieren
ausführen: |
pip install -r your_app_directory/requirements.txt
cat > ~/.databrickscfg <<EOF
[STANDARD]
host = https://eastus2.azuredatabricks.net
token = $ ([$ {GITHUB_REF##*/} == „master“] && echo „$ {{secrets.db_token_prod}}“ || echo „$ {{secrets.db_token_dev}}“)
EOF
— Name: Auf der Databricks-Testinstanz bereitstellen
ausführen: |
dbfs rm -r dbfs: /dein_app_verzeichnis_test
dbfs mkdirs dbfs: /ihre_app_verzeichnis_test/logs
finden.\ (! -regex '.*/\.. *'\) -Typ f -name „*.py“ | cut -c 11- $a | xargs -i dbfs cp -r die_app/ {} dbfs: /your_app_directory_test/ {} --overwrite
- Name: Dummy-Last testen
Umgebung:
DBR_TOKEN_SECRET_DEV: $ {{secrets.DB_TOKEN_DEV}}
ausführen: |
exportiere DBR_TOKEN="$DBR_TOKEN_SECRET_DEV“
pytest --workers 5 the_app/modules/tests/test_module_tested.py
- Name: Auf der Databricks-Zielinstanz bereitstellen
ausführen: |
finden.\ (! -regex '.*/\.. *'\) -type f -name „*.py“ | cut -c 11- $a | xargs -i dbfs cp -r die_app/ {} dbfs: /ihre_app_verzeichnis/ {} --overwrite
Sobald der Test abgeschlossen ist, zeigt die Ausgabe eine Zusammenfassung zu jedem Test:
Dummy-Last testen
====================== 32 wurde in 135,50 Sekunden (0:02:15) bestanden. ======================
Führen Sie pytest --workers 5 the_app/modules/tests/test_module_tested.py aus
=========================== Testsitzung beginnt ==============================
plattformlinux -- Python 3.7.8, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
<repo_name>rootdir: /home/runner/work/ <repo_name>
Erweiterungen: Parallel-0.1.0, testmon-1.0.3
32 Artikel gesammelt
pytest-parallel: 5 Worker (Prozesse), 1 Test pro Worker (Thread)
...
====================== 32 wurde in 135,50 Sekunden (0:02:15) bestanden. ======================
Wenn eine der Prüfungen fehlgeschlagen ist, bleibt das Verzeichnis your_app_unberührt und der für die Zusammenführung verantwortliche Entwickler muss das genaue Problem in den Pytest-Protokollen überprüfen und entsprechend beheben. Und die App funktioniert immer noch!
Dies ist nur eine der möglichen Lösungen für dieses Problem, und wie immer sollte die ursprüngliche Idee nur ein Stumpf für eine umfassendere Diskussion über Verbesserungen der beschriebenen Verfahren sein. Die beschriebenen Tests decken nur die grundlegenden Tests ab, die sicherstellen können, dass die Anwendung keine größeren Probleme wie Syntaxfehler, fehlerhafte bereitgestellte oder unzugängliche Verzeichnisse hat, aber sie können eine gute Grundlage für komplexere Tests sein.
Besuchen Sie unseren Blog für ausführlichere Data Engineering-Artikel: