跳至主要內容

大数据Shuffle原理与实践

fatSheep大约 3 分钟

Shuffle概述

MapReduce概述

  • 阶段:Map、Shuffle、Reduce

Map阶段

单机上,针对一小块数据的计算过程

Shuffle阶段

在map阶段的基础上,进行数据移动,为后续的reduce阶段做准备。

Reduce阶段

reduce阶段,对移动后的数据进行处理,依然是在单机上处理一小份数据

Shuffle阶段为什么对性能非常重要

  • M * R次网络连接
  • 大量的数据移动
  • 数据丢失风险
  • 可能存在大量的排序操作
  • 大量的数据序列化,反序列化操作
  • 数据压缩

小结

在大数据场景下,数据shuffle表示了不同分区数据交换的过程,不同的shuffle策略性能差异较大。 目前在各个引擎中shuffle都是优化的重点,在spark框架中,shuffle是支撑spark进行大规模复杂数据处理的基石。

Shuffle算子

Spark中会产生shuffle的算子大概分为四类

repartitionByKeyjoinDistinct
coalescegroupByKeycogroupdistinct
repartitionreduceByKeyjoin
aggregateByKeyleftOuterJoin
combineByKeyintersection
sortByKeysubtract
sortBysubtractByKey

Shuffle算子应用

val text = sc.textFile("mytextfile.txt")
val counts = text
  .flatMap(line => line.split(" "))
  .map(word => (word,1))
  .reduceByKey(_+_)
counts.collect

Spark对Shuffle的抽象

  • 窄依赖:父RDD的每个分片至多被子 RDD中的一个分片所依赖
  • 宽依赖:父RDD中的分片可能被子 RDD中的多个分片所依赖

算子内部的依赖关系

  • Shuffle Dependency
    • 创建会产生shuffle的RDD时,RDD会创建Shuffle Dependency来描述Shuffle相关的信息
    • 构造函数
      • A single key-value pair RDD, i.e. RDD[Product2[K, V]],
      • Partitioner (available as partitioner property),
      • Serializer,
      • Optional key ordering (of Scala’s scala.math.Ordering type),
      • Optional Aggregator,
      • mapSideCombine flag which is disabled (i.e. false) by default.
  • Partitioner
    • 用来将record映射到具体的partition的方法
    • 接口
      • numberPartitions
      • getPartition
  • Aggregator
    • 在map侧合并部分record的函数
    • 接口
      • createCombiner:只有一个value的时候初始化的方法
      • mergeValue:合并一个value到Aggregator中
      • mergeCombiners:合并两个Aggregator

Shuffle过程

spark中的shuffle变迁过程

  • HashShuffle
    • 优点:不需要排序
    • 缺点:打开,创建的文件过多
  • SortShuffle
    • 优点:打开的文件少、支持map-side combine
    • 缺点:需要排序
  • TungstenSortShuffle
    • 优点:更快的排序效率,更高的内存利用效率
    • 缺点:不支持map-side combine

Register Shuffle

  • 由action算子触发DAG Scheduler进行shuffle register
  • Shuffle Register会根据不同的条件决定注册不同的ShuffleHandle

Shuffle参数优化

  • spark.default.parallelism && spark.sql.shuffle.partitions
  • spark.hadoopRDD.ignoreEmptySplits
  • spark.hadoop.mapreduce.input.fileinputformat.split.minsize
  • spark.sql.file.maxPartitionBytes
  • spark.sql.adaptive.enabled && spark.sql.adaptive.shuffle.targetPostShuffleInputSize
  • spark.reducer.maxSizeInFlight
  • spark.reducer.maxReqsInFlight
  • spark.reducer.maxBlocksInFlightPerAddress

Shuffle倾斜优化

  • 倾斜影响
    • 作业运行时间变长
    • Task OOM导致作业失败

常见地倾斜处理办法

  • 提高并行度
    • 优点:足够简单
    • 缺点: 只缓解、不根治
  • AQE Skew Join
    • AQE根据shuffle文件统计数据自动检测倾斜数据,将那些倾斜的分区打散成小的子分区,然后各自进行join。

Push Shuffle

  • 上一部分所讲的shuffle过程存在哪些问题?
    • 数据存储在本地磁盘,没有备份
    • IO 并发:大量 RPC 请求(M*R)
    • IO 吞吐:随机读、写放大(3X)
    • GC 频繁,影响 NodeManager
  • 为了优化该问题,有很多公司都做了思路相近地优化,push shuffle
  • Magnet主要流程 主要为边写边push的模式,在原有的shuffle基础上尝试push聚合数据,但并不强制完成,读取时优先读取push聚合的结果,对于没有来得及完成聚合或者聚合失败的情况,则fallback到原模式。

参考: 【大数据专场 学习资料二】第四届字节跳动青训营open in new window