Die „Shift-Left“-Revolution: Warum deine PySpark-Pipelines Unit Tests brauchen (und wie du das umsetzt)

Michal Milosz
Michal Milosz
February 27, 2026
9 min read
Loading the Elevenlabs Text to Speech AudioNative Player...

In der Data-Engineering-Welt haben wir eine schlechte Angewohnheit: Wir testen gerne in der Produktion.

Wir schreiben etwas SQL oder PySpark-Code, klicken im Databricks-Notebook auf „Run“, und wenn wir grüne Häkchen sehen, sagen wir: „Es funktioniert!“ Dann planen wir den Job ein, gehen nach Hause und hoffen auf das Beste.

Aber „es läuft“ bedeutet nicht „es funktioniert korrekt“.

Behandelt die Logik null-Werte richtig? Was passiert, wenn der Input leer ist? Hat dieser komplexe join wirklich die Duplikate herausgefiltert – oder sah es nur so aus?

Software Engineers haben dieses Problem vor 20 Jahren gelöst. Sie nennen es „Shift Left“ – also das Vorziehen von Tests in die frühestmögliche Phase der Entwicklung. Im Jahr 2026 ist es an der Zeit, dass Data Engineers aufhören, sich wie Cowboys zu verhalten, und anfangen, wie Software Engineers zu arbeiten.

Heute möchte ich über Unit Testing in PySpark sprechen: wie man Cloud-Kosten durch lokales Testen spart und wie man das Ganze mit Databricks Asset Bundles (DABs) und Azure DevOps automatisiert.

Das Problem: Integration Tests sind teuer

Die meisten Data-Teams verlassen sich auf Integration Tests. Das bedeutet: die komplette Pipeline auf echten (oder gesampelten) Daten in der Cloud laufen zu lassen.

Das Problem? Es ist langsam und teuer. Um eine kleine Logikänderung in einer Gold-Tabelle zu testen, musst du oft:

  • Einen Cluster hochfahren (5 Minuten).
  • Daten aus ADLS lesen.
  • Den gesamten Job ausführen.
  • Das Ergebnis prüfen.

Wenn du einen Bug hast, behebst du ihn – und wartest wieder 10 Minuten. Diese Feedback-Schleife ist zu lang. Und noch schlimmer: Du zahlst für Databricks-Compute nur, um zu überprüfen, ob 1 + 1 = 2.

Die Lösung: Unit Testing für PySpark

Unit Testing ist anders. Es testet nicht die gesamte Pipeline – es testet die Transformationslogik.

Damit das gut funktioniert, müssen wir ändern, wie wir Code schreiben. Wir müssen aufhören, riesige „Spaghetti-Code“-Notebooks zu bauen, und anfangen, Pure Functions zu schreiben.

Der „Spaghetti“-Weg (schlecht)

Typische Notebook-Zelle:

# A typical notebook cell
df = spark.read.table("sales")
df_clean = df.filter(col("amount") > 0).withColumn("tax", col("amount") * 0.23)
df_clean.write.save("clean_sales")

Das ist praktisch nicht unit-testbar, weil die Logik (Filter/Mathe) mit I/O (Lesen/Schreiben) vermischt ist. Du brauchst eine Datenquelle/Umgebung, um das überhaupt auszuführen.

Der „Shift-Left“-Weg (gut)

# logic.py
def calculate_tax_and_filter(df: DataFrame) -> DataFrame:
    return df.filter(col("amount") > 0).withColumn("tax", col("amount") * 0.23)

Jetzt kann ich calculate_tax_and_filter testen, ohne Dateien zu lesen – und sogar ohne dass ein Databricks-Cluster läuft.

Das Toolset: Pytest, Chispa und Local Spark

Für eine robuste Test-Suite brauchen wir drei Tools.

1. Pytest

Der Industriestandard für Python-Testing. Findet Tests, führt sie aus und erstellt einen sauberen Report.

2. Chispa

Eine Library speziell für PySpark-Tests (vom Entwickler hinter Delta Lake). PySpark-DataFrames sind komplex; du kannst nicht einfach assert df1 == df2 schreiben. Chispa liefert elegante Assertions:

assert_df_equality(actual_df, expected_df)

Wenn es nicht passt, siehst du genau, welche Zeile und Spalte abweicht.

3. Local Spark (Session mocken)

Du brauchst keinen 10-Node-Cluster, um eine Funktion zu testen. Du kannst pyspark auf deinem Laptop (oder CI-Agent) installieren und eine „Local Spark Session“ starten. Sie läuft in-memory, kostet $0 Cloud-Credits und startet in etwa 3 Sekunden.

