05HbaseAPI

本文最后更新于 2021-08-05 11:42:59

HBASE API

dependency

1
2
3
4
5
6
7
8
9
10
<!-- HBase server and client artifacts; both are pinned to the same version (2.2.6). -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.2.6</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.6</version>
</dependency>

在项目的 resources 目录下添加 hbase-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<!--
The following properties are set for running HBase as a single process on a
developer workstation. With this configuration, HBase is running in
"stand-alone" mode and without a distributed file system. In this mode, and
without further configuration, HBase and ZooKeeper data are stored on the
local filesystem, in a path under the value configured for `hbase.tmp.dir`.
This value is overridden from its default value of `/tmp` because many
systems clean `/tmp` on a regular basis. Instead, it points to a path within
this HBase installation directory.

Running against the `LocalFileSystem`, as opposed to a distributed
filesystem, runs the risk of data integrity issues and data loss. Normally
HBase will refuse to run in such an environment. Setting
`hbase.unsafe.stream.capability.enforce` to `false` overrides this behavior,
permitting operation. This configuration is for the developer workstation
only and __should not be used in production!__

See also https://hbase.apache.org/book.html#standalone_dist
-->
<!--
NOTE(review): the comment above describes stand-alone mode, but the properties
below set `hbase.cluster.distributed` to `true` and point `hbase.rootdir` at an
hdfs:// URI, so the comment looks like a leftover from the stock template.
Confirm which mode is intended.
-->
<property>
<name>hbase.tmp.dir</name>
<value>/usr/local/soft/hbase/hbase-2.2.6/habase_temp_dir</value>
</property>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>

<!-- Root directory for HBase data, located on HDFS. -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://node1:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- Changed after 0.98: earlier versions had no ".port" property and the default port was 60000. -->
<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>
<!-- ZooKeeper ensemble members, given as host:port pairs. -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>node1:2181,node2:2181,node3:2181</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/usr/local/soft/zookeeper/apache-zookeeper-3.6.2-bin/data</value>
</property>
</configuration>

ConnectionUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Creates and closes HBase {@code Connection} objects.
 *
 * <p>{@code HBaseConfiguration.create()} can be used to obtain a
 * {@code Configuration}; the returned object contains the parameters from
 * Hadoop's 8 configuration files as well as everything configured in
 * hbase-default.xml and hbase-site.xml.
 *
 * @author king (2020/11/3 - 23:14)
 */
public class ConnectionUtil {

    /**
     * Opens a connection through the no-argument
     * {@code ConnectionFactory.createConnection()}.
     *
     * @return the new connection, or {@code null} if creation failed with an
     *         {@code IOException} (the stack trace is printed in that case)
     */
    public static Connection getConn() {
        Connection created = null;
        try {
            created = ConnectionFactory.createConnection();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return created;
    }

    /**
     * Closes the given connection; a {@code null} argument is ignored.
     *
     * @param conn the connection to close, may be {@code null}
     * @throws IOException if closing the connection fails
     */
    public static void close(Connection conn) throws IOException {
        if (conn == null) {
            return;
        }
        conn.close();
    }
}

NameSpaceUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/**
 * Namespace helpers built on the HBase {@code Admin} API.
 *
 * <p>Every method obtains its own {@code Admin} from the supplied connection
 * and releases it with try-with-resources, so the handle is closed even when
 * the underlying call throws.
 */
public class NameSpaceUtil {
    private static final Logger logger = LoggerFactory.getLogger(NameSpaceUtil.class);

    /**
     * Lists the names of all namespaces.
     *
     * @param conn an open HBase connection
     * @return the name of every namespace descriptor reported by the admin
     * @throws IOException if the admin call or closing the admin fails
     */
    public static List<String> listNameSpace(Connection conn) throws IOException {
        List<String> names = new ArrayList<>();
        try (Admin admin = conn.getAdmin()) {
            for (NamespaceDescriptor descriptor : admin.listNamespaceDescriptors()) {
                // Only the name is taken from each descriptor.
                names.add(descriptor.getName());
            }
        }
        return names;
    }

