import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object JDBCDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // MySQL connection parameters
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://hadoop201:3306/rdd"
    val userName = "root"
    val passWd = "aaa"

    val rdd = new JdbcRDD(
      sc, // SparkContext
      () => {
        // create the Connection
        Class.forName(driver)
        DriverManager.getConnection(url, userName, passWd)
      },
      "select id, name from user where id >= ? and id <= ?", // SQL statement
      1, // lower bound, fills the first placeholder
      20, // upper bound, fills the second placeholder
      2, // number of partitions
      result => (result.getInt(1), result.getString(2)) // wrap each ResultSet row
    )
    rdd.collect.foreach(println)
    sc.stop()
  }
}
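The two ? placeholders are filled per partition: JdbcRDD splits the inclusive [lowerBound, upperBound] range evenly across numPartitions and runs one query per slice. The snippet below is only an illustration of that splitting arithmetic (it approximates the logic inside org.apache.spark.rdd.JdbcRDD and can be pasted into spark-shell); it is not part of the demo above.

// Illustrative sketch: how the bounds 1..20 with 2 partitions become per-partition queries.
val lowerBound = 1L
val upperBound = 20L
val numPartitions = 2
val length = BigInt(1) + upperBound - lowerBound
(0 until numPartitions).foreach { i =>
  val start = lowerBound + (length * i / numPartitions).toLong
  val end   = lowerBound + (length * (i + 1) / numPartitions).toLong - 1
  println(s"partition $i: select id, name from user where id >= $start and id <= $end")
}
// partition 0: ... where id >= 1 and id <= 10
// partition 1: ... where id >= 11 and id <= 20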
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object JDBCDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // MySQL connection parameters
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://hadoop201:3306/rdd"
    val userName = "root"
    val passWd = "aaa"

    val rdd: RDD[(Int, String)] = sc.parallelize(Array((110, "police"), (119, "fire")))
    // run the function once per partition
    rdd.foreachPartition(it => {
      // create one connection per partition
      Class.forName(driver)
      val conn: Connection = DriverManager.getConnection(url, userName, passWd)
      // iterate over the records within this partition
      it.foreach(x => {
        val statement: PreparedStatement = conn.prepareStatement("insert into user values(?, ?)")
        statement.setInt(1, x._1)
        statement.setString(2, x._2)
        statement.executeUpdate()
      })
      conn.close()
    })
    sc.stop()
  }
}
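Preparing a new statement for every record works, but a common refinement is to prepare the statement once per partition and batch the inserts. The object below is a hypothetical variant (name JDBCBatchWriteDemo and the try/finally cleanup are additions, not from the original), assuming the same user(id, name) table and connection parameters:

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: one PreparedStatement per partition, inserts sent as a JDBC batch.
object JDBCBatchWriteDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://hadoop201:3306/rdd"
    val userName = "root"
    val passWd = "aaa"

    val rdd = sc.parallelize(Array((110, "police"), (119, "fire")))
    rdd.foreachPartition { it =>
      Class.forName(driver)
      val conn = DriverManager.getConnection(url, userName, passWd)
      val statement = conn.prepareStatement("insert into user values(?, ?)")
      try {
        it.foreach { case (id, name) =>
          statement.setInt(1, id)
          statement.setString(2, name)
          statement.addBatch() // queue the insert instead of executing it immediately
        }
        statement.executeBatch() // send the whole partition in one round trip
      } finally {
        statement.close()
        conn.close()
      }
    }
    sc.stop()
  }
}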
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object HBaseDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // HBase configuration: ZooKeeper quorum and the table to scan
    val hbaseConf: Configuration = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop201,hadoop202,hadoop203")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "student")

    val rdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // extract the row key from each Result
    val rdd2: RDD[String] = rdd.map {
      case (_, result) => Bytes.toString(result.getRow)
    }
    rdd2.collect.foreach(println)
    sc.stop()
  }
}
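Writing back to HBase goes through TableOutputFormat and saveAsNewAPIHadoopDataset. The object below is a minimal sketch, not part of the original demo: it assumes the same student table, an "info" column family (an assumption about the schema), and made-up sample rows.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical companion to HBaseDemo: write (rowKey, name) pairs into HBase.
object HBaseWriteDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop201,hadoop202,hadoop203")
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "student")

    // the Job only carries the output-format configuration here
    val job = Job.getInstance(hbaseConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val rdd = sc.parallelize(Array(("1001", "zhangsan"), ("1002", "lisi")))
    val putsRDD = rdd.map { case (rowKey, name) =>
      val put = new Put(Bytes.toBytes(rowKey))
      // "info" is an assumed column family; adjust to the real table schema
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
      (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
    }
    putsRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()
  }
}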