Transform & Window

transform

transform is stateless: the RDDs of different batches are independent of one another. It converts DStream operations into RDD operations, which is useful when you need an RDD operator that the DStream API does not provide. It looks similar to foreachRDD(rdd => {...}), but foreachRDD is an action operator (commonly used to write data out to external systems), while transform is a transformation operator.
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("streamWordCount")
  val sc = new StreamingContext(conf, Seconds(3))
  val stream = sc.socketTextStream("node1", 9999)
  // transform exposes each batch as an RDD, so plain RDD operators can be used
  val value = stream.transform(rdd => {
    rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  })
  value.print()
  sc.start()
  sc.awaitTermination()
}
```
updateStateByKey

updateStateByKey is stateful: it accumulates results across all batch intervals. A checkpoint directory must be set.
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("streamWordCount")
  val sc = new StreamingContext(conf, Seconds(3))
  val stream = sc.socketTextStream("node1", 9999)
  // per-key state is kept across batches, so checkpointing is mandatory
  sc.checkpoint("check")
  stream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey((seq: Seq[Int], opt: Option[Int]) => {
    // seq: this batch's values for the key; opt: the accumulated state so far
    Some(seq.sum + opt.getOrElse(0))
  }).print()
  sc.start()
  sc.awaitTermination()
}
```
Window

reduceByKeyAndWindow

Spark Streaming also provides windowed computations, which let you apply a transformation over a sliding window of data. Each time the window slides over the source DStream, the RDDs that fall within the window are combined and operated on to produce the RDDs of the windowed DStream. Both the window duration and the slide duration must be integer multiples of the batch interval.
```scala
def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    windowDuration: Duration
  ): DStream[(K, V)]

def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[(K, V)]

def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration,
    numPartitions: Int
  ): DStream[(K, V)]

def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration,
    partitioner: Partitioner
  ): DStream[(K, V)]
```
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("streamWordCount")
  val sc = new StreamingContext(conf, Seconds(3))
  val stream = sc.socketTextStream("node1", 9999)
  // 12-second window sliding every 6 seconds, both multiples of the 3-second batch interval
  val value = stream.flatMap(_.split(" ")).map((_, 1))
    .reduceByKeyAndWindow(_ + _, Seconds(12), Seconds(6))
  value.print()
  sc.start()
  sc.awaitTermination()
}
```
Optimized computation: reduceByKeyAndWindow with an inverse function

This variant reduces repeated computation as the window slides: consecutive windows overlap, and the overlapping part does not need to be recomputed. Instead, newWindow = oldWindow + (newIn - oldOut): the batches that just entered the window are added with reduceFunc, and the batches that just left are subtracted with invReduceFunc. In invReduceFunc, the first parameter is the accumulated window value and the second is the old value leaving the window. A checkpoint directory must be set.
```scala
def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration = self.slideDuration,
    numPartitions: Int = ssc.sc.defaultParallelism,
    filterFunc: ((K, V)) => Boolean = null
  ): DStream[(K, V)]

def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration,
    partitioner: Partitioner,
    filterFunc: ((K, V)) => Boolean
  ): DStream[(K, V)]
```
```scala
sc.checkpoint("check") // the inverse function needs the previous window's state
stream
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(12), Seconds(6))
  .print()
```
window

Returns a new DStream computed from windowed batches of the source DStream.

```scala
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
```
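A minimal usage sketch, reusing the `stream` from the examples above and the same illustrative 12-second/6-second durations: window the raw lines first, then apply ordinary DStream operators to the windowed stream.

```scala
// Collect the last 12 seconds of lines every 6 seconds,
// then word-count just the data inside the window
val windowed = stream.window(Seconds(12), Seconds(6))
windowed.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
```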
countByWindow

Returns the number of elements in a sliding window over the stream.

```scala
def countByWindow(
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[Long]
```
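A minimal sketch under the same assumptions as the earlier examples (`sc`, `stream`, and the durations). countByWindow is built on an incremental (inverse) reduce internally, so it also needs a checkpoint directory; the path "check" is illustrative.

```scala
sc.checkpoint("check") // required by the internal inverse reduce
// Count how many lines arrived in the last 12 seconds, updated every 6 seconds
stream.countByWindow(Seconds(12), Seconds(6)).print()
```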
countByValueAndWindow

Counts the occurrences of each distinct element within the sliding window, returning a new DStream of (element, Long) pairs where the value is the element's frequency in the window.

```scala
def countByValueAndWindow(
    windowDuration: Duration,
    slideDuration: Duration,
    numPartitions: Int = ssc.sc.defaultParallelism)
    (implicit ord: Ordering[T] = null)
  : DStream[(T, Long)]
```
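A minimal sketch, again assuming the `sc` and `stream` from the examples above: count how often each word appeared within the window. Like the inverse-function variant of reduceByKeyAndWindow, it maintains incremental window state and therefore needs checkpointing.

```scala
sc.checkpoint("check") // needed for the incremental window count
// Per-word frequency over a 12-second window, sliding every 6 seconds
stream.flatMap(_.split(" ")).countByValueAndWindow(Seconds(12), Seconds(6)).print()
```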