    /**
     * Tells whether a namespace exists.
     *
     * @param conn   an open HBase connection
     * @param nsname the namespace name
     * @return {@code true} if the descriptor lookup succeeds; {@code false} if the
     *         name is blank or the lookup throws any exception
     * @throws IOException if obtaining or closing the admin fails
     */
    public static boolean ifNSExists(Connection conn, String nsname) throws IOException {
        if (StringUtils.isBlank(nsname)) {
            // Report the invalid name in the log.
            logger.error("请输入正常的库名!");
            return false;
        }
        try (Admin admin = conn.getAdmin()) {
            // The inner try keeps the catch from swallowing getAdmin()/close() failures,
            // which propagated to the caller in the original code as well.
            try {
                // The lookup throws when the namespace cannot be found.
                admin.getNamespaceDescriptor(nsname);
                return true;
            } catch (Exception e) {
                return false;
            }
        }
    }

    /**
     * Creates a namespace.
     *
     * @param conn   an open HBase connection
     * @param nsname the namespace name
     * @return {@code true} if the namespace was created; {@code false} if the name
     *         is blank or creation throws any exception (which is logged)
     * @throws IOException if obtaining or closing the admin fails
     */
    public static boolean creatNS(Connection conn, String nsname) throws IOException {
        if (StringUtils.isBlank(nsname)) {
            // Report the invalid name in the log.
            logger.error("请输入正常的库名!");
            return false;
        }
        try (Admin admin = conn.getAdmin()) {
            try {
                // Build the namespace definition first, then submit it.
                NamespaceDescriptor descriptor = NamespaceDescriptor.create(nsname).build();
                admin.createNamespace(descriptor);
                return true;
            } catch (Exception e) {
                // Route the failure through the class logger instead of printStackTrace().
                logger.error("创建库" + nsname + "失败!", e);
                return false;
            }
        }
    }

    /**
     * Deletes a namespace, but only when it contains no tables.
     *
     * @param conn   an open HBase connection
     * @param nsname the namespace name
     * @return {@code true} if the namespace was deleted; {@code false} if the name
     *         is blank or the namespace still contains tables
     * @throws IOException if listing the tables, deleting the namespace, or
     *                     closing the admin fails
     */
    public static boolean deleteNS(Connection conn, String nsname) throws IOException {
        if (StringUtils.isBlank(nsname)) {
            // Report the invalid name in the log.
            logger.error("请输入正常的库名!");
            return false;
        }

        // Only an empty namespace is deleted here, so look up its tables first.
        // nsname is known to be non-blank, so the helper does not return null.
        List<String> tables = getTablesInNameSpace(conn, nsname);
        if (!tables.isEmpty()) {
            logger.error(nsname+"库非空!无法删除!");
            return false;
        }

        try (Admin admin = conn.getAdmin()) {
            admin.deleteNamespace(nsname);
        }
        return true;
    }

    /**
     * Lists the tables that belong to a namespace.
     *
     * @param conn   an open HBase connection
     * @param nsname the namespace name
     * @return the table names as produced by
     *         {@code getNameWithNamespaceInclAsString()}, or {@code null} when
     *         {@code nsname} is blank (kept for backward compatibility)
     * @throws IOException if the admin call or closing the admin fails
     */
    public static List<String> getTablesInNameSpace(Connection conn, String nsname) throws IOException {
        if (StringUtils.isBlank(nsname)) {
            // Report the invalid name in the log.
            logger.error("请输入正常的库名!");
            return null;
        }
        List<String> tables = new ArrayList<>();
        try (Admin admin = conn.getAdmin()) {
            List<TableDescriptor> tds = admin.listTableDescriptorsByNamespace(Bytes.toBytes(nsname));
            for (TableDescriptor td : tds) {
                tables.add(td.getTableName().getNameWithNamespaceInclAsString());
            }
        }
        return tables;
    }
}

TableUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
 * Table-level helpers (existence check, create, drop) built on the HBase
 * {@code Admin} API.
 *
 * <p>Each method that needs an {@code Admin} obtains it from the supplied
 * connection and releases it with try-with-resources, so the handle is closed
 * even when the underlying call throws.
 */
public class TableUtil {
    private static final Logger logger = LoggerFactory.getLogger(TableUtil.class);

