02Spark参数

Spark 参数配置

https://spark.apache.org/docs/latest/configuration.html

https://spark.apache.org/docs/latest/sql-performance-tuning.html

Name Default Meaning
spark.app.name None 应用名
spark.driver.cores 1 driver 核心数
spark.driver.memory 1g driver内存
spark.driver.memoryOverhead driverMemory * spark.driver.memoryOverheadFactor, with minimum of 384 driver预留内存大小
spark.driver.memoryOverheadFactor 0.1 预留内存占比因子
spark.executor.memory 1g executor内存
spark.executor.memoryOverhead executorMemory * spark.executor.memoryOverheadFactor, with minimum of 384 executor预留内存大小
spark.executor.memoryOverheadFactor 0.1 预留内存占比因子
spark.driver.maxResultSize 1g 返回每个分区序列化结果到driver的最大大小,适用于将数据返回driver的算子
spark.reducer.maxSizeInFlight 48m shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据
spark.reducer.maxReqsInFlight Int.MaxValue 限制在任何给定点获取块的远程请求数量,当集群过大是避免负载过高
spark.reducer.maxBlocksInFlightPerAddress Int.MaxValue 限制了每个 Reduce 任务从给定主机端口获取的远程块数。当在一次获取中或同时从给定地址请求大量块时,这可能会使服务执行器或节点管理器崩溃。当启用外部 shuffle 时,这对于减少节点管理器上的负载特别有用
spark.shuffle.compress true shuffle压缩 spark.io.compression.codec
spark.shuffle.file.buffer 32k 每个 shuffle 文件输出流的内存缓冲区大小,这些缓冲区可减少创建中间 shuffle 文件时进行的磁盘寻道和系统调用次数
spark.shuffle.unsafe.file.output.buffer 32k Unsafe shuffle writer 缓冲区
spark.shuffle.spill.diskWriteBufferSize 1024 * 1024 The buffer size, in bytes, to use when writing the sorted records to an on-disk file.
spark.shuffle.io.maxRetries 3 发生io相关异常的时候重试,在长时间gc或网络不稳定的情况下有用
spark.shuffle.io.numConnectionsPerPeer 1 主机之间的连接会被重用,以减少大型集群的连接积累。对于硬盘较多而主机较少的集群,这可能会导致并发量不足以饱和所有磁盘
spark.shuffle.io.preferDirectBufs true 优先堆外缓冲区
spark.shuffle.io.retryWait 5s io重试间隔
spark.shuffle.io.connectionTimeout value of spark.network.timeout 连接超时
spark.shuffle.io.connectionCreationTimeout value of spark.shuffle.io.connectionTimeout 创建连接超时
spark.shuffle.service.enabled false ESS
spark.shuffle.maxChunksBeingTransferred Long.MAX_VALUE 在 shuffle 服务上允许同时传输的最大块数
spark.shuffle.sort.bypassMergeThreshold 200 bypassMergeShuffle 启用限定的分区数
spark.shuffle.spill.compress true shuffle spill 压缩 spark.io.compression.codec
spark.shuffle.
minNumPartitionsToHighlyCompress
2000 当下游 ReduceTask 个数大于某一阈值(默认 2000),就会将MapStatus进行压缩,所有小于 accurateBlockThreshold(默认100M)的值都会被一个平均值所代替填充,可能会影响AQE
spark.shuffle.accurateBlockThreshold 100M
spark.shuffle.reduceLocality.enabled true 是否为reduce任务配置本机计算偏好
spark.shuffle.mapOutput.minSizeForBroadcast 512k executor获取mapstatus的时候当数据量较大时使用广播
spark.shuffle.detectCorrupt true 检测获取到的block中是否有损坏
spark.shuffle.detectCorrupt.useExtraMemory false 如果流被压缩或包装,那么我们可选择将前 maxBytesInFlight/3 个字节解压缩/解包到内存中,以检查该部分数据是否损坏。但即使“detectCorruptUseExtraMemory”配置已关闭,或者如果损坏发生较晚,我们仍会在稍后的流中检测到损坏
spark.shuffle.useOldFetchProtocol false
spark.shuffle.readHostLocalDisk true 如果启用(并且 spark.shuffle.useOldFetchProtocol 被禁用),则从同一主机上运行的块管理器请求的 shuffle 块将直接从磁盘读取
spark.shuffle.checksum.enabled true 是否计算 shuffle 数据的校验和。如果启用,Spark 将计算 map 输出文件内每个分区数据的校验和值,并将这些值存储在磁盘上的校验和文件中。当检测到 shuffle 数据损坏时,Spark 将尝试使用校验和文件来诊断损坏的原因(例如网络问题、磁盘问题等)。
spark.shuffle.checksum.algorithm ADLER32 ADLER32, CRC32.
spark.broadcast.compress true 广播数据压缩 spark.io.compression.codec
spark.checkpoint.compress false spark.io.compression.codec
spark.io.compression.codec lz4 lz4,lzf, snappy, and zstd
spark.serializer org.apache.spark.serializer.JavaSerializer
spark.memory.fraction 0.6 用于执行和存储的内存占总内存的比例
spark.memory.storageFraction 0.5 执行内存和存储内存的比例
spark.memory.offHeap.enabled false 是否开启堆外内存
spark.memory.offHeap.size 0 堆外内存大小 byte
spark.broadcast.blockSize 4m TorrentBroadcastFactory 中每个块的大小,值过大会影响广播并行度,值过小会影响BlockManager
spark.broadcast.checksum true enable checksum for broadcast
spark.executor.cores 1 executor的核心数
spark.default.parallelism join、reduceByKey 和 parallelize 等转换返回 RDD默认分区数
spark.locality.wait 3s 在放弃并在非本地节点上启动数据本地任务之前,等待多长时间启动该任务。相同的等待时间将用于逐步完成多个本地级别(进程本地、节点本地、机架本地,然后是任何级别),默认值已经效果较好
spark.scheduler.mode FIFO 同一个context下的调度策略 FAIR、FIFO
spark.excludeOnFailure.enabled false 禁止在失败过多任务的executor上调度任务
spark.speculation false 推测执行,不同task的执行时间可能不一样,有的task很快就执行完成了,而有的可能执行很长一段时间也没有完成,再启动一个task,完成后将另外一个kill掉,对集群中性能不一致的机器比较适用
spark.task.cpus 1 Number of cores to allocate for each task.
spark.dynamicAllocation.enabled false 动态申请资源
spark.dynamicAllocation.executorIdleTimeout 60s
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.maxExecutors infinity
spark.dynamicAllocation.minExecutors 0
spark.dynamicAllocation.executorAllocationRatio 1 默认情况下,动态分配将根据要处理的任务数量请求足够的执行器以最大化并行度。虽然这可以最大限度地减少作业的延迟,但对于小任务,此设置可能会由于执行器分配开销而浪费大量资源
spark.dynamicAllocation.schedulerBacklogTimeout 1s 如果启用了动态分配,并且待处理任务积压的时间超过此时间,则会请求新的执行器
spark.dynamicAllocation.shuffleTracking.enabled true 保持有shuffle数据的executor存活
spark.dynamicAllocation.shuffleTracking.timeout infinity
spark.sql.adaptive.advisoryPartitionSizeInBytes 67108864 shuffle分区的建议大小,用于合并小分区或切割数据倾斜分区,spark.sql.adaptive.shuffle.targetPostShuffleInputSize 3.0以前的版本为该参数
spark.sql.adaptive.autoBroadcastJoinThreshold none(spark.sql.autoBroadcastJoinThreshold) 最大能广播的大小
spark.sql.adaptive.localShuffleReader.enabled true 将 sort-merge join 转换为 broadcast-hash join 之后,尝试使用本地 shuffle reader 来读取 shuffle 数据
spark.sql.adaptive.coalescePartitions.enabled true 根据spark.sql.adaptive.advisoryPartitionSizeInBytes大小合并相邻的小分区
spark.sql.adaptive.coalescePartitions.initialPartitionNum none 如果未配置 默认 spark.sql.shuffle.partitions,初始化较高的值,然后再通过连续分许合并解决避免小任务,较高的初始值能是数据分布更加均匀
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB 合并后最小的分区大小,其值最多为 spark.sql.adaptive.advisoryPartitionSizeInBytes 的 20%。当分区合并期间忽略目标大小(这是默认情况)时,这很有用
spark.sql.adaptive.coalescePartitions.parallelismFirst true Spark 在合并连续的 shuffle 分区时会忽略 spark.sql.adaptive.advisoryPartitionSizeInBytes(默认 64MB)指定的目标大小,并且仅遵守 spark.sql.adaptive.coalescePartitions.minPartitionSize(默认 1MB)指定的最小分区大小,以最大化并行度。这是为了避免在启用自适应查询执行时出现性能下降。建议将此配置设置为 false
spark.sql.adaptive.enabled true 开启aqe优化
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0 配置允许构建本地哈希映射的每个分区的最大大小(以字节为单位)。如果此值不小于 spark.sql.adaptive.advisoryPartitionSizeInBytes 且所有分区大小不大于此配置,则无论 spark.sql.join.preferSortMergeJoin 的值如何,连接选择都会优先使用 shuffled hash join 而不是 sort merge join。
spark.sql.adaptive.skewJoin.enabled true
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5.0 如果分区大小大于此因子乘以中值分区大小,并且大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,则认为分区倾斜
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB 如果分区大小(以字节为单位)大于此阈值,并且大于 spark.sql.adaptive.skewJoin.skewedPartitionFactor 与中值分区大小的乘积,则认为该分区倾斜。理想情况下,此配置应设置为大于 spark.sql.adaptive.advisoryPartitionSizeInBytes。
spark.sql.adaptive.forceOptimizeSkewedJoin false 当为真时,强制启用 OptimizeSkewedJoin,这是一个自适应规则,用于优化倾斜连接以避免落后任务,即使它引入了额外的混洗。
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled true rebalance 时候根据spark.sql.adaptive.advisoryPartitionSizeInBytes合并小分区
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 0.2 如果分区的大小小于此因子乘以 spark.sql.adaptive.advisoryPartitionSizeInBytes,则将在拆分期间合并分区。
spark.sql.autoBroadcastJoinThreshold 10MB
spark.sql.broadcastTimeout 300 广播加入时的广播等待时间超时(秒)。
spark.sql.cbo.enabled false
spark.sql.cbo.joinReorder.dp.star.filter false 用于控制星型查询(大表join多个小表)在动态规划连接重排序过程中的过滤器应用的配置选项。启用该选项可以优化连接顺序,提升查询性能
spark.sql.cbo.joinReorder.dp.threshold 12 动态规划算法中允许的最大连接节点数。
spark.sql.cbo.joinReorder.enabled false
spark.sql.cbo.planStats.enabled false 当为真时,逻辑计划将从目录中获取行数和列统计信息。
spark.sql.cbo.starSchemaDetection false 根据星型模式检测启用连接重新排序
spark.sql.files.maxPartitionBytes 128MB 读取文件时打包到单个分区的最大字节数。此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。
spark.sql.files.maxPartitionNum none 建议(不保证)的最大分割文件分区数。如果设置了该值,当初始分区数超过该值时,Spark 将重新缩放每个分区以使分区数接近该值。此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。
spark.sql.files.maxRecordsPerFile 0 写入单个文件的最大记录数。如果此值为零或负数,则没有限制。
spark.sql.files.minPartitionNum none 建议(不保证)的最小文件分区数。如果未设置,则默认值为 spark.sql.leafNodeDefaultParallelism。此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。
spark.sql.optimizer.dynamicPartitionPruning.enabled true
spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold 10GB Bloom 过滤器应用端的聚合扫描字节大小需要超过此值才能注入 Bloom 过滤器
spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold 10MB 布隆过滤器创建端计划的大小阈值。估计大小需要低于此值才能尝试注入布隆过滤器。
spark.sql.optimizer.runtime.bloomFilter.enabled true
spark.sql.optimizer.runtime.bloomFilter.expectedNumItems 1000000 运行时布隆过滤器的默认预期items
spark.sql.optimizer.runtime.bloomFilter.maxNumBits 67108864 布隆过滤器使用的最大位数
spark.sql.optimizer.runtime.bloomFilter.maxNumItems 4000000 布隆过滤器允许的最大预期items
spark.sql.optimizer.runtime.bloomFilter.numBits 8388608 布隆过滤器使用的默认位数
spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled true
spark.sql.optimizer.runtimeFilter.number.threshold 10 单个查询中注入的运行时过滤器(非 DPP)总数。这是为了防止驱动程序因过多的布隆过滤器而导致 OOM。
spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled false
spark.sql.parquet.aggregatePushdown false 如果为 true,聚合将被推送到 Parquet 进行优化。支持 MIN、MAX 和 COUNT 作为聚合表达式。对于 MIN/MAX,支持布尔值、整数、浮点数和日期类型。对于 COUNT,支持所有数据类型。如果任何 Parquet 文件页脚缺少统计信息,则会抛出异常。
spark.sql.parquet.filterPushdown true 启用 Parquet 过滤器下推优化
spark.sql.shuffle.partitions 200 spark.default.parallelism 配置负责控制默认RDD的partithion数,spark.sql.shuffle.partitions 执行sql或sql类算子时shuffle分区数spark.default.parallelism 主要用于控制 RDD 操作的默认并行度级别,而不是 Spark SQL,所以对于 Spark SQL 并不生效
spark.sql.shuffledHashJoinFactor 3 如果小端的数据大小乘以该因子后仍然小于大端,则可以选择 Shuffle Hash Join。
spark.sql.statistics.fallBackToHdfs false 如果为 true,则如果无法从表元数据中获得表统计信息,它将回退到 HDFS。这对于确定表是否足够小以使用广播连接非常有用。此标志仅对非分区 Hive 表有效。对于非分区数据源表,如果无法获得表统计信息,它将自动重新计算。对于分区数据源和分区 Hive 表,如果无法获得表统计信息,则为“spark.sql.defaultSizeInBytes”。
spark.sql.statistics.histogram.enabled false 在计算列统计信息时生成直方图。直方图可以提供更好的估算精度
spark.sql.statistics.size.autoUpdate.enabled false 一旦表的数据发生变化,启用表大小的自动更新。如果表的文件总数非常大,这可能会很昂贵并减慢数据更改命令的速度
spark.sql.ui.explainMode formatted 配置 Spark SQL UI 中使用的查询解释模式。值可以是 ‘simple’、’extended’、’codegen’、’cost’ 或 ‘formatted’。默认值为 ‘formatted’。

02Spark参数
https://jiajun.xyz/2024/06/26/bigdata/00something/02Spark参数/
作者
Lambda
发布于
2024年6月26日
许可协议