Khi làm việc với các tập dữ liệu lớn trongApache SparkMột vấn đề về hiệu suất chung làdata skewĐiều này xảy ra khi một vài phímdominatephân phối dữ liệu, dẫn đếnunevenphân vùng và truy vấn chậm. Nó chủ yếu xảy ra trong các hoạt động đòi hỏishufflingNhưjoinshoặc thường xuyênaggregations.
Một cách thực tế để giảm lệch làsalting, liên quan đến việc nhân tạo phân tán các phím nặng trên nhiều phân vùng. Trong bài viết này, tôi sẽ hướng dẫn bạn thông qua điều này với một ví dụ thực tế.
Làm thế nào Salting giải quyết các vấn đề trượt dữ liệu
Bằng cách thêm arandomlyĐiều này làm cho phân phối dữ liệu đồng đều hơn và phân phối tải trên nhiều công nhân hơn, thay vì gửi hầu hết dữ liệu cho một công nhân và để lại những người khác trống rỗng.
Lợi ích của Salting
-
Reduced Skew: Spreads data evenly across partitions, preventing a few workers overload and improves utilization.
-
Improved Performance: Speeds up joins and aggregations by balancing the workload.
-
Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.
Khi nào nên sử dụng Salting
Trong quá trình sáp nhập hoặc tổng hợp với các phím bị lệch, sử dụng salting khi bạn nhận thấy thời gian shuffle dài hoặc lỗi người thực thi do sai lệch dữ liệu. Nó cũng hữu ích trong các ứng dụng phát trực tuyến thời gian thực nơi phân vùng ảnh hưởng đến hiệu quả xử lý dữ liệu, hoặc khi hầu hết người lao động trống rỗng trong khi một số bị mắc kẹt trong trạng thái chạy.
Lời bài hát Salting Example in Scala
Hãy tạo ra một số dữ liệu với mộtunbalancedChúng ta có thể giả định rằng có hai tập dữ liệu mà chúng ta cần tham gia: một là một tập dữ liệu lớn, và tập dữ liệu nhỏ khác.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// Simulated large dataset with skew
val largeDF = Seq(
(1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
).toDF("customer_id", "transaction")
// Small dataset
val smallDF = Seq(
(1, "Ahmed"), (2, "Ali"), (3, "Hassan")
).toDF("customer_id", "name")
Chúng ta hãy thêm cột muối vào các tập dữ liệu lớn mà chúng ta sử dụngrandomizationđể phân tán các giá trị của khóa lớn thành các phân vùng nhỏ hơn
// Step 1: create a salting key in the large dataset
val numBuckets = 3
val saltedLargeDF = largeDF.
withColumn("salt", (rand() * numBuckets).cast("int")).
withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
saltedLargeDF.show()
+-----------+-----------+----+------------------+
|customer_id|transaction|salt|salted_customer_id|
+-----------+-----------+----+------------------+
| 1| txn1| 1| 1_1|
| 1| txn2| 1| 1_1|
| 1| txn3| 2| 1_2|
| 2| txn4| 2| 2_2|
| 3| txn5| 0| 3_0|
+-----------+-----------+----+------------------+
Để đảm bảo chúng tôi bao gồm tất cả các phím muối ngẫu nhiên có thể trong các tập dữ liệu lớn, chúng tôi cầnexplodetập dữ liệu nhỏ với tất cả các giá trị muối có thể
// Step 2: Explode rows in smallDF for possible salted keys
val saltedSmallDF = (0 until numBuckets).toDF("salt").
crossJoin(smallDF).
withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
saltedSmallDF.show()
+----+-----------+------+------------------+
|salt|customer_id| name|salted_customer_id|
+----+-----------+------+------------------+
| 0| 1| Ahmed| 1_0|
| 1| 1| Ahmed| 1_1|
| 2| 1| Ahmed| 1_2|
| 0| 2| Ali| 2_0|
| 1| 2| Ali| 2_1|
| 2| 2| Ali| 2_2|
| 0| 3|Hassan| 3_0|
| 1| 3|Hassan| 3_1|
| 2| 3|Hassan| 3_2|
+----+-----------+------+------------------+
Bây giờ chúng ta có thể dễ dàng kết nối hai tập dữ liệu
// Step 3: Perform salted join
val joinedDF = saltedLargeDF.
join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner").
select("customer_id", "transaction", "name")
joinedDF.show()
+-----------+-----------+------+
|customer_id|transaction| name|
+-----------+-----------+------+
| 1| txn2| Ahmed|
| 1| txn1| Ahmed|
| 1| txn3| Ahmed|
| 2| txn4| Ali|
| 3| txn5|Hassan|
+-----------+-----------+------+
Lấy ví dụ trong Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, lit, concat
from pyspark.sql.types import IntegerType
# Simulated large dataset with skew
largeDF = spark.createDataFrame([
(1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
], ["customer_id", "transaction"])
# Small dataset
smallDF = spark.createDataFrame([
(1, "Ahmed"), (2, "Ali"), (3, "Hassan")
], ["customer_id", "name"])
# Step 1: create a salting key in the large dataset
numBuckets = 3
saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \
.withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))
# Step 2: Explode rows in smallDF for possible salted keys
salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt")
saltedSmallDF = salt_range.crossJoin(smallDF) \
.withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))
# Step 3: Perform salted join
joinedDF = saltedLargeDF.join(
saltedSmallDF,
on=["salted_customer_id", "customer_id"],
how="inner"
).select("customer_id", "transaction", "name")
ghi chép
- Mã này sử dụng spark.range(...) để bắt chước Scala (0 cho đến khi numBuckets).toDF ("muối").
- Các biểu thức cột được xử lý bằng cách sử dụng col (...), lit (...), và concat (...).
- Các cast to integer sử dụng .cast(IntegerType()).
Lời bài hát: ChoosenumBuckets
Số lượng
- Nếu bạn đặt numBuckets = 100, mỗi phím có thể được chia thành 100 phân vùng phụ. Tuy nhiên, hãy cẩn thận vì sử dụng quá nhiều buckets có thể làm giảm hiệu suất, đặc biệt là đối với các phím có ít dữ liệu. Luôn kiểm tra các giá trị khác nhau dựa trên hồ sơ lệch của tập dữ liệu của bạn.
- If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal
0
, e.x.-
// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
-
Rule of Thumb:Bắt đầu nhỏ (ví dụ, 10-20) và tăng dần dựa trên kích thước shuffle quan sát và thời gian chạy nhiệm vụ.
Suy nghĩ cuối cùng
Tác dụng và cách sử dụng: Tác dụng và cách sử dụng: Tác dụng và cách sử dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng: Tác dụng:SKEWED JOIN
Với điều chỉnh và giám sát chính xác, kỹ thuật này có thể làm giảm đáng kể thời gian thực hiện công việc trên các tập dữ liệu bị biến dạng cao.
Ban đầu được xuất bản tại https://practical-software.com vào ngày 11 tháng 5 năm 2025.
Ban đầu được xuất bản tại