本文最后更新于 2021-08-05 11:42:59
累加器&广播 累加器 累加器用来对信息进行聚合,通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,所以更新这些副本的值不会影响驱动器中的对应变量。
如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。
累加器是一种变量, 仅仅支持“add”, 支持并发. 累加器用于去实现计数器或者求和.
累加器的更新操作最好放在action中 , Spark 可以保证每个 task 只执行一次. 如果放在 transformations 操作中则不能保证只更新一次.有可能会被重复执行
内置累加器 longAccumulator
doubleAccumulator
CollectionAccumulator
1 2 3 4 5 6 7 8 9 10 11 12 object AccDemo1 { def main (args: Array [String ]): Unit = { val conf = new SparkConf ().setAppName("Practice" ).setMaster("local[2]" ) val sc = new SparkContext (conf) val rdd: RDD [String ] = sc.textFile("file://" + ClassLoader .getSystemResource("words.txt" ).getPath) val emptyLineCount: LongAccumulator = sc.longAccumulator rdd.foreach(s => if (s.trim.length == 0 ) emptyLineCount.add(1 )) println(emptyLineCount.value) } }
自定义累加器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 class MyAcc extends AccumulatorV2 [String , java.util.List [String ]] { private val _list: java.util.List [String ] = Collections .synchronizedList(new ArrayList [String ]()) override def isZero : Boolean = _list.isEmpty override def copy (): AccumulatorV2 [String , util.List [String ]] = { val newAcc = new MyAcc _list.synchronized { newAcc._list.addAll(_list) } newAcc } override def reset (): Unit = _list.clear() override def add (v: String ): Unit = _list.add(v) override def merge (other: AccumulatorV2 [String , util.List [String ]]): Unit =other match { case o: MyAcc => _list.addAll(o.value) case _ => throw new UnsupportedOperationException ( s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName} " ) } override def value : util.List [String ] = java.util.Collections .unmodifiableList(new util.ArrayList [String ](_list)) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def main (args: Array [String ]): Unit = { val pattern = """^\d+$""" val conf = new SparkConf ().setAppName("Practice" ).setMaster("local[2]" ) val sc = new SparkContext (conf) val rdd1 = sc.parallelize(Array ("abc" , "a30b" , "aaabb2" , "60" , "20" )) val acc = new MyAcc sc.register(acc) val rdd2: RDD [Int ] = rdd1.filter(x => { val flag: Boolean = x.matches(pattern) if (!flag) acc.add(x) flag }).map(_.toInt) println(rdd2.reduce(_ + _)) println(acc.value) }
广播变量 广播变量在每个节点上保存一个只读的变量的缓存, 而不用给每个 task 来传送一个 copy.
相当于每个Executor只有一份副本,task共享
1 2 3 val broadcastVar = sc.broadcast(Array (1 , 2 , 3 )) broadcastVar.value