1,695 lecturas
1,695 lecturas

Cómo corregir el desvío de datos en Apache Spark con la técnica de salto

por hacker3208...7m2025/06/27
Read on Terminal Reader

Demasiado Largo; Para Leer

Aprenda a corregir el desvío de datos en Apache Spark utilizando la técnica de salado para mejorar el rendimiento y equilibrar las particiones en Scala y PySpark.
featured image - Cómo corregir el desvío de datos en Apache Spark con la técnica de salto
Islam Elbanna HackerNoon profile picture
0-item
1-item

Cuando trabajamos con grandes conjuntos de datos enApache SparkUn problema de rendimiento común esdata skewEsto ocurre cuando algunas clavesdominateLa distribución de los datos, que conduce aunevenparticiones y consultas lentas. Esto ocurre principalmente durante las operaciones que requierenshufflingcomojoinsO incluso regularesaggregations.


Una forma práctica de reducir el desgaste essalting, que implica la difusión artificial de claves pesadas en múltiples particiones. En este post, te guiaré a través de esto con un ejemplo práctico.


Cómo Salting resuelve problemas de escape de datos

Al agregar arandomlygenerado número a la clave de unión y luego uniéndose sobre esta clave combinada, podemos distribuir las claves grandes de manera más uniforme. Esto hace que la distribución de los datos sea más uniforme y distribuye la carga a más trabajadores, en lugar de enviar la mayor parte de los datos a un trabajador y dejar a los demás inactivos.

Beneficios del salado

  • Reduced Skew: Spreads data evenly across partitions, preventing a few workers overload and improves utilization.

  • Improved Performance: Speeds up joins and aggregations by balancing the workload.

  • Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.


Cuándo usar salsa

Durante las juntas o agregaciones con teclas distorsionadas, utilice saltar cuando observe largos tiempos de ruido o fallas del ejecutor debido a la desviación de datos. También es útil en aplicaciones de streaming en tiempo real donde la partición afecta a la eficiencia del procesamiento de datos, o cuando la mayoría de los trabajadores están vacíos mientras algunos están atrapados en un estado de ejecución.


Un ejemplo en la escala

Vamos a generar algunos datos con ununbalancedPodemos suponer que hay dos conjuntos de datos que necesitamos unir: uno es un conjunto de datos grande, y el otro es un conjunto de datos pequeño.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Simulated large dataset with skew
val largeDF = Seq(
  (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
).toDF("customer_id", "transaction")

// Small dataset
val smallDF = Seq(
  (1, "Ahmed"), (2, "Ali"), (3, "Hassan")
).toDF("customer_id", "name")


Añadamos la columna salada a los grandes conjuntos de datos, que utilizamosrandomizationpara distribuir los valores de la clave grande en particiones más pequeñas

// Step 1: create a salting key in the large dataset
val numBuckets = 3
val saltedLargeDF = largeDF.
    withColumn("salt", (rand() * numBuckets).cast("int")).
    withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))

saltedLargeDF.show()
+-----------+-----------+----+------------------+
|customer_id|transaction|salt|salted_customer_id|
+-----------+-----------+----+------------------+
|          1|       txn1|   1|               1_1|
|          1|       txn2|   1|               1_1|
|          1|       txn3|   2|               1_2|
|          2|       txn4|   2|               2_2|
|          3|       txn5|   0|               3_0|
+-----------+-----------+----+------------------+


Para asegurarnos de que cubrimos todas las posibles claves saladas aleatorias en los grandes conjuntos de datos, necesitamosexplodeEl pequeño conjunto de datos con todos los posibles valores salados

// Step 2: Explode rows in smallDF for possible salted keys
val saltedSmallDF = (0 until numBuckets).toDF("salt").
    crossJoin(smallDF).
    withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) 

saltedSmallDF.show()
+----+-----------+------+------------------+
|salt|customer_id|  name|salted_customer_id|
+----+-----------+------+------------------+
|   0|          1| Ahmed|               1_0|
|   1|          1| Ahmed|               1_1|
|   2|          1| Ahmed|               1_2|
|   0|          2|   Ali|               2_0|
|   1|          2|   Ali|               2_1|
|   2|          2|   Ali|               2_2|
|   0|          3|Hassan|               3_0|
|   1|          3|Hassan|               3_1|
|   2|          3|Hassan|               3_2|
+----+-----------+------+------------------+


Ahora podemos unir fácilmente los dos conjuntos de datos

// Step 3: Perform salted join
val joinedDF = saltedLargeDF.
    join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner").
    select("customer_id", "transaction", "name")

joinedDF.show()
+-----------+-----------+------+
|customer_id|transaction|  name|
+-----------+-----------+------+
|          1|       txn2| Ahmed|
|          1|       txn1| Ahmed|
|          1|       txn3| Ahmed|
|          2|       txn4|   Ali|
|          3|       txn5|Hassan|
+-----------+-----------+------+


Un ejemplo en Python

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, lit, concat
from pyspark.sql.types import IntegerType

# Simulated large dataset with skew
largeDF = spark.createDataFrame([
    (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
], ["customer_id", "transaction"])

# Small dataset
smallDF = spark.createDataFrame([
    (1, "Ahmed"), (2, "Ali"), (3, "Hassan")
], ["customer_id", "name"])

# Step 1: create a salting key in the large dataset
numBuckets = 3
saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \
    .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))

# Step 2: Explode rows in smallDF for possible salted keys
salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt")
saltedSmallDF = salt_range.crossJoin(smallDF) \
    .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))

# Step 3: Perform salted join
joinedDF = saltedLargeDF.join(
    saltedSmallDF,
    on=["salted_customer_id", "customer_id"],
    how="inner"
).select("customer_id", "transaction", "name")

Notas

  • Este código utiliza spark.range(...) para imitar Scala (0 hasta numBuckets).toDF("salt").
  • Las expresiones de columna se manejan utilizando col(...), lit(...), y concat(...).
  • El cast a integer utiliza .cast (IntegerType()).


Tipo de Tuning: ElegirNúmeros

Números
  • Si establece numBuckets = 100, cada tecla se puede dividir en 100 sub-particiones. Sin embargo, tenga cuidado porque el uso de demasiados buckets puede disminuir el rendimiento, especialmente para las teclas con pocos datos.
  • If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal 0, e.x.

    •     // Step 1: create a salting key in the large dataset
              val numBuckets = 3
              val saltedLargeDF = largeDF.
                  withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))).
                  withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
        
          // Step 2: Explode rows in smallDF for possible salted keys
              val saltedSmallDF = (0 until numBuckets).toDF("salt").
                  crossJoin(smallDF.filter($"customer_id" === 1)).
                  select("customer_id", "salt", "name").
                  union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")).
                  withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
      


Rule of Thumb:Comience pequeño (por ejemplo, 10-20) y aumente gradualmente en función de los tamaños observados y el tiempo de ejecución de tareas.


Pensamientos finales

El salto es un método eficaz y sencillo para gestionar el desvío en Apache Spark cuando la partición tradicional o las pistas (SKEWED JOINCon el ajuste y el seguimiento adecuados, esta técnica puede reducir significativamente los tiempos de ejecución de tareas en conjuntos de datos altamente distorsionados.


Publicado originalmente en https://practical-software.com el 11 de mayo de 2025.

Originalmente publicado enhttps://practical-software.comEl 11 de mayo de 2025.https://practical-software.comhttps://practical-software.com

Trending Topics

blockchaincryptocurrencyhackernoon-top-storyprogrammingsoftware-developmenttechnologystartuphackernoon-booksBitcoinbooks