Optymalizacja Apache Spark
Apache Spark to potężne narzędzie do przetwarzania danych, które pozwala na wielokrotne skrócenie czasów wykonania w porównaniu do algorytmów MapReduce w Hadoop lub przetwarzania na pojedynczym węźle. Czasami jednak, czy to z powodu starych nawyków programistów wyniesionych z systemów proceduralnych, czy po prostu braku wiedzy, Spark może mieć problemy nawet z umiarkowanymi ilościami danych i prostymi joinami. Zanim zaczniesz dodawać kolejne węzły do swojego klastra lub dokładać RAM do już istniejących (co nie zawsze jest możliwe), oto 5 aspektów API Spark, które warto rozważyć, aby mieć pewność, że korzystasz z frameworka zgodnie z jego przeznaczeniem:
Data at Rest
Zarządzanie strategią przechowywania danych wejściowych/wyjściowych/tymczasowych jest kluczowe nie tylko dla optymalizacji własnej aplikacji, ale także każdej innej, która korzysta z tych samych danych. Strategia ta obejmuje m.in. wybór odpowiedniego systemu przechowywania, formatów plików i partycjonowania danych.
Jako programista prawdopodobnie nie masz dużego wpływu na typ systemu przechowywania, z którym musi współpracować Twoja aplikacja. Warto jednak wiedzieć, jak dany storage sprawdza się w Twoim scenariuszu – np. pod względem przepustowości czy kosztów modyfikacji danych.
Formaty plików obsługiwane przez Apache Spark
Spark obsługuje wiele formatów danych, przez co łatwo wybrać coś suboptymalnego tylko dlatego, że jesteśmy z tym formatem zaznajomieni, ignorując potencjalne zyski wydajnościowe innych, często stworzonych z myślą o przetwarzaniu rozproszonym.
Zawsze warto preferować binarne, kolumnowe formaty danych (np. Parquet) zamiast alternatyw (np. CSV). Są one nie tylko szybsze w ładowaniu, ale pozwalają też skupić się na innych zadaniach zamiast obsługi przypadków brzegowych (znaki specjalne, nowe linie itd.).
Splittable file types
Łatwo przeoczyć, że niektóre formaty plików nie mogą być łatwo dzielone na fragmenty, co uniemożliwia wykorzystanie rozproszonej natury Spark. Pliki takie jak ZIP czy JSON muszą być wczytane w całości przed interpretacją.
Upewnij się, że pliki ładowane do Spark są pod tym względem splittable.
Table Partitioning
Znając sposób, w jaki Twoje tabele będą najczęściej odczytywane przez użytkowników końcowych (i przez Ciebie), możesz inteligentnie podzielić surowe dane tabeli na partycje, rozdzielając je po kluczu, po którym będziesz zapytania. Partycjonowanie pozwala w niektórych scenariuszach ograniczyć ilość danych ładowanych do klastra Spark, co może znacząco przyspieszyć wykonanie kodu.
Data skewness w Apache Spark
„Skewness” to termin statystyczny opisujący asymetrię rozkładu danych. W Big Data mówimy o „skewed data”, gdy dane nie są równomiernie rozłożone, przez co trudno je równomiernie zrównoleglić. Szczególnie dotyczy to operacji Join.
Przykładem może być tabela, w której jeden klucz występuje w nieproporcjonalnie dużej liczbie wierszy. Podczas joinowania po tym kluczu Spark wysyła wszystkie rekordy z tym samym kluczem do tej samej partycji. W efekcie „popularny” klucz jest przetwarzany w jednym zadaniu, co często oznacza duży narzut Shuffle.
Możesz monitorować takie sytuacje, obserwując Spark UI i to, co dzieje się pod maską. Jeśli zauważysz, że jedno zadanie trwa znacznie dłużej niż inne, a wartości Shuffle Read Size / Records są duże, możesz założyć, że jeden węzeł wykonuje większość pracy – to najprawdopodobniej efekt Data Skew. Jeśli chcesz dowiedzieć się, jak to może pomóc Twojej firmie, odwiedź naszą stronę Data Engineering Consultancy.
Jak sobie z tym radzić:
- Broadcast Join zamiast Sort Merge Join
Jeśli łączysz dwie tabele, z których jedna jest znacznie mniejsza (mieści się w pamięci executora), możesz „zasugerować” Sparkowi użycie Broadcast Join zamiast Sort Merge Join. Może to znacząco przyspieszyć joiny na skewed data. Więcej o join hints: https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-hints.html#join-hints
Pamiętaj, że Spark sam planuje Broadcast Join, jeśli jedna ze stron joinu jest mniejsza niż próg spark.sql.autoBroadcastJoinThreshold. - Key salting
Key salting polega na wprowadzeniu dodatkowej, sztucznej zmienności do wartości klucza. Można to osiągnąć przez dodanie losowych „bucketów” do głównej tabeli, powielenie tabeli po prawej stronie joinu o ten sam zakres bucketów (kopiując wartości poza numerem bucketa) i wykonanie joinu po kluczu i buckecie. Pozwala to rozłożyć największy klucz na liczbę partycji równą liczbie bucketów, co równomierniej rozkłada dane.
Caching Data in Spark
Główną przewagą Spark nad MapReduce w Hadoop jest możliwość wykorzystania RAM w obliczeniach.
Spark stosuje jednak Lazy Evaluation, czyli odkładanie obliczeń do momentu, gdy są naprawdę potrzebne. Oznacza to, że wyniki pośrednie nie są zapisywane w pamięci, bo Spark interesuje tylko końcowy wynik planu. To działa dobrze, gdy pipeline transformacji jest liniowy.
Jednak gdy obiekt jest odczytywany więcej niż raz, jest za każdym razem przeliczany od nowa. Aby tego uniknąć, możesz wskazać, które DataFrame’y mają być cachowane w pamięci, używając metody cache().
Cache’owanie jest również operacją leniwą – wykona się dopiero po bezpośrednim dostępie do danych.
Repartitioning w Apache Spark
Ważne jest, jak Twoje techniki manipulacji wpływają na dane i ich rozkład na węzłach.
Na przykład, filtrując bardzo duży zbiór danych i zostawiając znacznie mniejszy, możesz skończyć z pustymi partycjami na niektórych workerach. To prowadzi do nierównomiernego rozkładu danych i zmniejsza poziom równoległości.
Repartition() i coalesce() to metody, które wymuszają shuffle i równomierny rozkład danych.
Repartition() tworzy partycje od nowa i wykonuje pełny shuffle. Coalesce() łączy partycje, co oznacza mniej ruchu danych, ale może prowadzić do nierównych partycji i data skew.
UDFs i Broadcast Variables
User-Defined Functions (UDFs) powinny być unikane ze względów wydajnościowych – wymuszają reprezentację danych jako obiektów w JVM, co jest kosztowne. Spark’s structured API jest wolne od tego ograniczenia i generalnie zalecane do manipulacji danymi.
Jeśli jednak musisz użyć UDF, rozważ Broadcast Variables. Jeśli masz dane wykorzystywane w wielu wywołaniach UDF, broadcastowanie oznacza, że na każdym węźle jest jedna, tylko do odczytu kopia danych, bez konieczności przesyłania ich przy każdym wywołaniu. Broadcast jest przydatny także poza UDF, np. przy lookup tables.
Analityka big data w czasie rzeczywistym definicja i korzyści
Inżynieria danych automatyzacja dla wydajności
Data stream solutions jak ich używać