12数据读取与保存

本文最后更新于 2021-08-05 11:42:59

数据读取与保存

从文件中读取数据是创建 RDD 的一种方式.

把数据保存的文件中的操作是一种 Action.

Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

文件格式分为:Text文件、Json文件、csv文件、Sequence文件以及Object文件;

文件系统分为:本地文件系统、HDFS、Hbase 以及 数据库。

平时用的比较多的就是: 从 HDFS 读取和保存 Text 文件.

读写Text文件

1
2
3
val rdd1 = sc.textFile("./words.txt")

rdd2.saveAsTextFile("hdfs://hadoop201:9000/words_output")

读写JSON文件

如果 JSON 文件中每一行就是一个 JSON 记录,那么可以通过将 JSON 文件当做文本文件来读取,然后利用相关的 JSON 库对每一条数据进行 JSON 解析。

1
2
3
4
5
6
val rdd1 = sc.textFile("resources/people.json")
import scala.util.parsing.json.JSON
val rdd2 = rdd1.map(JSON.parseFull)
rdd2.collect
//res2: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))

读写SequenceFile

SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat File)。

Spark 有专门用来读取 SequenceFile 的接口。

1
2
3
4
5
6
7
 val rdd1 = sc.parallelize(Array(("a", 1),("b", 2),("c", 3)))
rdd1.saveAsSequenceFile("hdfs://hadoop201:9000/seqFiles")

val rdd1 = sc.sequenceFile[String, Int]("hdfs://hadoop201:9000/seqFiles")
rdd1.collect
//res4: Array[(String, Int)] = Array((a,1), (b,2), (c,3))

读写objectFile文件

对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。

1
2
3
4
5
6
7
 val rdd1 = sc.parallelize(Array(("a", 1),("b", 2),("c", 3)))
rdd1.saveAsObjectFile("hdfs://hadoop201:9000/obj_file")

val rdd1 = sc.objectFile[(String, Int)]("hdfs://hadoop201:9000/obj_file")
rdd1.collect
/res8: Array[(String, Int)] = Array((a,1), (b,2), (c,3))

读MySQL文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
object JDBCDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
val sc = new SparkContext(conf)
//定义连接mysql的参数
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop201:3306/rdd"
val userName = "root"
val passWd = "aaa"

val rdd = new JdbcRDD(
sc, //SparkContext
() => { //创建Connection
Class.forName(driver)
DriverManager.getConnection(url, userName, passWd)
},
"select id, name from user where id >= ? and id <= ?", //SQL语句
1, //下限 第一个占位符
20, //上限 第二个占位符
2, // 分区数
result => (result.getInt(1), result.getString(2)) //resultSet结果封装
)
rdd.collect.foreach(println)

}
}

写如MySQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
object JDBCDemo2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
val sc = new SparkContext(conf)
//定义连接mysql的参数
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop201:3306/rdd"
val userName = "root"
val passWd = "aaa"

val rdd: RDD[(Int, String)] = sc.parallelize(Array((110, "police"), (119, "fire")))
// 对每个分区执行 参数函数
rdd.foreachPartition(it => {
//每个分区创建一个连接
Class.forName(driver)
val conn: Connection = DriverManager.getConnection(url, userName, passWd)
//分区内数据foreach
it.foreach(x => {
val statement: PreparedStatement = conn.prepareStatement("insert into user values(?, ?)")
statement.setInt(1, x._1)
statement.setString(2, x._2)
statement.executeUpdate()
})
})



}
}

读Hbase

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object HBaseDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
val sc = new SparkContext(conf)

val hbaseConf: Configuration = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "hadoop201,hadoop202,hadoop203")
hbaseConf.set(TableInputFormat.INPUT_TABLE, "student")

val rdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
hbaseConf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])

val rdd2: RDD[String] = rdd.map {
case (_, result) => Bytes.toString(result.getRow)
}
rdd2.collect.foreach(println)
sc.stop()
}
}

写Hbase

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object HBaseDemo2 {


def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
val sc = new SparkContext(conf)

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "hadoop201,hadoop202,hadoop203")
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "student")
// 通过job来设置输出的格式的类
val job = Job.getInstance(hbaseConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

val initialRDD = sc.parallelize(List(("100", "apple", "11"), ("200", "banana", "12"), ("300", "pear", "13")))
val hbaseRDD = initialRDD.map(x => {
val put = new Put(Bytes.toBytes(x._1))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(x._2))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("weight"), Bytes.toBytes(x._3))
(new ImmutableBytesWritable(), put)
})
hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
}


12数据读取与保存
https://jiajun.xyz/2021/07/14/bigdata/10spark/12数据读取与保存/
作者
Lambda
发布于
2021年7月14日
更新于
2021年8月5日
许可协议