    /**
     * Validates a table name and converts it to a {@code TableName}.
     *
     * @param tableName the table qualifier; must not be blank
     * @param nsname    the namespace, passed unchanged to {@code TableName.valueOf}
     * @return the {@code TableName}, or {@code null} if {@code tableName} is blank
     */
    public static TableName checkTableName(String tableName, String nsname) {
        if (StringUtils.isBlank(tableName)) {
            logger.error("请输入正确的表名!");
            return null;
        }
        return TableName.valueOf(nsname, tableName);
    }

    /**
     * Tells whether a table exists.
     *
     * @param conn      an open HBase connection
     * @param tableName the table qualifier
     * @param nsname    the namespace
     * @return {@code true} if the admin reports the table exists; {@code false}
     *         if it does not or the table name is blank
     * @throws IOException if the admin call or closing the admin fails
     */
    public static boolean ifTableExists(Connection conn, String tableName, String nsname) throws IOException {
        TableName tn = checkTableName(tableName, nsname);
        if (tn == null) {
            return false;
        }
        try (Admin admin = conn.getAdmin()) {
            // The existence check takes a TableName object, not a String.
            return admin.tableExists(tn);
        }
    }

    /**
     * Creates a table with the given column families.
     *
     * <p>Every family is configured with TTL = {@code HConstants.FOREVER},
     * max versions = 10 and min versions = 3.
     *
     * @param conn      an open HBase connection
     * @param tableName the table qualifier
     * @param nsname    the namespace
     * @param cfs       column family names; at least one is required
     * @return {@code true} if the table was created; {@code false} if the table
     *         name is blank or no column family was supplied
     * @throws IOException if table creation or closing the admin fails
     */
    public static boolean createTable(Connection conn, String tableName, String nsname, String... cfs) throws IOException {
        TableName tn = checkTableName(tableName, nsname);
        if (tn == null) {
            return false;
        }
        // At least one column family is required; a null varargs array is treated as "none".
        if (cfs == null || cfs.length < 1) {
            logger.error("至少需要指定一个列族!");
            return false;
        }

        // Build the descriptors before opening the Admin so that a failure here
        // cannot leave an Admin handle open.
        List<ColumnFamilyDescriptor> cfList = new ArrayList<>(cfs.length);
        for (String cf : cfs) {
            // TTL (time-to-live): expiry of each cell's data (current time minus last update time).
            // MinVersions: if every stored version is older than the TTL, at least MIN_VERSIONS of the
            // newest versions are retained, so queries still return a result for data older than the TTL.
            ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder
                    .newBuilder(Bytes.toBytes(cf))
                    // Cells never expire.
                    .setTimeToLive(HConstants.FOREVER)
                    .setMaxVersions(10)
                    .setMinVersions(3)
                    .build();
            cfList.add(cfd);
        }
        TableDescriptor td = TableDescriptorBuilder.newBuilder(tn)
                .setColumnFamilies(cfList)
                .build();

        try (Admin admin = conn.getAdmin()) {
            admin.createTable(td);
        }
        return true;
    }

