06Hbase集成

本文最后更新于 2021-08-05 11:42:59

Hbase集成

集成MR

统计的需要:HBase的数据都是分布式存储在RegionServer上的,所以对于类似传统关系型数据库的group by操作,扫描器是无能为力的,只有当所有结果都返回到客户端的时候,才能进行统计。这样做一是慢,二是会产生很大的网络开销,所以使用MapReduce在服务器端就进行统计是比较好的方案。

性能的需要:说白了就是“快”!如果遇到较复杂的场景,在扫描器上添加多个过滤器后,扫描的性能很低;或者当数据量很大的时候扫描器也会执行得很慢,原因是扫描器和过滤器内部实现的机制很复杂,虽然使用者调用简单,但是服务器端的性能就不敢保证了

加入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<encoding>UTF-8</encoding>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>


<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.0</version>
<scope>provided</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.3.0</version>
<scope>provided</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.3.0</version>
<scope>provided</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
<scope>provided</scope>
</dependency>

<!--<dependency>-->
<!--<groupId>org.apache.hbase</groupId>-->
<!--<artifactId>hbase-server</artifactId>-->
<!--<version>2.2.6</version>-->
<!--</dependency>-->
<!--<dependency>-->
<!--<groupId>org.apache.hbase</groupId>-->
<!--<artifactId>hbase-client</artifactId>-->
<!--<version>2.2.6</version>-->
<!--</dependency>-->

<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>2.2.6</version>
<type>pom</type>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-shaded-mapreduce -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-mapreduce</artifactId>
<version>2.2.6</version>
</dependency>


</dependencies>

Mapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* @author king
* TIME: 2020/11/7 - 12:45
*
* TableMapper 简单继承了Mapper 主要是hbase输入类型为ImmutableBytesWritable, Result
* ImmutableBytesWritable -> cloumnkey
**/
public class HbaseMapper extends TableMapper<ImmutableBytesWritable, Put> {

@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
Put put = new Put(key.get());
//遍历添加column行
for(Cell cell: value.rawCells()){
put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
}
//将读取到的每行数据写入到context中作为map的输出
context.write(key, put);
}
}

Reducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @author king
* TIME: 2020/11/7 - 12:58
**/
public class HbaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

Logger log = LoggerFactory.getLogger(HbaseReducer.class);
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
//读出来的每一行数据写入到fruit_mr表中
for(Put put: values){
log.info(Bytes.toString(key.get()));
List<Cell> cells = put.get(Bytes.toBytes("info"), Bytes.toBytes("name"));
cells.forEach(cell->{
log.info(Bytes.toString(CellUtil.cloneFamily(cell)));
log.info(Bytes.toString(CellUtil.cloneQualifier(cell)));
log.info(Bytes.toString(CellUtil.cloneValue(cell)));
});
context.write(key, put);
}
}
}

Driver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class HbaseDriver extends Configured implements Tool {

public int run(String[] args) throws Exception {
//得到Configuration
Configuration conf = this.getConf();
//创建Job任务
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(HbaseDriver.class);

//配置Job
Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setCaching(500);

//设置Mapper,注意导入的是mapreduce包下的,不是mapred包下的,后者是老版本
TableMapReduceUtil.initTableMapperJob(
"java_create:mr_test1", //数据源的表名
scan, //scan扫描控制器
HbaseMapper.class,//设置Mapper类
ImmutableBytesWritable.class,//设置Mapper输出key类型
Put.class,//设置Mapper输出value值类型
job//设置给哪个JOB
);
//设置Reducer
TableMapReduceUtil.initTableReducerJob("java_create:mr_test2", HbaseReducer.class, job);
//设置Reduce数量,最少1个
job.setNumReduceTasks(1);

boolean isSuccess = job.waitForCompletion(true);
if (!isSuccess) {
throw new IOException("Job running with error");
}
return isSuccess ? 0 : 1;
}

public static void main(String[] args) throws Exception {
// Let <code>ToolRunner</code> handle generic command-line options
int res = ToolRunner.run(new Configuration(), new HbaseDriver(), args);
System.exit(res);
}

}

执行

