Rewolucja „Shift Left”: dlaczego Twoje pipeline’y PySpark potrzebują testów jednostkowych (i jak to zrobić)

Michał Miłosz
Michał Miłosz
February 27, 2026
9 min read
Loading the Elevenlabs Text to Speech AudioNative Player...

W świecie data engineering mamy zły nawyk. Lubimy testować na produkcji.

Pisujemy trochę SQL-a albo kodu PySpark, klikamy „Run” w notebooku Databricks, i jeśli widzimy zielone checkmarki, mówimy: „Działa!”. Potem to harmonogramujemy, idziemy do domu i liczymy na najlepsze.

Ale „uruchamia się” nie znaczy „działa poprawnie”.

Czy logika poprawnie obsługuje wartości null? Co się stanie, gdy wejście jest puste? Czy ten złożony join faktycznie odfiltrował duplikaty, czy tylko wyglądało, jakby to zrobił?

Software Engineers rozwiązali ten problem 20 lat temu. Nazywają to „Shift Left” — czyli przesunięciem testowania na możliwie najwcześniejszy etap rozwoju. W 2026 czas, aby Data Engineers przestali zachowywać się jak kowboje i zaczęli działać jak Software Engineers.

Dziś chcę opowiedzieć o testach jednostkowych (Unit Testing) w PySpark, o tym jak oszczędzać koszty chmury dzięki testom lokalnym oraz jak zautomatyzować całość z użyciem Databricks Asset Bundles (DABs) i Azure DevOps.

Problem: „testy integracyjne” są drogie

Większość zespołów danych polega na testach integracyjnych. To oznacza uruchamianie pełnego pipeline’u na prawdziwych (albo próbkowanych) danych w chmurze.

Problem? Jest to wolne i kosztowne. Żeby przetestować prostą zmianę logiki w tabeli Gold, często trzeba:

  • Uruchomić klaster (5 minut).
  • Wczytać dane z ADLS.
  • Puścić cały job.
  • Sprawdzić wynik.

Jeśli masz błąd — poprawiasz i czekasz kolejne 10 minut. Ta pętla feedbacku trwa za długo. Co gorsza, płacisz za compute w Databricks tylko po to, żeby sprawdzić, czy 1 + 1 = 2.

Rozwiązanie: testy jednostkowe PySpark

Testy jednostkowe działają inaczej. Nie testują całego pipeline’u. Testują logikę transformacji.

Żeby robić to skutecznie, musimy zmienić sposób pisania kodu. Musimy przestać pisać ogromne notebooki w stylu „spaghetti code” i zacząć pisać czyste funkcje (Pure Functions).

Podejście „spaghetti” (złe)

Typowa komórka w notebooku:

# A typical notebook cell
df = spark.read.table("sales")
df_clean = df.filter(col("amount") > 0).withColumn("tax", col("amount") * 0.23)
df_clean.write.save("clean_sales")


Tego praktycznie nie da się sensownie testować jednostkowo, bo logika (filtrowanie/matematyka) jest wymieszana z I/O (odczyt/zapis). Żeby to uruchomić, potrzebujesz bazy/danych/środowiska.

Podejście „Shift Left” (dobre)

# logic.py
def calculate_tax_and_filter(df: DataFrame) -> DataFrame:
    return df.filter(col("amount") > 0).withColumn("tax", col("amount") * 0.23)

Teraz mogę testować calculate_tax_and_filter bez czytania plików i nawet bez działającego klastra Databricks.

Zestaw narzędzi: Pytest, Chispa i Local Spark

Aby zbudować solidny zestaw testów, potrzebujemy trzech narzędzi.

  1. Pytest
    To standard branżowy dla testów w Pythonie. Wykrywa testy, uruchamia je i generuje czytelny raport.
  2. Chispa
    To biblioteka stricte do testowania PySpark (stworzona przez autora stojącego za Delta Lake). DataFrame’y PySpark to złożone obiekty — nie wystarczy assert df1 == df2. Chispa daje wygodne asercje:

assert_df_equality(actual_df, expected_df)

Jeśli się nie zgadzają, dostajesz dokładną informację, w którym wierszu i kolumnie występuje różnica.

  1. Local Spark (mockowanie sesji)
    Nie potrzebujesz klastra 10-node, żeby przetestować funkcję. Możesz zainstalować pyspark na laptopie (albo na agencie CI) i uruchomić „Local Spark Session”. Działa w pamięci, kosztuje $0 kredytów chmurowych i startuje w 3 sekundy.

