07RDD编程
本文最后更新于 2024-05-27 01:15:19
RDD编程
创建RDD
parallelize
1 | |
makeRdd
1 | |
- 一旦 RDD 创建成功, 就可以通过并行的方式去操作这个分布式的数据集了.
- parallelize和makeRDD还有一个重要的参数就是把数据集切分成的分区数.
- Spark 会为每个分区运行一个任务(task). 正常情况下, Spark 会自动的根据你的集群来设置分区数
外部创建RDD
Spark 也可以从任意 Hadoop 支持的存储数据源来创建分布式数据集.
可以是本地文件系统, HDFS, Cassandra, HBase, Amazon S3 等等.
Spark 支持 文本文件, SequenceFiles, 和其他所有的 Hadoop InputFormat.
1 | |
- url可以是本地文件系统文件, hdfs://…, s3n://…等等
- 如果是使用的本地文件系统的路径, 则必须每个节点都要存在这个路径
- 所有基于文件的方法, 都支持目录, 压缩文件, 和通配符(
*). 例如: textFile(“/my/directory”), textFile(“/my/directory/*.txt”), and textFile(“/my/directory/*.gz”). - textFile还可以有第二个参数, 表示分区数. 默认情况下, 每个块对应一个分区.(对 HDFS 来说, 块大小默认是 128M). 可以传递一个大于块数的分区数, 但是不能传递一个比块数小的分区数.
RDD转换(单value)
在 RDD 上支持 2 种操作:transformation和action
在 Spark 中几乎所有的transformation操作都是懒执行的(lazy), 也就是说transformation操作并不会立即计算他们的结果, 而是记住了这个操作.
只有当通过一个action来获取结果返回给驱动程序的时候这些转换操作才开始计算.
但是我们可以通过persist (or cache)方法来持久化一个 RDD 在内存中, 也可以持久化到磁盘上, 来加快访问速度.
map
对每个元素都执行map操作
1 | |
mapPartitions
mapPartitions 传入参数是一个迭代器,包含一个分区的数据,有几个分区mapPartitions就执行几次
返回也是一个迭代器
1 | |
mapPartitionsWithIndex
mapPartitionsWithIndex可以得到分区序号
1 | |
flatMap
将集合展开,将集合拍平
1 | |
filter
1 | |
glom
每一个分区的元素合并成一个数组,形成新的 RDD 类型是RDD[Array[T]]
结果例如:Array(Array(10), Array(20, 30), Array(40), Array(50, 60))
1 | |
groupBy
分组
This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will provide much better performance
1 | |
sample
随机抽样 sample(withReplacement, fraction, seed)
- 以指定的随机种子随机抽样出比例为fraction的数据,(抽取到的数量是: size * fraction). 需要注意的是得到的结果并不能保证准确的比
- withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样. 放回表示数据有可能会被重复抽取到, false 则不可能重复抽取到. 如果是false, 则fraction必须是:[0,1], 是 true 则大于等于0,大于1表示几倍就可以了.
- seed用于指定随机数生成器种子。 一般用默认的, 或者传入当前的时间戳
- 可以用于判断数据倾斜
distinct
对 RDD 中元素执行去重操作. 参数表示任务的数量.默认值和分区数保持一致
1 | |
coalesce
缩减分区数到指定的数量,用于大数据集过滤后,提高小数据集的执行效率
默认不进行shuffle,第二个参数可以控制是否shuffle,第三个参数为分区器(默认hash)
默认不能增加分区,加上shuffle可以增加
1 | |
repartition
根据新的分区数, 重新 shuffle 所有的数据, 这个操作总会通过网络.
新的分区数相比以前可以多, 也可以少
coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
repartition实际上是调用的coalesce,进行shuffle
sortBy
排序,默认正序
sortBy(func,[ascending], [numTasks]) ascending是否升序
1 | |
pipe
pipe(command, [envVars])
管道,针对每个分区,把 RDD 中的每个数据通过管道传递给shell命令或脚本,返回输出的RDD。一个分区执行一次这个命令. 如果只有一个分区, 则执行一次命令.
脚本要放在 worker 节点可以访问到的位置
RDD转换(双value)
union
求并集. 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
不去重
1 | |
可以使用++实现相同功能
subtract
计算差集. 从原 RDD 中减去 原 RDD 和 otherDataset 中的共同的部分,otherDataset 有原RDD没有的也不计入结果
1 | |
intersection
计算交集. 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
1 | |
cartesian
计算 2 个 RDD 的笛卡尔积. 尽量避免使用
1 | |
zip
拉链操作. 需要注意的是, 在 Spark 中, 两个 RDD 的元素的数量和分区数都必须相同, 否则会抛出异常.(在 scala 中, 两个集合的长度可以不同)
1 | |
zipPartitions
分区与分区之间拉,只要求分区数相同即可。
1 | |
zipWithIndex
与下标zip,返回元组,第二个为下标,第一个为值
1 | |
RDD转换(Key-Value)

大多数的 Spark 操作可以用在任意类型的 RDD 上, 但是有一些比较特殊的操作只能用在key-value类型的 RDD 上.
这些特殊操作大多都涉及到 shuffle 操作, 比如: 按照 key 分组(group), 聚集(aggregate)等.
partitionBy
对 pairRDD 进行分区操作,如果原有的 partionRDD 的分区器和传入的分区器相同, 则返回原 pairRDD,否则会生成 ShuffleRDD,即会产生 shuffle 过程
产生shuffle就相当于一个新的阶段(stage),一个阶段内都是并行执行,互相不影响
1 | |
reduceByKey
将相同key的value聚合到一起
在shuffle之前有combine(预聚合)操作(所有的聚合算子都有预聚合,调用底层函数的时候有默认参数mapSideConbine=true)
1 | |
groupByKey
按照key进行分组.
只能用于keyValue类型,groupBy可以用于任意类型
1 | |
foldByKey
按key进行折叠
1 | |
aggregateByKey
相当于foldByKey和groupBy的结合
在shuffle前(分区内)进行foldByKey
在shuffle后(分区间)进行groupBy
能够实现分区内和分区间的执行逻辑不同(foldByKey和groupByKey都是相同的)
1 | |
1 | |
combineByKey
在aggregateByKey基础上,初始值根据第一个key的value生成
1 | |
4种聚合函数的区别
- reduceByKey
- 分区内聚合和分区间聚合逻辑相同
- foldByKey
- 分区内和分区间聚合逻辑相同,但是有初始值,返回值value类型与初始值直接相关
- aggregateByKey
- 有初始值,分区内和分区间逻辑可以不同
- combineByKey
- 可以根据key的第一个value计算初始值(类型可以变化),分区内和分区间逻辑可以不同
4个函数底层都调用了combineByKeyWithClassTag
sortByKey
根据key排序,可以指定升序降序,默认升序
1 | |
mapValues
…..
cogroup
1 | |
join
内连接:
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的(K,(V,W))的RDD
1 | |
如果某一个 RDD 有重复的 Key, 则会分别与另外一个 RDD 的相同的 Key进行组合
也支持外连接: leftOuterJoin, rightOuterJoin, and fullOuterJoin.
repartitionAndSortWithinPartitions
分区,并且分区后key有序
行动算子
collect
以数组的形式返回RDD种的所有元素
将分区间的的所有数据拉到driver端
count
返回RDD中元素的个数
take(n)
以数组形式返回RDD中前n个元素
数据也是拉到driver端
first
取出第一个元素,取出的结果不是数组的形式。
takeOrdered
takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
返回排序后的前 n 个元素, 默认是升序排列.
countByKey
mapValues(_ => 1L).reduceByKey(_ + _).collect()
针对(K,V)类型的 RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
可以用来查看数据是否倾斜
reduce
数聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
1 | |
fold
折叠
fold(zeroValue: T)(op: (T, T) => T): T
aggregate
分区内和分区间聚合逻辑可以不同
有初始值
初始零值在分区内聚合参与一次,在分区间聚合的时候还要参与一次
aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
foreach
每个函数是在 Executor 上执行的, 不是在 driver 端执行的.
foreachPartition
每个分区执行一次
save相关算子
saveAsTextFile
saveAsObjectFile
saveAsSequenceFile