13累加器&广播

本文最后更新于 2021-08-05 11:42:59

累加器&广播

累加器

累加器用来对信息进行聚合,通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,所以更新这些副本的值不会影响驱动器中的对应变量。

如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。

累加器是一种变量, 仅仅支持“add”, 支持并发. 累加器用于去实现计数器或者求和.

累加器的更新操作最好放在action中, Spark 可以保证每个 task 只执行一次. 如果放在 transformations 操作中则不能保证只更新一次.有可能会被重复执行

内置累加器

longAccumulator

doubleAccumulator

CollectionAccumulator

1
2
3
4
5
6
7
8
9
10
11
12
object AccDemo1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.textFile("file://" + ClassLoader.getSystemResource("words.txt").getPath)
// 得到一个 Long 类型的累加器. 将从 0 开始累加
val emptyLineCount: LongAccumulator = sc.longAccumulator
rdd.foreach(s => if (s.trim.length == 0) emptyLineCount.add(1))
println(emptyLineCount.value)
}
}

自定义累加器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class MyAcc extends AccumulatorV2[String, java.util.List[String]] {
private val _list: java.util.List[String] = Collections.synchronizedList(new ArrayList[String]())
override def isZero: Boolean = _list.isEmpty

override def copy(): AccumulatorV2[String, util.List[String]] = {
val newAcc = new MyAcc
_list.synchronized {
newAcc._list.addAll(_list)
}
newAcc
}

override def reset(): Unit = _list.clear()

override def add(v: String): Unit = _list.add(v)

override def merge(other: AccumulatorV2[String, util.List[String]]): Unit =other match {
case o: MyAcc => _list.addAll(o.value)
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}

override def value: util.List[String] = java.util.Collections.unmodifiableList(new util.ArrayList[String](_list))
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def main(args: Array[String]): Unit = {
val pattern = """^\d+$"""
val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
val sc = new SparkContext(conf)
// 统计出来非纯数字, 并计算纯数字元素的和
val rdd1 = sc.parallelize(Array("abc", "a30b", "aaabb2", "60", "20"))

val acc = new MyAcc
//必须注册
sc.register(acc)
val rdd2: RDD[Int] = rdd1.filter(x => {
val flag: Boolean = x.matches(pattern)
if (!flag) acc.add(x)
flag
}).map(_.toInt)
println(rdd2.reduce(_ + _))
println(acc.value)
}

广播变量

广播变量在每个节点上保存一个只读的变量的缓存, 而不用给每个 task 来传送一个 copy.

相当于每个Executor只有一份副本,task共享

1
2
3
val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar.value

13累加器&广播
https://jiajun.xyz/2021/07/18/bigdata/10spark/13累加器&广播/
作者
Lambda
发布于
2021年7月18日
更新于
2021年8月5日
许可协议