Testowanie Databricks z działaniami GitHub
Dla tych, którzy go zainspirowali i nigdy go nie przeczytają. I dla Marcina!
Wprowadzenie
Databricks Connect (więcej informacji tutaj) zapewnia dobry sposób interakcji z klastrami Azure Databricks na komputerze lokalnym (przy użyciu IDE lub dowolnej aplikacji niestandardowej). Konfiguracja jest prosta - podajesz adres URL i osobisty token wygenerowany z obszaru roboczego Databricks i możesz rozpocząć pracę z DBFS, klastrami itp. Działa to bezbłędnie, dopóki nie chcesz rozpocząć uruchamianie kodu aplikacji lokalnie, np. jeśli potrzebujesz użyć dbutils. W tym celu musisz wygenerować kolejny osobisty token z podwyższonymi uprawnieniami (do zrobienia tylko w notatniku Databricks), który jest ważny tylko przez 48 godzin. W przypadku niektórych podstawowych kontroli jest to więcej niż wystarczające, ale jeśli chcesz użyć go w bardziej zautomatyzowanym rozwiązaniu, istnieje potrzeba regeneracji tego tokena co dwa dni, aby działał. Ponieważ nie mogłem się doczekać przeprowadzenia kilku testów integracyjnych dla mojej aplikacji Databricks, musiałem znaleźć bardziej solidne rozwiązanie, ponieważ miało ono być wykonywane dla mojej gałęzi deweloperskiej za każdym razem, gdy wykryto push.
Potrzebujesz pomocy z databricks? Sprawdź nasze usługi inżynierii danych i zobacz, jak możemy Ci pomóc.
Pomysł testowania integracji dla mojej aplikacji
Ponieważ moja aplikacja miała ponad 50 kanałów danych, każdy zorganizowany w osobną klasę pythona obejmującą przetwarzanie od końca do końca, od odczytu ze źródła, przekształceń poprzez zapisywanie w końcowej warstwie do pamięci BLOB i tabeli Azure Synapse. Ponieważ niektórzy użytkownicy biznesowi (właściwie zaawansowani użytkownicy) korzystali ze środowiska DEV Databricks, chciałem mieć sposób na upewnienie się, że każda zmiana połączona z gałęzi funkcji z gałęzią deweloperską działa poprawnie, zanim została faktycznie zainstalowana w DEV Databricks.
Pierwotnie mój przepływ pracy w GitHub wydawał gałąź programistyczną za każdym razem, gdy wykrywał push do katalogu aplikacji. Chodziło o to, aby przetestować gałąź programistyczną przed jej wydaniem do katalogu aplikacji i na podstawie wyniku testu tylko w przypadku powodzenia, wydana do katalogu docelowego.
Nowy przepływ pracy miał wyglądać następująco:
- Zainstaluj wymagania wstępne
- Wdrożenie w katalogu Databricks TEST
- Przetestuj obciążenie fikcyjne za pomocą katalogu TEST
- Wdrożenie w katalogu docelowym Databricks
Jedynym problemem było to, że nadal potrzebowałem tego podwyższonego tokena uprawnień, aby móc uruchamiać moje klasy lokalnie na zaproponowanym komputerze, który wykonywał przepływ pracy.
Rozwiązanie - Databricks Jobs API
Databricks Job API zapewnia możliwość uruchamiania zadań wewnątrz obszaru roboczego Databricks. Może to być zadanie niestandardowe lub powiązane z istniejącym notatnikiem. Gdy będziemy w stanie skonfigurować notatnik, który wykonuje wszystkie potrzebne operacje w Databricks, nie musimy już mieć podwyższonego tokena uprawnień i jesteśmy w stanie skonfigurować zautomatyzowany sposób testowania kanałów.
Rozważmy tę prostą klasę paszy poniżej. Stworzyłem go tylko dla tego artykułu, więc brakuje mu wielu funkcji i zasadniczo wszystko, co robi, to odczytanie ze źródła i zapisanie go w pamięci BLOB (obie lokalizacje zamontowane wewnątrz DBFS):
z pyspark.sql importuj SparkSession
klasa TestDataFeed (obiekt):
„"” Klasa Dummy do testowania integracji „"”
def __init__ (self, dbutils):
siebie. _df = Brak
siebie. _ctx = SparkSession.builder.getOrCreate ()
self.dbutils = dbutils
self.source_dir = '/mnt/test_in/test_integracji/srce_dir'
self.destination_dir = '/mnt/test_out/test_integracji/dest_dir'
def odczyt (samo, ścieżka = brak):
path = path lub self.source_dir
iskra = ja. _ctx
opts = dict (format = 'parkiet', kompresja = 'snappy')
siebie. _df = spark.read.load (ścieżka, **opcje)
powrót sam
transformacja def (ja):
przejść
def save2blob (self, Path=brak):
path = path lub self.destination_dir
opts = dict (tryb = 'nadpis', format = 'parkiet', kompresja = 'snappy')
siebie. _df.write.save (ścieżka, **opts)
def run (self):
Self.read ()
samotransformacja ()
self.save2blob ()
Prosty test dla tej klasy czytałby tylko z katalogu źródłowego i policzyć liczbę pobranych rekordów. Metoda będzie wyglądać następująco:
def test_testDataFeed ():
o = TestDataFeed (dbutils)
o.read ()
o.transformacja ()
y = o._df.count ()
potwierdź y> 0, „TestDataFeed Dummy pull”
Ogólnie testy mogą być dokładniejsze i sprawdzać wyniki przekształceń oraz czy zapisanie w katalogu docelowym i/lub tabeli ASDW powiodło się, ale w tym przypadku zakończymy test tylko tym sprawdzeniem.
Zwykle metoda testowa byłaby umieszczona w module wewnątrz katalogu testów poza testowanym modułem:
• module_tested.py
— klasa TestDataFeed
• Testy/
— test_module_tested.py
• Test_testDataFeed ()
Ale aby móc korzystać z dbutils, będziemy musieli go trochę zmodyfikować i umieścić go w notebooku Databricks, aby mógł używać dbutils.
Zamiast tworzyć notatnik dla każdej funkcji testowej, notatnik będzie ogólny i będzie mógł odczytać nazwę klasy przekazaną jako parametr. Na podstawie nazwy klasy zaimportuje ją dynamicznie i wykona funkcję testową. Może to wyglądać jak poniższy przykład:
import sys
sys.path.insert (0, '/dbfs/twó_app_katalog_test')
import datę i godzinę
import importlib
cls_name = GetArgument („nazwa_klasy”)
pkg_name = GetArgument („nazwa pkg_name”)
spróbuj:
jeśli nie cls_name lub nie pkg_name:
raise ImportError ('Nazwa klasy lub nazwa pakietu nie może być pusta! ')
d=datetime.datetime.now ()
ex = brak
clss = Brak
o = Brak
module = importlib.import_module (nazwa pkg_name)
clss = getattr (moduł, nazwa cls_)
o = clss ()
o.read ()
o.transformacja ()
y = o._df.count ()
assert y> 0, cls_name + "dummy pull”
z wyjątkiem wyjątku jako e:
ex = e
wreszcie:
dbutils.notebook.exit (ex)
Aby uruchomić ten notatnik, będziemy potrzebować małego utila, który ma możliwość wywołania interfejsu API Databricks Jobs, który może wyglądać następująco:
def run_notebook (
ścieżka notatnika,
identyfikator_klastra,
dane uwierzytelniające dbr,
base_url,
nazwa_cl_name,
nazwa pkgname,
):
def_timeout_sec = 180
def_check_timeout_sec = 5
job_name = 'Test fałszywy integracji'
base_parameters = {'nazwa_klasy': nazwa cls_, 'pkg_name': nazwa pkg_name}
dane przesyłania pracy = {
'run_name': nazwa_pracy,
'existing_cluster_id': identyfikator_klastra,
'timeout_sekunds': def_timeout_sec,
'notebook_task': {'ścieżka notatnika': ścieżka_notatnika,
'parametry_baze': parametry_bazy},
}
run_cmd = żądanie.post (base_url + 'api/2.0/jobs/runs/submit',
data=json.dumps (zadania_przesyłania danych),
auth=dbr_poświadczenia)
runjson = run_cmd.text
d = json.loads (runjson)
job_run_id = d ['identyfikator']
print 'Zadanie przesłane (Run_ID: '+ str (job_run_id) +') '
status_cmd = requests.get (base_url + 'api/2.0/jobs/runs/get? run_id='
+ str (job_run_id),
data=json.dumps (zadania_przesyłania danych),
auth=dbr_poświadczenia)
jobjson = status_cmd.text
j = json.loads (jobjson)
print 'Uruchom adres URL strony: '+ j [' run_page_url ']
i = 0
job_timed_out=Fałsz
podczas gdy nie job_timed_out:
time.sleep (def_check_timeout_sec)
status_cmd = żądanie.get (base_url
+ 'api/2.0/jobs/runs/get? run_id='
+ str (job_run_id),
data=json.dumps (zadania_przesyłania danych),
auth=dbr_poświadczenia)
jobjson = status_cmd.text
j = json.loads (jobjson)
job_current_state = j ['stan'] ['stan_cyklu życia']
job_run_id = j ['identyfikator']
jeśli job_current_state w ['TERMINATED', 'INTERNAL_ERROR',
„POMINIĘTY”]:
print 'Status zadania: '+ job_current_state
przerwa
# sprawdź przez maksymalnie 30 minut
jeśli i >= def_timeout_sec:
job_timed_out=Prawda
status_cmd = żądanie.get (base_url
+ 'api/2.0/jobs/runs/get-output? run_id='
+ str (job_run_id),
data=json.dumps (zadania_przesyłania danych),
auth=dbr_poświadczenia)
jobjson = status_cmd.text
j = json.loads (jobjson)
wydrukuj „Status zadania: WYGASŁ CZAS”
drukuj „Praca zajmuje więcej niż oczekiwano. Aby uzyskać więcej informacji, zapoznaj się z dziennikiem pracy: '
drukuj j ['metadane'] ['run_page_url']
przerwa
i = i+ 1
jeśli nie job_timed_out:
status_cmd = żądanie.get (base_url
+ 'api/2.0/jobs/runs/get-output? run_id='
+ str (job_run_id),
data=json.dumps (zadania_przesyłania danych),
auth=dbr_poświadczenia)
jobjson = status_cmd.text
j = json.loads (jobjson)
job_output = j ['notebook_output'] ['wynik']
assert job_output == 'Brak', job_output
drukuj „ZADANIE UDAŁO SIĘ”
Ponieważ ten skrypt może być wykonywany lokalnie bez specjalnego tokena, zastąpiłem metodę test_testDataFeed, aby uruchomić run_notebook zamiast rzeczywistego testu. Trafi do Databricks i wykona dla mnie notebook, który jest w stanie uruchomić moje testy.
Zwolnienie będzie teraz wyglądać jak poniżej. Pamiętaj, że przekazuję token Databricks jako zmienną env, ponieważ jest to przechowywane w tajemnicach GitHub.
<base_url_of_databricks>BASE_URL = ''
<notebook_path_of_your_test_notebook>NOTEBOOK_PATH = ''
DBR_TOKEN = os.environment.get ('DBR_TOKEN')
DBR_CREDENTIALS = ('token', DBR_TOKEN)
def test_testDataFeed ():
run_notebook (
cluster_id=Cluster_ID,
DBR_CREDENTIALS=Dane uwierzytelniające DBR_,
base_url=Base_URL,
notebook_path=Ścieżka notatnika,
cls_name='Dane testowe',
pkg_name='moduł.data_feed_test',
)
Przepływ pracy dla akcji GitHub wykona następnie pytest w celu uruchomienia testów. Przepływ pracy należy wykonać w następujących krokach:
nazwa: Ciągłe wdrażanie - Databricks
na:
nacisnąć:
gałęzie: [rozwijaj, mistrz]
oferty pracy:
buduj:
uruchomienie: ubuntu-latest
kroki:
- wykorzystuje: akcje/kasę @v2
- nazwa: Skonfiguruj Python 3.7
używa: actions/setup-python @v2
z:
wersja python: 3.7
- nazwa: Zainstaluj wymagania wstępne
bieg: |
pip install -r your_app_directory/requirements.txt
cat > ~/.databrickscfg <<EOF
[DOMYŚLNIE]
host = https://eastus2.azuredatabricks.net
token = $ ([$ {GITHUB_REF##*/} == „master”] && echo „$ {{Secrets.DB_Token_Prod}}” || echo „$ {{Secrets.db_Token_dev}}”)
EOF
- nazwa: Wdrożyć do instancji testowej Databricks
bieg: |
dbfs rm -r dbfs: /twó_app_katalog_test_katalogu
dbfs mkdirs dbfs: /twórz_app_katalog_test/dzienniki
znajdź.\ (! -regex '.*/\.. *'\) -type f -name „*.py” | cut -c 11- $a | xargs -i dbfs cp -r the_app/ {} dbfs: /your_app_directory_test/ {} --overwrite
- nazwa: Testowy ładunek manekinowy
env:
DBR_TOKEN_SECRET_DEV: $ {{Secrets.db_token_dev}}
bieg: |
eksportuj DBR_TOKEN="$DBR_TOKEN_SECRET_DEV”
pytest --workers 5 the_app/modules/tests/test_module_tested.py
- nazwa: Wdrożyć do docelowej instancji Databricks
bieg: |
znajdź.\ (! -regex '.*/\.. *'\) -type f -name „*.py” | cut -c 11- $a | xargs -i dbfs cp -r the_app/ {} dbfs: /your_app_directory/ {} --overwrite
Po zakończeniu testu wynik pokazuje podsumowanie każdego testu:
Przetestuj obciążenie manekinem
====================== 32 minęło w latach 135.50 (0:02:15) ======================
Uruchom pytest --workers 5 the_app/modules/tests/test_module_tested.py
====================================== Rozpoczyna się sesja testowa ==============================
platforma linux - Python 3.7.8, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
<repo_name>rootdir: /home/runner/work//<repo_name>
wtyczki: parallel-0.1.0, testmon-1.0.3
zebrano 32 przedmiotów
pytest-parallel: 5 pracowników (procesy), 1 test na pracownika (wątek)
...
====================== 32 minęło w latach 135.50 (0:02:15) ======================
Jeśli którekolwiek z kontroli nie powiodło się, katalog your_app_pozostaje nietknięty, a programista odpowiedzialny za scalanie musi sprawdzić dokładny problem w dziennikach pytest i odpowiednio naprawić. A aplikacja nadal działa!
Jest to tylko jedno z możliwych rozwiązań tego problemu i jak zawsze oryginalny pomysł powinien być jedynie kropkiem do szerszej dyskusji na temat wszelkich ulepszeń opisanych procedur. Opisane testy obejmują tylko bardzo podstawowe testy, które mogą zapewnić, że aplikacja nie ma większych problemów, takich jak błędy składniowe, dostarczone nieprawidłowe katalogi lub niedostępne, ale mogą być dobrą bazą dla bardziej złożonych testów.
Odwiedź nasz blog, aby uzyskać bardziej szczegółowe artykuły dotyczące inżynierii danych:
- Co to jest denormalizacja bazy danych?
- Czym jest reklama programowa?
- Analityka Big Data w czasie rzeczywistym