17自定义数据源

本文最后更新于 2021-08-05 11:42:59

自定义数据源

其实就是自定义接收器

需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。

object MySource{
  def apply(host: String, port: Int): MySource = new MySource(host, port)
}

class MySource(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
  /*
  接收器启动的时候调用该方法. This function must initialize all resources (threads, buffers, etc.) necessary for receiving data.
  这个函数内部必须初始化一些读取数据必须的资源
  该方法不能阻塞, 所以 读取数据要在一个新的线程中进行.
   */
  override def onStart(): Unit = {

    // 启动一个新的线程来接收数据
    new Thread("Socket Receiver"){
      override def run(): Unit = {
        receive()
      }
    }.start()
  }

  // 此方法用来接收数据
  def receive()={
    val socket = new Socket(host, port)
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
    var line: String = null
    // 当 receiver没有关闭, 且reader读取到了数据则循环发送给spark
    while (!isStopped && (line = reader.readLine()) != null ){
      // 发送给spark
      store(line)
    }
    // 循环结束, 则关闭资源
    reader.close()
    socket.close()

    // 重启任务
    restart("Trying to connect again")
  }
  override def onStop(): Unit = {

  }

}
object MySourceDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]")
    // 1. 创建SparkStreaming的入口对象: StreamingContext  参数2: 表示事件间隔
    val ssc = new StreamingContext(conf, Seconds(5))
    // 2. 创建一个DStream
    val lines: ReceiverInputDStream[String] = ssc.receiverStream[String](MySource("node1", 9999))
    // 3. 一个个的单词
    val words: DStream[String] = lines.flatMap(_.split("""\s+"""))
    // 4. 单词形成元组
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    // 5. 统计单词的个数
    val count: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    //6. 显示
    count.print
    //7. 启动流式任务开始计算
    ssc.start()
    //8. 等待计算结束才退出主程序
    ssc.awaitTermination()
    ssc.stop(false)
  }
}

17自定义数据源
https://jiajun.xyz/2021/07/20/bigdata/10spark/17自定义数据源/
作者
Lambda
发布于
2021年7月20日
更新于
2021年8月5日
许可协议