将hbase-site.xml 拷贝到hadoop中

打包后运行:

1
HADOOP_CLASSPATH=`/usr/local/soft/hbase/hbase-2.2.6/bin/hbase mapredcp` hadoop jar hbase_mr_test-1.0-SNAPSHOT.jar priv.king.HbaseDriver -libjars $(/usr/local/soft/hbase/hbase-2.2.6/bin/hbase mapredcp | tr ':' ',') ...

注意

  • 当mapper阶段value为 Put类型输出key相同时,有combiner过程,合并
  • ImmutableBytesWritable,Put等都没有实现 WritableComparable ,也能序列化,是因为在TableMapReduceUtil.initTableReducerJob("java_create:mr_test2", HbaseReducer.class, job);时手动设置了自定义序列化器
  • conf.setStrings("io.serializations", new String[]{conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName()});

集成Hive

通过 Hive 与 HBase 整合,可以将 HBase 的数据通过 Hive 来分析,让 HBase 支持 JOIN、GROUP 等 SQL 查询语法。

实现将批量数据导入到 HBase 表中。

依赖

已有 HDFS、MapReduce、Hive、Zookeeper、HBase 环境。

确保 Hive 的 lib 目录下有 hive-hbase-handler-xxx.jar、Zookeeper jar、HBase Server jar、HBase Client jar 包

将hbase-site.xml 放到hive下

Hbase表存在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
create external table hbase_t1(
id string,
name string,
age string,
sex string,
friends string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age,info:sex,info:friends")
TBLPROPERTIES ("hbase.table.name" = "java_create:mr_test2");

--TBLPROPERTIES ("hbase.table.name" = "java_create:mr_test2"); 可选 默认在default namespace下创建同名表
-- :key ->对应cloumnKey
-- 指定为外部表,因为hbase中存在了

Hbase表不存在

1
2
3
4
5
6
7
8
9
10
11
12
create table hbase_t1(
id string,
name string,
age string,
sex string,
friends string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age,info:sex,info:friends")
TBLPROPERTIES ("hbase.table.name" = "java_create:mr_test2");

--指定为管理表
  • managed_table
    • 由hive的表管理数据的生命周期!在hive中,执行droptable时,
    • 不仅将hive中表的元数据删除,还把表目录中的数据删除
  • external_table
    • hive表不负责数据的生命周期!在hive中,执行droptable时,
    • 只会将hive中表的元数据删除,不把表目录中的数据删除
  • Storage Handlers
    • Storage Handlers是一个扩展模块,帮助hive分析不在hdfs存储的数据!
    • 例如数据存储在hbase上,可以使用hive提供的对hbase的Storage Handlers,来读写hbase中的数据!
  • native table:
    • 本地表! hive无需通过Storage Handlers就能访问的表。
    • [ROW FORMAT row_format] [STORED AS file_format] file_format: ORC|TEXTFILE|SEQUNCEFILE|PARQUET
  • non-native table :
    • hive必须通过Storage Handlers才能访问的表!
    • STORED BY ‘storage.handler.class.name’ [WITH SERDEPROPERTIES (…)]
  • SERDE:hive中序列化器和反序列化器
    • 表中的数据是什么样的格式,就必须使用什么样的SerDe!
    • 纯文本: row format delimited ,默认使用LazySimpleSerDe
    • JSON格式: 使用JsonSerde
    • ORC: 使用读取ORC的SerDe
    • Paquet: 使用读取PaquetSerDe
    • HDFS files –> InputFileFormat –> <key, value> –> Deserializer –> Row object
      Row object –> Serializer –> <key, value> –> OutputFileFormat –> HDFS files
    • 调用InputFormat,将文件切成不同的文档。每篇文档即一行(Row)。
      调用SerDe的Deserializer,将一行(Row),切分为各个字段。
1
2
3
4
5
6
7
create table testSerde2(
name string,
friends array<string>
)
ROW FORMAT SERDE
'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE

06Hbase集成
https://jiajun.xyz/2020/11/07/bigdata/03hbase/06Hbase集成/
作者
Lambda
发布于
2020年11月7日
更新于
2021年8月5日
许可协议