Büyük verilerle çalıştığınızdaApache SparkOrtak bir performans sorunudata skewBu, birkaç anahtarındominateVerilerin dağıtımı,unevenpartisyonlar ve yavaş sorgular. esas olarak gerektiren işlemler sırasında meydana gelir.shufflinggibijoinsya da düzenliaggregations.
Döviz azaltmanın pratik bir yolusalting, bu, çoklu bölünmeler üzerinde ağır anahtarları yapay olarak yaymayı içerir. bu yazıda, bunu pratik bir örnekle size rehberlik edeceğim.
Salting Data Skew Sorunlarını Nasıl Çözer
eklemek için arandomlyBirleşik anahtarın sayısını oluşturduktan sonra bu kombine anahtarı birleştirerek, büyük anahtarları daha eşit şekilde dağıtabiliriz.Bu, veri dağılımını daha eşit hale getirir ve yükü daha fazla işçiye yayar, verilerin çoğunu bir işçiye göndermek yerine diğerlerini boş bırakır.
Salinanın Faydaları
-
Reduced Skew: Spreads data evenly across partitions, preventing a few workers overload and improves utilization.
-
Improved Performance: Speeds up joins and aggregations by balancing the workload.
-
Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.
Salatayı ne zaman kullanmalısınız
Kaydırılmış anahtarlarla birleştirme veya birleştirme sırasında, veri kaydırması nedeniyle uzun şeffaf zamanlar veya yürütücü hataları fark ettiğinizde tuzlama kullanın. Aynı zamanda, bölünmenin veri işleme verimliliğini etkilediği gerçek zamanlı akış uygulamalarında da yararlıdır, ya da çoğu işçi boşalırken birkaç kişi çalıştırma durumunda sıkışırsa.
Scala ile ilgili örnekler
Bazı verileri birunbalancedBirleştirmek istediğimiz iki veri kümesi var: biri büyük bir veri kümesi, diğeri ise küçük bir veri kümesi.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// Simulated large dataset with skew
val largeDF = Seq(
(1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
).toDF("customer_id", "transaction")
// Small dataset
val smallDF = Seq(
(1, "Ahmed"), (2, "Ali"), (3, "Hassan")
).toDF("customer_id", "name")
Kullandığımız büyük veri kümelerine tuzlu sütunu ekleyelim.randomizationBüyük anahtarın değerlerini daha küçük bölümlere dağıtmak için
// Step 1: create a salting key in the large dataset
val numBuckets = 3
val saltedLargeDF = largeDF.
withColumn("salt", (rand() * numBuckets).cast("int")).
withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
saltedLargeDF.show()
+-----------+-----------+----+------------------+
|customer_id|transaction|salt|salted_customer_id|
+-----------+-----------+----+------------------+
| 1| txn1| 1| 1_1|
| 1| txn2| 1| 1_1|
| 1| txn3| 2| 1_2|
| 2| txn4| 2| 2_2|
| 3| txn5| 0| 3_0|
+-----------+-----------+----+------------------+
Büyük veri kümelerindeki tüm olası rastgele tuzlu anahtarları kapsayacağımızı sağlamak için,explodeTüm olası tuzlu değerlerle küçük veri kümesi
// Step 2: Explode rows in smallDF for possible salted keys
val saltedSmallDF = (0 until numBuckets).toDF("salt").
crossJoin(smallDF).
withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
saltedSmallDF.show()
+----+-----------+------+------------------+
|salt|customer_id| name|salted_customer_id|
+----+-----------+------+------------------+
| 0| 1| Ahmed| 1_0|
| 1| 1| Ahmed| 1_1|
| 2| 1| Ahmed| 1_2|
| 0| 2| Ali| 2_0|
| 1| 2| Ali| 2_1|
| 2| 2| Ali| 2_2|
| 0| 3|Hassan| 3_0|
| 1| 3|Hassan| 3_1|
| 2| 3|Hassan| 3_2|
+----+-----------+------+------------------+
Şimdi iki veritabanını kolayca birleştirebiliriz
// Step 3: Perform salted join
val joinedDF = saltedLargeDF.
join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner").
select("customer_id", "transaction", "name")
joinedDF.show()
+-----------+-----------+------+
|customer_id|transaction| name|
+-----------+-----------+------+
| 1| txn2| Ahmed|
| 1| txn1| Ahmed|
| 1| txn3| Ahmed|
| 2| txn4| Ali|
| 3| txn5|Hassan|
+-----------+-----------+------+
Python ile ilgili örnekler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, lit, concat
from pyspark.sql.types import IntegerType
# Simulated large dataset with skew
largeDF = spark.createDataFrame([
(1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
], ["customer_id", "transaction"])
# Small dataset
smallDF = spark.createDataFrame([
(1, "Ahmed"), (2, "Ali"), (3, "Hassan")
], ["customer_id", "name"])
# Step 1: create a salting key in the large dataset
numBuckets = 3
saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \
.withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))
# Step 2: Explode rows in smallDF for possible salted keys
salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt")
saltedSmallDF = salt_range.crossJoin(smallDF) \
.withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))
# Step 3: Perform salted join
joinedDF = saltedLargeDF.join(
saltedSmallDF,
on=["salted_customer_id", "customer_id"],
how="inner"
).select("customer_id", "transaction", "name")
Notlar
- Bu kod, Scala’yı taklit etmek için spark.range(...) kullanır (0 numBuckets).toDF (“salt”).
- Sütun ifadeleri col(...), lit(...), ve concat(...) kullanılarak işlenir.
- Tam sayıyı oluşturmak için .cast (IntegerType()) kullanın.
Tuning Tipi: SeçmenumBuckets
Numaralar
- NumBuckets = 100'e ayarlanırsa, her anahtar 100 alt bölüme bölünebilir. Ancak, çok fazla kutu kullanarak performans düşebilir çünkü dikkatli olun, özellikle de az veri olan anahtarlar için. Her zaman veritabanının sapma profili temelinde farklı değerleri test edin.
- If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal
0
, e.x.-
// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
-
Rule of Thumb:Küçük (örneğin, 10-20) başlatın ve gözlemlenen shuffle boyutlarına ve görev sürelerine göre yavaş yavaş artırın.
Son Düşünceler
Geleneksel bölünme veya ipuçları sırasında Apache Spark'ta kaydırma yönetimi için etkili ve basit bir yöntemdir (SKEWED JOIN
Doğru ayarlanma ve izleme ile, bu teknik, çok sapmış veri kümelerinde işin gerçekleştirme süresini önemli ölçüde azaltabilir.
Orijinal olarak 11 Mayıs 2025 tarihinde https://practical-software.com adresinde yayınlandı.
Başlangıçta yayınlanan