    /**
     * Drops a table if it exists.
     *
     * @param conn      an open HBase connection
     * @param tableName the table qualifier
     * @param nsname    the namespace
     * @return {@code true} if the table was deleted; {@code false} if the table
     *         name is blank or the table does not exist
     * @throws IOException if disabling or deleting the table, or closing the
     *                     admin, fails
     */
    public static boolean dropTable(Connection conn, String tableName, String nsname) throws IOException {
        TableName tn = checkTableName(tableName, nsname);
        if (tn == null) {
            return false;
        }
        // A single Admin serves the existence check, the disable and the delete.
        try (Admin admin = conn.getAdmin()) {
            if (!admin.tableExists(tn)) {
                return false;
            }
            // A table must be disabled before deletion; skip the call when it is
            // already disabled, because disabling a disabled table fails.
            if (admin.isTableEnabled(tn)) {
                admin.disableTable(tn);
            }
            admin.deleteTable(tn);
            return true;
        }
    }
}

DataUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/**
 * Row-level create/read/delete helpers; data operations go through {@code Table}.
 *
 * <ul>
 *   <li>{@code Put}: a put operation on a single row</li>
 *   <li>{@code Get}: a get operation on a single row</li>
 *   <li>{@code Result}: all records of a single row returned by a scan or get</li>
 *   <li>{@code Cell}: one cell; HBase provides {@code CellUtil.cloneXxx(Cell)} to
 *       read the cell's column family, qualifier and value</li>
 * </ul>
 *
 * <p>HBase handles data as {@code byte[]}, so common types have to be converted.
 * The {@code Bytes} utility does this: {@code Bytes.toBytes(x)} turns a basic
 * type into {@code byte[]}, and {@code Bytes.toXxx(x)} turns {@code byte[]} back
 * into type Xxx.
 *
 * <p>Every method here closes the {@code Table} (and {@code ResultScanner}) it
 * opens via try-with-resources, including when the operation throws.
 *
 * @author king (2020/11/3 - 23:05)
 */
public class DataUtil {

    /**
     * Obtains the {@code Table} object for a table name.
     *
     * @param conn      an open HBase connection
     * @param tableName the table qualifier
     * @param nsname    the namespace
     * @return the table, or {@code null} if {@code TableUtil.checkTableName}
     *         rejects the name; the caller is responsible for closing it
     * @throws IOException if the connection cannot provide the table
     */
    public static Table getTable(Connection conn, String tableName, String nsname) throws IOException {
        // Validate the table name first.
        TableName tn = TableUtil.checkTableName(tableName, nsname);
        if (tn == null) {
            return null;
        }
        return conn.getTable(tn);
    }

    /**
     * Writes one cell: table, row key, column (family:qualifier) and value.
     * Does nothing when the table name is rejected.
     *
     * @param conn            an open HBase connection
     * @param tableName       the table qualifier
     * @param nsname          the namespace
     * @param rowkey          the row key
     * @param cloumnFamily    the column family
     * @param cloumnQualifier the column qualifier
     * @param value           the cell value
     * @throws IOException if the put or closing the table fails
     */
    public static void put(Connection conn,String tableName,String nsname,String rowkey,String cloumnFamily,
            String cloumnQualifier,String value) throws IOException {
        // try-with-resources skips close() for a null resource.
        try (Table table = getTable(conn, tableName, nsname)) {
            if (table == null) {
                return;
            }
            Put put = new Put(Bytes.toBytes(rowkey));
            // Set the cell details: addColumn(family, qualifier, value).
            put.addColumn(Bytes.toBytes(cloumnFamily), Bytes.toBytes(cloumnQualifier), Bytes.toBytes(value));
            table.put(put);
        }
    }

    /**
     * Reads one row and prints its cells via {@link #parseResult(Result)}.
     * Does nothing when the table name is rejected.
     *
     * @param conn      an open HBase connection
     * @param tableName the table qualifier
     * @param nsname    the namespace
     * @param rowkey    the row key
     * @throws IOException if the get or closing the table fails
     */
    public static void get(Connection conn,String tableName,String nsname,String rowkey) throws IOException {
        try (Table table = getTable(conn, tableName, nsname)) {
            if (table == null) {
                return;
            }
            Get get = new Get(Bytes.toBytes(rowkey));
            // The single-row query can be narrowed further, for example:
            //   get.addColumn(family, qualifier)  - select one column
            //   get.addFamily(family)             - select one column family
            //   get.setTimeStamp(timestamp)       - only data with this timestamp
            //   get.setMaxVersions(maxVersions)   - number of versions returned
            Result result = table.get(get);
            parseResult(result);
        }
    }