Przykład z życia: testowanie transformacji

Załóżmy, że mamy funkcję klasyfikującą klientów. Tak wygląda test w pytest i chispa:

import pytest
from chispa.dataframe_comparer import assert_df_equality
from my_pipeline.transformations import classify_customer
# This fixture creates a tiny Spark session on your laptop
@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local").appName("test").getOrCreate()
def test_classify_customer_logic(spark):
    # 1. Prepare Mock Data (No ADLS needed!)
    data = [("Alice", 100), ("Bob", 0), ("Charlie", -50)]
    schema = ["name", "spent"]
    source_df = spark.createDataFrame(data, schema)
    # 2. Define Expected Output
    expected_data = [("Alice", 100, "Active"), ("Bob", 0, "Inactive")]
    expected_schema = ["name", "spent", "status"]
    expected_df = spark.createDataFrame(expected_data, expected_schema)
    # 3. Run the Logic
    actual_df = classify_customer(source_df)
    # 4. Assert
    assert_df_equality(actual_df, expected_df, ignore_row_order=True)

Zauważ, co tu się stało:

  • Brak połączenia z Azure.
  • Brak czekania na start klastra.
  • Od razu przetestowaliśmy edge case’y (ujemne wartości).

Integracja z CI/CD: „barierka bezpieczeństwa” (guardrail)

Skoro mamy testy, trzeba je egzekwować. Nie chcemy polegać na tym, że developerzy będą pamiętać o uruchamianiu pytest na swoich laptopach.

Tu wchodzą Azure DevOps i Databricks Asset Bundles (DABs).

Workflow
  • Pull Request: developer tworzy PR w Azure DevOps.
  • Pipeline startuje: zanim kod zostanie wdrożony do Databricks, agent Azure DevOps instaluje pyspark + pytest.
  • Uruchom testy: wykonuje pytest tests/.
  • Blokada albo sukces:
    • Jeśli 100% testów przechodzi -> PR można zmergować.
    • Jeśli 1 test pada -> PR jest blokowany. Nie wdrażasz bugów.

To jest definicja „Shift Left”: wyłapaliśmy błąd w Pull Request, a nie w jobie na produkcji.

Raporty pokrycia (Coverage Reports)

Żeby zrobić wrażenie na managerze, dodaj pytest-cov. Generuje raport pokazujący, jaki procent kodu jest pokryty testami. Możesz to wyświetlać bezpośrednio w dashboardzie Azure DevOps: „Hej, szefie, nasz pipeline ma 94% test coverage”. Brzmi znacznie lepiej niż „wydaje mi się, że działa”.

Deployment z Databricks Asset Bundles (DABs)

Skoro testy przeszły, jak wdrażać?

Kiedyś kopiowaliśmy notebooki ręcznie. Dziś używamy DABs. DABs pozwalają zdefiniować joby, klastry i biblioteki w plikach yaml obok kodu w Pythonie.

Ponieważ logika jest teraz w normalnych plikach Python (modułach), a nie w notebookach, DABs mogą spakować je do pliku .whl (Wheel) i automatycznie wrzucić na klaster.

Twój databricks.yml może wyglądać tak:

bundle:
  name: sales_pipeline

resources:
  jobs:
    daily_sales:
      tasks:
        - task_key: main_task
          python_wheel_task:
            package_name: my_pipeline
            entry_point: run_pipeline

Podsumowanie

Słyszę pytanie: „Czy to nie jest over-engineering? Ja tylko chcę przenosić dane.”

Jeśli jesteś solo developerem utrzymującym jedną tabelę — być może. Ale jeśli pracujesz w zespole, budujesz Data Mesh albo zarządzasz krytycznymi danymi finansowymi, to jest obowiązkowe.

Podejście „Shift Left”:

  • Oszczędza pieniądze: łapiesz bugi zanim spalą compute w chmurze.
  • Oszczędza nerwy: możesz refaktorować bez strachu, że zepsujesz ukrytą logikę.
  • Buduje zaufanie: biznes wie, że dane są zweryfikowane, zanim trafią do dashboardu.

Jesteśmy Data Engineers. Zacznijmy inżynierować tak, jakbyśmy naprawdę mieli na myśli.

Share this post
Data Engineering
Michał Miłosz
MORE POSTS BY THIS AUTHOR
Michał Miłosz

Curious how we can support your business?

TALK TO US