Ein Praxisbeispiel: Eine Transformation testen

Angenommen, wir haben eine Funktion, die Kunden klassifiziert. So würde ich dafür einen Test mit pytest und chispa schreiben:

import pytest
from chispa.dataframe_comparer import assert_df_equality
from my_pipeline.transformations import classify_customer
# This fixture creates a tiny Spark session on your laptop
@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local").appName("test").getOrCreate()
def test_classify_customer_logic(spark):
    # 1. Prepare Mock Data (No ADLS needed!)
    data = [("Alice", 100), ("Bob", 0), ("Charlie", -50)]
    schema = ["name", "spent"]
    source_df = spark.createDataFrame(data, schema)
    # 2. Define Expected Output
    expected_data = [("Alice", 100, "Active"), ("Bob", 0, "Inactive")]
    expected_schema = ["name", "spent", "status"]
    expected_df = spark.createDataFrame(expected_data, expected_schema)
    # 3. Run the Logic
    actual_df = classify_customer(source_df)
    # 4. Assert
    assert_df_equality(actual_df, expected_df, ignore_row_order=True)

Was ist hier passiert?

  • Keine Verbindung zu Azure.
  • Kein Warten auf den Cluster-Start.
  • Edge Cases (negative Werte) sofort getestet.

Integration in CI/CD: das „Guardrail“

Jetzt, wo wir Tests haben, müssen wir sie durchsetzen. Wir wollen uns nicht darauf verlassen, dass Entwickler daran denken, pytest lokal zu starten.

Hier kommen Azure DevOps und Databricks Asset Bundles (DABs) ins Spiel.

Workflow
  • Pull Request: Ein Entwickler erstellt einen PR in Azure DevOps.
  • Pipeline-Trigger: Bevor Code nach Databricks deployed wird, installiert der Azure-DevOps-Agent pyspark + pytest.
  • Tests ausführen: Er führt pytest tests/ aus.
  • Blockieren oder freigeben:
    • Wenn 100% der Tests erfolgreich sind → PR kann gemerged werden.
    • Wenn 1 Test fehlschlägt → PR wird blockiert. Keine Bugs deployen.

Das ist „Shift Left“ in Reinform: Wir finden den Bug im Pull Request, nicht im Production Job.

Coverage Reports

Um deinen Manager zu beeindrucken, füge pytest-cov hinzu. Das erzeugt einen Report, der zeigt, wie viel Prozent deines Codes durch Tests abgedeckt sind. Du kannst das direkt im Azure-DevOps-Dashboard anzeigen: „Hey Boss, unsere Pipeline hat 94% Test Coverage.“ Klingt deutlich besser als „Ich glaube, es funktioniert.“

Deployment mit Databricks Asset Bundles (DABs)

Wenn die Tests grün sind, bleibt die Frage: Wie deployen wir?

Früher haben wir Notebooks manuell kopiert. Heute nutzen wir DABs. Mit DABs definieren wir Jobs, Cluster und Libraries in yaml-Dateien direkt neben unserem Python-Code.

Weil unsere Logik jetzt in richtigen Python-Modulen statt in Notebooks liegt, können DABs sie als .whl (Wheel) paketieren und automatisch auf den Cluster hochladen.

Dein databricks.yml könnte so aussehen:

bundle:
  name: sales_pipeline

resources:
  jobs:
    daily_sales:
      tasks:
        - task_key: main_task
          python_wheel_task:
            package_name: my_pipeline
            entry_point: run_pipeline

Damit behandelst du deine Data Pipeline wie eine Software-Applikation: versioniert, getestet und paketiert.

Fazit: Lohnt sich der Aufwand?

Ich höre dich fragen: „Ist das nicht Over-Engineering? Ich will doch nur Daten bewegen.“

Wenn du solo eine Tabelle betreust: vielleicht. Aber wenn du im Team arbeitest, ein Data Mesh aufbaust oder kritische Finanzdaten verantwortest, ist das Pflicht.

Der „Shift-Left“-Ansatz:

  • Spart Geld: Bugs werden gefunden, bevor sie Cloud-Compute verbrennen.
  • Spart Nerven: Du kannst refactoren, ohne Angst vor versteckten Seiteneffekten.
  • Schafft Vertrauen: Das Business weiß, dass die Daten geprüft sind, bevor sie im Dashboard landen.

Wir sind Data Engineers. Fangen wir an zu engineer’n, als würden wir es ernst meinen.

Share this post
Data Engineering
Michal Milosz
MORE POSTS BY THIS AUTHOR
Michal Milosz

Curious how we can support your business?

TALK TO US