    /**
     * Prints row, column family, qualifier and value of every cell in a result.
     * A {@code null} or empty result prints nothing.
     *
     * @param result the result to print, may be {@code null}
     */
    public static void parseResult(Result result) {
        // isEmpty() also covers a result whose backing cell array is null.
        if (result == null || result.isEmpty()) {
            return;
        }
        for (Cell cell : result.rawCells()) {
            System.out.println("行:"+ Bytes.toString(CellUtil.cloneRow(cell))+
                    " 列族:"+Bytes.toString(CellUtil.cloneFamily(cell))+" 列名:"+
                    Bytes.toString(CellUtil.cloneQualifier(cell))+
                    " 值:"+Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }

    /**
     * Scans the table between the fixed bounds {@code Bytes.toBytes(1)} and
     * {@code Bytes.toBytes(50)} and prints every row.
     *
     * <p>NOTE(review): these bounds are byte encodings of {@code int} values,
     * while {@code put}/{@code get}/{@code delete} in this class encode row keys
     * from {@code String}s. Confirm the intended range, or use the overload that
     * takes explicit bounds.
     *
     * @param conn      an open HBase connection
     * @param tableName the table qualifier
     * @param nsname    the namespace
     * @throws Exception if the scan or closing the resources fails
     */
    public static void scan(Connection conn, String tableName, String nsname) throws Exception {
        scan(conn, tableName, nsname, Bytes.toBytes(1), Bytes.toBytes(50));
    }

    /**
     * Scans the table between the given row bounds and prints every row via
     * {@link #parseResult(Result)}. Does nothing when the table name is rejected.
     *
     * @param conn      an open HBase connection
     * @param tableName the table qualifier
     * @param nsname    the namespace
     * @param startRow  the start row, passed to {@code Scan.withStartRow}
     * @param stopRow   the stop row, passed to {@code Scan.withStopRow}
     * @throws IOException if the scan or closing the resources fails
     */
    public static void scan(Connection conn, String tableName, String nsname,
            byte[] startRow, byte[] stopRow) throws IOException {
        try (Table table = getTable(conn, tableName, nsname)) {
            if (table == null) {
                return;
            }
            Scan scan = new Scan();
            scan.withStartRow(startRow);
            scan.withStopRow(stopRow);
            // Further narrowing is available on Scan, for example addColumn(),
            // addFamily(), setFilter() and setLimit().

            // The scanner is a collection of per-row results and must be closed
            // after use; the original code never closed it.
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result row : scanner) {
                    parseResult(row);
                }
            }
        }
    }

    /**
     * Deletes a whole row. Does nothing when the table name is rejected.
     *
     * @param conn      an open HBase connection
     * @param tableName the table qualifier
     * @param nsname    the namespace
     * @param rowKey    the row key to delete
     * @throws Exception if the delete or closing the table fails
     */
    public static void delete(Connection conn, String tableName, String nsname, String rowKey) throws Exception {
        try (Table table = getTable(conn, tableName, nsname)) {
            if (table == null) {
                return;
            }
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            // The delete can be narrowed, for example:
            //   delete.addColumn()   - delete the latest version
            //   delete.addColumns()  - delete all versions
            //   delete.addFamily()
            table.delete(delete);
        }
    }
}

05HbaseAPI
https://jiajun.xyz/2020/11/04/bigdata/03hbase/05HbaseAPI/
作者
Lambda
发布于
2020年11月4日
更新于
2021年8月5日
许可协议