Optimierung von Apache Spark
Apache Spark ist ein leistungsstarkes Tool für die Datenverarbeitung, mit dem die Ausführungszeiten im Vergleich zu den MapReduce-Algorithmen von Hadoop oder der Verarbeitung einzelner Knoten um Größenordnungen verbessert werden können. Manchmal kann Spark jedoch selbst mit moderaten Datenmengen und einfachen Verknüpfungen zu kämpfen haben, sei es aufgrund einiger alter Gewohnheiten, die Programmierer von prozeduralen Verarbeitungssystemen übernehmen, oder einfach weil sie es einfach nicht besser wissen. Bevor Sie jedoch beginnen, Ihrem Computercluster weitere Knoten hinzuzufügen oder die bereits vorhandenen mit zusätzlichem RAM auszustatten (etwas, das nicht jeder Entwickler tun kann), finden Sie hier 5 Aspekte der Spark-API, die Sie möglicherweise berücksichtigen sollten, um sicherzustellen, dass Sie das Framework so verwenden, wie es verwendet werden soll:
Daten im Ruhezustand
Die Verwaltung der Speicherstrategie Ihrer Eingabe-/Ausgabe-/temporären Daten ist nicht nur für die Optimierung Ihrer eigenen Anwendung von entscheidender Bedeutung, sondern auch für jede andere Anwendung, die dieselben Daten für ihre Verarbeitung verwendet. Diese Strategie besteht unter anderem aus der Auswahl des richtigen Speichersystems, der richtigen Dateiformate und der Partitionierung der Daten.
Als Entwickler haben Sie wahrscheinlich nicht allzu viel Einfluss auf die Art des Speichersystems, das Sie mit Ihrer Anwendung warten müssen. Es ist jedoch wichtig, die Leistung eines bestimmten Speichers in Ihrem spezifischen Szenario zu berücksichtigen, d. h. ob es sich um die Durchsatzgeschwindigkeit, die Kosten der Datenänderung usw. handelt.
Dateiformate, die von Apache Spark unterstützt werden
Da Spark eine Vielzahl von Datenformaten unterstützt, ist es leicht, sich auf etwas festzulegen, das nicht optimal ist, nur weil wir mit dem ausgewählten Typ einigermaßen vertraut sind, und die potenziellen Leistungseinsparungen anderer, die oft auf verteilte Datenverarbeitung zugeschnitten sind, außer Acht zu lassen.
Sie sollten immer binäre, spaltenförmige Datenformate (z. B. Parquet) Alternativen (z. B. CSV) vorziehen. Sie lassen sich nicht nur schneller laden, sondern Sie können sich auch von der Bearbeitung von Randfällen (Escape-Zeichen, Zeilenumbrüche usw.) auf andere, produktivere Aufgaben konzentrieren.
Aufteilbare Dateitypen
Es ist leicht zu übersehen, dass einige Dateiformate nicht einfach in Blöcke aufgeteilt werden können, was Sie daran hindert, den verteilten Charakter des Spark-Frameworks zu nutzen. Dateien wie ZIP oder JSON müssen in ihrer Gesamtheit gelesen werden, bevor sie interpretiert werden können.
Stellen Sie sicher, dass die Dateien, die Sie in Spark laden, in diesem Sinne teilbar sind.
Partitionierung von Tabellen
Wenn Sie wissen, wie Ihre Tabellen am häufigsten von Endbenutzern (und von Ihnen selbst) gelesen werden, können Sie die Rohdaten der Tabelle auf intelligente Weise in Partitionen aufteilen, die nach dem Schlüssel aufgeteilt sind, den Sie abfragen. Durch die Partitionierung kann in bestimmten Szenarien die Gesamtmenge der in Ihren Spark-Cluster geladenen Daten begrenzt werden, was dramatische Auswirkungen auf die Geschwindigkeit der Codeausführung haben kann.
Datenschiefe in Apache Spark
„Schiefe“ ist ein statistischer Begriff, der verwendet wird, um die Asymmetrie der Daten in einer bestimmten Verteilung zu beschreiben. In Big Data bezeichnen wir Daten als „schief“, wenn sie nicht gleichmäßig verteilt sind und sich daher nur schwer gleichmäßig parallelisieren lassen. Dies wirkt sich insbesondere auf die Join-Operationen in Ihrem Code aus.
Ein Beispiel könnte eine Tabelle sein, in der ein bestimmter Schlüssel einer unverhältnismäßigen Anzahl von Zeilen zugewiesen ist. Wenn Spark eine Verknüpfung mit diesem Schlüssel durchführt, sendet Spark alle Datensätze mit demselben Join-Schlüssel an dieselbe Partition. Somit wird unser „beliebter“ Schlüssel gezwungen, innerhalb einer Aufgabe verarbeitet zu werden, was oft einen großen Shuffle-Overhead bedeutet.
Sie können diese Situationen überwachen, indem Sie auf die Spark-Benutzeroberfläche achten und darauf, was unter der Haube der Ausführung vor sich geht. Wenn Sie für Ihren Job eine Aufgabe identifizieren, die viel länger dauert als andere, mit großen Werten für Shuffle Read Size/Records, können Sie mit Sicherheit davon ausgehen, dass ein Knoten die meiste Arbeit während Ihrer Verarbeitung erledigt. Das ist höchstwahrscheinlich auf den Data Skew zurückzuführen. Wenn Sie wissen möchten, wie dies Ihrem Unternehmen helfen kann, besuchen Sie unsere Beratung im Bereich Datentechnik Seite.
Möglichkeiten, dies zu mildern:
- Verwendung von Broadcast Joins anstelle von Sort Merge Joins
Wenn Sie zwei Tabellen verbinden, von denen eine deutlich kleiner als die andere ist (klein genug, um in den Speicher des Executors zu passen), sollten Sie Spark vielleicht „anweisen“, Broadcast Join anstelle von Sort Merge Join zu verwenden. Dies kann zu einer dramatischen Verbesserung der Verbindungszeiten schiefer Daten führen. Weitere Informationen zu den „Hinweisen“ zum Verbinden: https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-hints.html#join-hints
Beachten Sie, dass Spark Broadcast-Joins automatisch plant, vorausgesetzt, eine der Join-Seiten ist kleiner als der Broadcast-Join-Schwellenwert (spark.sql.autoBroadcastJoinThreshold-Eigenschaft).
- Wichtiges Salzen
Die Idee hinter Key Salting besteht darin, Ihrem Schlüsselwert zusätzliche, künstliche Variabilität hinzuzufügen. Dies könnte erreicht werden, indem Sie Ihrer Hauptdatentabelle einheitlich zugewiesene „Buckets“ hinzufügen, Ihre Tabelle auf der rechten Seite mit demselben Bereich von Buckets auffüllen (alle anderen Werte als die Bucket-Nummer kopieren) und eine Verknüpfung über Schlüssel UND Bucket durchführen. Dadurch kann der größte einzelne Schlüsselwert auf die Anzahl der Partitionen aufgeteilt werden, die der Anzahl der Buckets entspricht, wodurch wiederum die Daten gleichmäßiger auf Ihre Knoten verteilt werden.
Daten in Spark zwischenspeichern
Der Hauptvorteil von Spark gegenüber dem Map-Reduce-Ansatz von Hadoop (dem ideologischen Vorgänger von Spark) ist die Fähigkeit, RAM für die Berechnung zu nutzen.
Spark verwendet jedoch auch Lazy Evaluation für seinen Ausführungsplan, was bedeutet, dass die Erstellung von Zwischenergebnissen verschoben wird, bis dies unbedingt erforderlich ist. Dies bedeutet auch, dass diese Zwischenergebnisse nicht im Speicher gespeichert werden, da Spark sich nur um den letzten Schritt des Plans kümmert. Das funktioniert gut, wenn Sie eine gerade Transformationspipeline haben, bei der jeder Schritt als Eingabe die Ausgabe des vorherigen verwendet.
Sobald ein Objekt jedoch mehrmals gelesen wird, führt dieser Ansatz dazu, dass es jedes Mal beim Lesen neu berechnet wird. Um dies zu vermeiden, können Sie mithilfe der eingebauten cache () -Methode angeben, welche Datenrahmen zu welchem Zeitpunkt im Speicher zwischengespeichert werden sollen.
Caching ist an sich schon ein fauler Vorgang, daher wird er erst ausgeführt, nachdem direkt auf die Daten zugegriffen wurde.
Neupartitionierung mit Apache Spark
Es ist wichtig zu bedenken, wie sich Ihre Manipulationstechniken auf das Datum auswirken und wie es knotenübergreifend gespeichert wird.
Wenn Sie beispielsweise einen sehr großen Datensatz filtern und am Ende einen vergleichsweise viel kleineren Datensatz haben, der Ihren Filterausdruck erfüllt, kann es vorkommen, dass einige Partitionen auf einigen Worker-Knoten leer sind. Dies führt dazu, dass die Daten ungleichmäßig über das Netzwerk verteilt sind, was den Grad der Parallelität verringert, der bei der Verarbeitung Ihrer neuen, gefilterten Daten erreicht werden kann.
Repartition () und coalesce () sind die integrierten Methoden, die entwickelt wurden, um einen Shuffle zu erzwingen, was zu einer gleichmäßigen Verteilung der Daten auf die Worker-Nodes führt.
Repartition () erstellt die Partitionen von Grund auf neu und führt einen kompletten Shuffle durch. Coalesce () führt Partitionen zusammen, was im Vergleich zu repartition () zu weniger Datenverschiebungen führt (Teile der Daten für die zusammengeführten Partitionen sind bereits vorhanden), kann jedoch zu ungleichmäßigen Partitionen führen, was wiederum zu Datenverzerrungen führt.
UDFs und Broadcast-Variablen
Benutzerdefinierte Funktionen (UDFs) sollten aus Performance-Gründen vermieden werden — sie erzwingen die Darstellung von Daten als Objekte in der JVM. Dies ist ein Preis, den Sie für die Fähigkeit zahlen, Ihre Transformationen in einer höheren Programmiersprache darzustellen. Die strukturierte API von Spark ist frei von dieser Einschränkung und ist im Allgemeinen die empfohlene Methode zur Durchführung von Datenmanipulationen.
Wenn Sie die UDFs jedoch in Ihrem Code verwenden möchten, sollten Sie die Verwendung von Broadcast-Variablen in Betracht ziehen. Wenn Sie ein Datenelement haben, das für mehrere UDF-Aufrufe verwendet wird, bedeutet das Senden, dass es auf allen Knoten eine einzige schreibgeschützte Kopie der Daten gibt, und sie müssen nicht bei jedem UDF-Aufruf erneut gesendet werden. Das Senden kann auch außerhalb des UDF-Anwendungsfalls nützlich sein, z. B. bei der Verwendung von Lookup-Tabellen.
Besuchen Sie unseren Blog für ausführlichere Data Engineering-Artikel:
- Azure SQL-Authentifizierung mit AD
- Einführung in Koalas und Databricks
- Databricks-Tests mit Github-Aktionen