本文最后更新于 2021-08-05 11:42:59
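自定义数据源，其实就是自定义接收器（Receiver）。
需要继承 Receiver，并实现 onStart、onStop 方法来自定义数据源的采集。注意 onStart 不能阻塞：通常在其中启动一个线程负责接收数据，并通过 store() 把数据交给 Spark；需要重新连接时可调用 restart() 让 Spark 重启该接收器。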
/** Factory for [[MySource]] receivers. */
object MySource {
  def apply(host: String, port: Int): MySource = new MySource(host, port)
}

/**
 * A custom Spark Streaming receiver that reads UTF-8 text lines from a TCP
 * socket and stores each line as one record (with `StorageLevel.MEMORY_ONLY`).
 *
 * When the server closes the connection, or connecting/reading fails, the
 * receiver asks Spark to restart it so that it reconnects.
 *
 * @param host server host to connect to
 * @param port server port to connect to
 */
class MySource(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  /** Starts the receiving thread; `onStart` itself must return without blocking. */
  override def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  /**
   * Connects to `host:port` and stores every line read until the receiver is
   * stopped or the server closes the stream, then requests a restart.
   * Connection and read errors also request a restart (with the cause) instead
   * of silently terminating the receiving thread.
   */
  def receive(): Unit = {
    import scala.util.control.NonFatal

    try {
      val socket = new Socket(host, port)
      try {
        val reader = new BufferedReader(
          new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
        // An assignment evaluates to Unit in Scala, so `(line = readLine()) != null`
        // would always be true; read up front and again at the end of each pass.
        var line: String = reader.readLine()
        while (!isStopped && line != null) {
          store(line)
          line = reader.readLine()
        }
      } finally {
        // Closing the socket also closes its input stream, and with it the reader.
        socket.close()
      }
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        restart(s"Error connecting to $host:$port", e)
      case NonFatal(e) =>
        restart("Error receiving data", e)
    }
  }

  /**
   * Nothing to release here: the receiving thread checks `isStopped` between
   * lines and exits on its own (after any in-progress `readLine` returns).
   */
  override def onStop(): Unit = {}
}
/**
 * Streaming word count over text lines received by [[MySource]].
 *
 * Counts whitespace-separated words in 5-second batches and prints the
 * counts of each batch.
 *
 * @param args optional `host` and `port` of the text server; they default to
 *             `node1` and `9999` when omitted
 */
object MySourceDemo {
  def main(args: Array[String]): Unit = {
    val host: String = args.headOption.getOrElse("node1")
    val port: Int = args.lift(1).fold(9999)(_.toInt)

    val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines: ReceiverInputDStream[String] = ssc.receiverStream[String](MySource(host, port))
    // Splitting an empty line, or one with leading whitespace, on \s+ yields "",
    // which must not be counted as a word.
    val words: DStream[String] = lines.flatMap(_.split("""\s+""")).filter(_.nonEmpty)
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val count: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    count.print()

    ssc.start()
    // Stop the streaming context even if the computation fails; keep the
    // SparkContext's lifecycle unchanged (not stopped here), as before.
    try ssc.awaitTermination()
    finally ssc.stop(stopSparkContext = false)
  }
}