# HBase Integration

## Integrating with MapReduce

The case for statistics: HBase data is stored distributed across the RegionServers, so for something like a relational database's GROUP BY, a scanner is of no help; the aggregation can only happen after all results have been returned to the client. That is slow and generates a lot of network traffic, so running a MapReduce job that aggregates on the server side is the better option.

The case for performance: in a word, speed. In more complex scenarios, stacking several filters onto a scanner makes the scan slow, and scans over large data sets are slow as well. The internal machinery of scanners and filters is complex; they are easy to call, but server-side performance is hard to guarantee.
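Before the full walkthrough below (which copies one HBase table into another), here is a minimal sketch of what "aggregating on the server side" means: a `GROUP BY ... COUNT(*)` over one column, written as a TableMapper plus an ordinary Reducer. The table `java_create:mr_test1` and the column `info:sex` are assumptions for this example.

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Mapper runs next to the RegionServers: emit (value of info:sex, 1) for every scanned row.
public class GroupCountMapper extends TableMapper<Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        byte[] sex = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("sex"));
        if (sex != null) {
            context.write(new Text(Bytes.toString(sex)), ONE);
        }
    }
}

// Reducer sums the ones per distinct value -- the GROUP BY ... COUNT(*) result.
class GroupCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable c : counts) {
            sum += c.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
```

The driver would wire these up with `TableMapReduceUtil.initTableMapperJob("java_create:mr_test1", scan, GroupCountMapper.class, Text.class, IntWritable.class, job)` and a plain file output format, so only the per-group sums travel back over the network.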
### Adding the dependencies

```xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <encoding>UTF-8</encoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.3.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>3.3.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>3.3.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.2.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>2.2.6</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-shaded-mapreduce</artifactId>
        <version>2.2.6</version>
    </dependency>
</dependencies>
```
### Mapper

```java
import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

public class HbaseMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Copy every cell of the scanned row into a Put keyed by the same row key
        Put put = new Put(key.get());
        for (Cell cell : value.rawCells()) {
            put.addColumn(CellUtil.cloneFamily(cell),
                    CellUtil.cloneQualifier(cell),
                    CellUtil.cloneValue(cell));
        }
        context.write(key, put);
    }
}
```
### Reducer

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HbaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    private static final Logger log = LoggerFactory.getLogger(HbaseReducer.class);

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        for (Put put : values) {
            log.info(Bytes.toString(key.get()));
            // Log the info:name cells of each Put, then write it to the output table unchanged
            List<Cell> cells = put.get(Bytes.toBytes("info"), Bytes.toBytes("name"));
            cells.forEach(cell -> {
                log.info(Bytes.toString(CellUtil.cloneFamily(cell)));
                log.info(Bytes.toString(CellUtil.cloneQualifier(cell)));
                log.info(Bytes.toString(CellUtil.cloneValue(cell)));
            });
            context.write(key, put);
        }
    }
}
```
### Driver

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HbaseDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(HbaseDriver.class);

        // Scan settings commonly used for MR jobs: larger caching, block cache disabled
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(500);

        TableMapReduceUtil.initTableMapperJob(
                "java_create:mr_test1",
                scan,
                HbaseMapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job
        );
        TableMapReduceUtil.initTableReducerJob("java_create:mr_test2", HbaseReducer.class, job);
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("Job running with error");
        }
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new HbaseDriver(), args);
        System.exit(res);
    }
}
```
### Running the job

Copy hbase-site.xml into Hadoop's configuration directory.

After packaging, run:
```bash
HADOOP_CLASSPATH=`/usr/local/soft/hbase/hbase-2.2.6/bin/hbase mapredcp` hadoop jar hbase_mr_test-1.0-SNAPSHOT.jar priv.king.HbaseDriver -libjars $(/usr/local/soft/hbase/hbase-2.2.6/bin/hbase mapredcp | tr ':' ',') ...
```
### Notes

When the mapper's output value type is Put and several outputs share the same row key, a combiner step runs and merges those Puts; a conceptual sketch of that merge follows.
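The sketch below shows roughly what such a merge amounts to. `PutMergingCombiner` is a hypothetical class written purely for illustration; it is not the combiner class HBase actually installs.

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical combiner: collapse all Puts that share a row key into one Put,
// so fewer, larger records go through the shuffle.
public class PutMergingCombiner
        extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Put> {

    @Override
    protected void reduce(ImmutableBytesWritable row, Iterable<Put> puts, Context context)
            throws IOException, InterruptedException {
        Put merged = new Put(row.get());
        for (Put put : puts) {
            for (List<Cell> cells : put.getFamilyCellMap().values()) {
                for (Cell cell : cells) {
                    merged.add(cell); // copy every cell into the merged Put
                }
            }
        }
        context.write(row, merged);
    }
}
```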
Types such as Put and Result do not implement Writable/WritableComparable, yet they can still be serialized during the shuffle. That works because TableMapReduceUtil.initTableReducerJob("java_create:mr_test2", HbaseReducer.class, job) registers HBase's own serializations:

```java
conf.setStrings("io.serializations", new String[]{
        conf.get("io.serializations"),
        MutationSerialization.class.getName(),
        ResultSerialization.class.getName()});
```
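For context, any class can be made shuffle-able this way by registering an implementation of Hadoop's `org.apache.hadoop.io.serializer.Serialization` interface under `io.serializations`. A conceptual sketch for a hypothetical non-Writable type `UserName` (not part of HBase; purely for illustration):

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;

// Hypothetical value type that implements neither Writable nor WritableComparable.
class UserName {
    String name;
    UserName(String name) { this.name = name; }
}

// Registered under "io.serializations", this lets MapReduce shuffle UserName values,
// which is the same mechanism MutationSerialization/ResultSerialization use for Put/Result.
public class UserNameSerialization implements Serialization<UserName> {

    @Override
    public boolean accept(Class<?> c) {
        return UserName.class.isAssignableFrom(c);
    }

    @Override
    public Serializer<UserName> getSerializer(Class<UserName> c) {
        return new Serializer<UserName>() {
            private DataOutputStream out;
            @Override public void open(OutputStream os) { out = new DataOutputStream(os); }
            @Override public void serialize(UserName u) throws IOException { out.writeUTF(u.name); }
            @Override public void close() throws IOException { out.close(); }
        };
    }

    @Override
    public Deserializer<UserName> getDeserializer(Class<UserName> c) {
        return new Deserializer<UserName>() {
            private DataInputStream in;
            @Override public void open(InputStream is) { in = new DataInputStream(is); }
            @Override public UserName deserialize(UserName reuse) throws IOException { return new UserName(in.readUTF()); }
            @Override public void close() throws IOException { in.close(); }
        };
    }
}
```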
## Integrating with Hive

Integrating Hive with HBase lets you analyze HBase data through Hive, giving HBase access to SQL constructs such as JOIN and GROUP BY.

It also makes it easy to load data into HBase tables in bulk.
### Prerequisites

A working HDFS, MapReduce, Hive, ZooKeeper, and HBase environment.

Make sure Hive's lib directory contains hive-hbase-handler-xxx.jar, the ZooKeeper jar, the HBase server jar, and the HBase client jar.

Put hbase-site.xml into Hive's configuration directory.
### When the HBase table already exists

```sql
create external table hbase_t1(
    id string,
    name string,
    age string,
    sex string,
    friends string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age,info:sex,info:friends")
TBLPROPERTIES ("hbase.table.name" = "java_create:mr_test2");
```
### When the HBase table does not exist

```sql
create table hbase_t1(
    id string,
    name string,
    age string,
    sex string,
    friends string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age,info:sex,info:friends")
TBLPROPERTIES ("hbase.table.name" = "java_create:mr_test2");
```
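Once the mapping exists, the HBase-backed table can be queried from Hive like any other table. A minimal sketch over HiveServer2 JDBC follows; the hostname, port, user, and query are assumptions for illustration only.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveHbaseQuery {

    public static void main(String[] args) throws Exception {
        // HiveServer2 JDBC endpoint; host, port, and user are placeholders
        String url = "jdbc:hive2://localhost:10000/default";
        try (Connection conn = DriverManager.getConnection(url, "hive", "");
             Statement stmt = conn.createStatement();
             // A GROUP BY that Hive executes on top of the HBase-backed table
             ResultSet rs = stmt.executeQuery(
                     "select sex, count(*) from hbase_t1 group by sex")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + "\t" + rs.getLong(2));
            }
        }
    }
}
```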
### managed_table

The Hive table owns the data's lifecycle. When you run DROP TABLE in Hive, the table's metadata is deleted and the data in the table directory is deleted as well.

### external_table

The Hive table is not responsible for the data's lifecycle. When you run DROP TABLE in Hive, only the table's metadata is deleted; the data in the table directory is left in place.
### Storage Handlers

Storage handlers are an extension mechanism that lets Hive analyze data that is not stored on HDFS. For example, when the data lives in HBase, the HBase storage handler that ships with Hive can read and write that data.
**native table**: a table Hive can access without going through a storage handler, declared with `[ROW FORMAT row_format] [STORED AS file_format]`, where file_format is one of ORC, TEXTFILE, SEQUENCEFILE, or PARQUET.

**non-native table**: a table Hive can only access through a storage handler, declared with `STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]`.
### SerDe

A SerDe is Hive's serializer/deserializer pair. Whatever format the table's data is in, you must use the matching SerDe:

- Plain text: `row format delimited`, which uses LazySimpleSerDe by default
- JSON: JsonSerDe
- ORC: the ORC SerDe
- Parquet: the Parquet SerDe

The read and write pipelines look like this:

```
HDFS files -> InputFileFormat -> <key, value> -> Deserializer -> Row object
Row object -> Serializer -> <key, value> -> OutputFileFormat -> HDFS files
```

On the read path, the InputFormat first splits the file into records, each record being one row; the SerDe's Deserializer then splits that row into its individual fields. (A minimal custom SerDe sketch follows the JSON example below.)

For example, a table backed by the JSON SerDe:
```sql
create table testSerde2(
    name string,
    friends array<string>
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE;
```
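To make the Deserializer/Serializer hooks concrete, here is a minimal sketch of a custom SerDe against the Hive 2.x/3.x-style AbstractSerDe API. `SingleColumnSerDe` is a hypothetical class that exposes each text line as a single string column; it is only meant to show where `deserialize()` and `serialize()` sit in the pipeline above, not a production SerDe.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Hypothetical SerDe exposing each text line as a single string column "line".
public class SingleColumnSerDe extends AbstractSerDe {

    private ObjectInspector inspector;

    @Override
    public void initialize(Configuration conf, Properties tbl) throws SerDeException {
        // Describe the row shape to Hive: a struct with one string field
        inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("line"),
                Arrays.<ObjectInspector>asList(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector));
    }

    @Override
    public Object deserialize(Writable blob) throws SerDeException {
        // <key, value> -> Row object: the whole line becomes the only field
        return Arrays.asList(blob.toString());
    }

    @Override
    public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
        // Row object -> <key, value>: write the single field back out as text
        List<?> row = (List<?>) obj;
        return new Text(String.valueOf(row.get(0)));
    }

    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
        return inspector;
    }

    @Override
    public Class<? extends Writable> getSerializedClass() {
        return Text.class;
    }

    @Override
    public SerDeStats getSerDeStats() {
        return null;
    }
}
```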