W świecie data engineering mamy zły nawyk. Lubimy testować na produkcji.
Pisujemy trochę SQL-a albo kodu PySpark, klikamy „Run” w notebooku Databricks, i jeśli widzimy zielone checkmarki, mówimy: „Działa!”. Potem to harmonogramujemy, idziemy do domu i liczymy na najlepsze.
Ale „uruchamia się” nie znaczy „działa poprawnie”.
Czy logika poprawnie obsługuje wartości null? Co się stanie, gdy wejście jest puste? Czy ten złożony join faktycznie odfiltrował duplikaty, czy tylko wyglądało, jakby to zrobił?
Software Engineers rozwiązali ten problem 20 lat temu. Nazywają to „Shift Left” — czyli przesunięciem testowania na możliwie najwcześniejszy etap rozwoju. W 2026 czas, aby Data Engineers przestali zachowywać się jak kowboje i zaczęli działać jak Software Engineers.
Dziś chcę opowiedzieć o testach jednostkowych (Unit Testing) w PySpark, o tym jak oszczędzać koszty chmury dzięki testom lokalnym oraz jak zautomatyzować całość z użyciem Databricks Asset Bundles (DABs) i Azure DevOps.
Problem: „testy integracyjne” są drogie
Większość zespołów danych polega na testach integracyjnych. To oznacza uruchamianie pełnego pipeline’u na prawdziwych (albo próbkowanych) danych w chmurze.
Problem? Jest to wolne i kosztowne. Żeby przetestować prostą zmianę logiki w tabeli Gold, często trzeba:
- Uruchomić klaster (5 minut).
- Wczytać dane z ADLS.
- Puścić cały job.
- Sprawdzić wynik.
Jeśli masz błąd — poprawiasz i czekasz kolejne 10 minut. Ta pętla feedbacku trwa za długo. Co gorsza, płacisz za compute w Databricks tylko po to, żeby sprawdzić, czy 1 + 1 = 2.
Rozwiązanie: testy jednostkowe PySpark
Testy jednostkowe działają inaczej. Nie testują całego pipeline’u. Testują logikę transformacji.
Żeby robić to skutecznie, musimy zmienić sposób pisania kodu. Musimy przestać pisać ogromne notebooki w stylu „spaghetti code” i zacząć pisać czyste funkcje (Pure Functions).
Podejście „spaghetti” (złe)
Typowa komórka w notebooku:
# A typical notebook cell
df = spark.read.table("sales")
df_clean = df.filter(col("amount") > 0).withColumn("tax", col("amount") * 0.23)
df_clean.write.save("clean_sales")
Tego praktycznie nie da się sensownie testować jednostkowo, bo logika (filtrowanie/matematyka) jest wymieszana z I/O (odczyt/zapis). Żeby to uruchomić, potrzebujesz bazy/danych/środowiska.
Podejście „Shift Left” (dobre)
# logic.py
def calculate_tax_and_filter(df: DataFrame) -> DataFrame:
return df.filter(col("amount") > 0).withColumn("tax", col("amount") * 0.23)
Teraz mogę testować calculate_tax_and_filter bez czytania plików i nawet bez działającego klastra Databricks.
Zestaw narzędzi: Pytest, Chispa i Local Spark
Aby zbudować solidny zestaw testów, potrzebujemy trzech narzędzi.
- Pytest
To standard branżowy dla testów w Pythonie. Wykrywa testy, uruchamia je i generuje czytelny raport. - Chispa
To biblioteka stricte do testowania PySpark (stworzona przez autora stojącego za Delta Lake). DataFrame’y PySpark to złożone obiekty — nie wystarczyassert df1 == df2. Chispa daje wygodne asercje:
assert_df_equality(actual_df, expected_df)Jeśli się nie zgadzają, dostajesz dokładną informację, w którym wierszu i kolumnie występuje różnica.
- Local Spark (mockowanie sesji)
Nie potrzebujesz klastra 10-node, żeby przetestować funkcję. Możesz zainstalowaćpysparkna laptopie (albo na agencie CI) i uruchomić „Local Spark Session”. Działa w pamięci, kosztuje $0 kredytów chmurowych i startuje w 3 sekundy.
Przykład z życia: testowanie transformacji
Załóżmy, że mamy funkcję klasyfikującą klientów. Tak wygląda test w pytest i chispa:
import pytest
from chispa.dataframe_comparer import assert_df_equality
from my_pipeline.transformations import classify_customer# This fixture creates a tiny Spark session on your laptop
@pytest.fixture(scope="session")
def spark():
return SparkSession.builder.master("local").appName("test").getOrCreate()def test_classify_customer_logic(spark):
# 1. Prepare Mock Data (No ADLS needed!)
data = [("Alice", 100), ("Bob", 0), ("Charlie", -50)]
schema = ["name", "spent"]
source_df = spark.createDataFrame(data, schema) # 2. Define Expected Output
expected_data = [("Alice", 100, "Active"), ("Bob", 0, "Inactive")]
expected_schema = ["name", "spent", "status"]
expected_df = spark.createDataFrame(expected_data, expected_schema) # 3. Run the Logic
actual_df = classify_customer(source_df) # 4. Assert
assert_df_equality(actual_df, expected_df, ignore_row_order=True)
Zauważ, co tu się stało:
- Brak połączenia z Azure.
- Brak czekania na start klastra.
- Od razu przetestowaliśmy edge case’y (ujemne wartości).
Integracja z CI/CD: „barierka bezpieczeństwa” (guardrail)
Skoro mamy testy, trzeba je egzekwować. Nie chcemy polegać na tym, że developerzy będą pamiętać o uruchamianiu pytest na swoich laptopach.
Tu wchodzą Azure DevOps i Databricks Asset Bundles (DABs).
Workflow
- Pull Request: developer tworzy PR w Azure DevOps.
- Pipeline startuje: zanim kod zostanie wdrożony do Databricks, agent Azure DevOps instaluje
pyspark+pytest. - Uruchom testy: wykonuje
pytest tests/. - Blokada albo sukces:
- Jeśli 100% testów przechodzi -> PR można zmergować.
- Jeśli 1 test pada -> PR jest blokowany. Nie wdrażasz bugów.
To jest definicja „Shift Left”: wyłapaliśmy błąd w Pull Request, a nie w jobie na produkcji.
Raporty pokrycia (Coverage Reports)
Żeby zrobić wrażenie na managerze, dodaj pytest-cov. Generuje raport pokazujący, jaki procent kodu jest pokryty testami. Możesz to wyświetlać bezpośrednio w dashboardzie Azure DevOps: „Hej, szefie, nasz pipeline ma 94% test coverage”. Brzmi znacznie lepiej niż „wydaje mi się, że działa”.
Deployment z Databricks Asset Bundles (DABs)
Skoro testy przeszły, jak wdrażać?
Kiedyś kopiowaliśmy notebooki ręcznie. Dziś używamy DABs. DABs pozwalają zdefiniować joby, klastry i biblioteki w plikach yaml obok kodu w Pythonie.
Ponieważ logika jest teraz w normalnych plikach Python (modułach), a nie w notebookach, DABs mogą spakować je do pliku .whl (Wheel) i automatycznie wrzucić na klaster.
Twój databricks.yml może wyglądać tak:
bundle:
name: sales_pipeline
resources:
jobs:
daily_sales:
tasks:
- task_key: main_task
python_wheel_task:
package_name: my_pipeline
entry_point: run_pipeline
Podsumowanie
Słyszę pytanie: „Czy to nie jest over-engineering? Ja tylko chcę przenosić dane.”
Jeśli jesteś solo developerem utrzymującym jedną tabelę — być może. Ale jeśli pracujesz w zespole, budujesz Data Mesh albo zarządzasz krytycznymi danymi finansowymi, to jest obowiązkowe.
Podejście „Shift Left”:
- Oszczędza pieniądze: łapiesz bugi zanim spalą compute w chmurze.
- Oszczędza nerwy: możesz refaktorować bez strachu, że zepsujesz ukrytą logikę.
- Buduje zaufanie: biznes wie, że dane są zweryfikowane, zanim trafią do dashboardu.
Jesteśmy Data Engineers. Zacznijmy inżynierować tak, jakbyśmy naprawdę mieli na myśli.

.webp)


