本文最后更新于 2021-08-05 11:42:59
RDD函数中的传递
Spark 进行编程的时候, 初始化工作是在 driver端完成的, 而实际的运行程序是在executor端进行的. 所以就涉及到了进程间的通讯, 数据是需要序列化的.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| object SerDemo { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SerDemo").setMaster("local[*]") val sc = new SparkContext(conf) val rdd: RDD[String] = sc.parallelize(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2) val searcher = new Searcher("hello") val result: RDD[String] = searcher.getMatchedRDD1(rdd) result.collect.foreach(println) } }
class Searcher(val query: String){ def isMatch(s : String) ={ s.contains(query) } def getMatchedRDD1(rdd: RDD[String]) ={ rdd.filter(isMatch) } def getMatchedRDD2(rdd: RDD[String]) ={ rdd.filter(_.contains(query)) } def getMatchedRDD3(rdd: RDD[String]) ={ val q = query rdd.filter(_.contains(q)) } }
|
kryo序列化框架
Java 的序列化比较重, 能够序列化任何的类. 比较灵活,但是相当的慢, 并且序列化后对象的体积也比较大.
Spark 出于性能的考虑, 支持另外一种序列化机制: kryo (2.0开始支持). kryo 比较快和简洁.(速度是Serializable的10倍). 想获取更好的性能应该使用 kryo 来序列化.
从2.0开始, Spark 内部已经在使用 kryo 序列化机制: 当 RDD 在 Shuffle数据的时候, 简单数据类型, 简单数据类型的数组和字符串类型已经在使用 kryo 来序列化.
有一点需要注意的是: 即使使用 kryo 序列化, 也要继承 Serializable 接口.
1 2 3 4 5 6 7 8
| val conf: SparkConf = new SparkConf() .setAppName("SerDemo") .setMaster("local[*]") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Searcher]))
|