01简介

Doris简介

Frontend(FE):Doris 系统的元数据和调度节点。

Backend(BE):Doris 系统的计算和存储节点。

Broker:Broker 为一个独立的无状态进程。封装了文件系统接口,提供 Doris 读取远端存储系统中文件的能力。

大规模并行分析(MPP)数据库(Analytical Massively Parallel Processing (MPP)

建表

Doris支持支持单分区和复合分区两种建表方式。

在复合分区中:

  • 第一级称为 Partition,即分区。用户可以指定某一维度列作为分区列(当前只支持整型和时间类型的列),并指定每个分区的取值范围。
  • 第二级称为 Distribution,即分桶。用户可以指定一个或多个维度列以及桶数对数据进行 HASH 分布

以下场景推荐使用复合分区

  • 有时间维度或类似带有有序值的维度,可以以这类维度列作为分区列。分区粒度可以根据导入频次、分区数据量等进行评估。
  • 历史数据删除需求:如有删除历史数据的需求(比如仅保留最近N 天的数据)。使用复合分区,可以通过删除历史分区来达到目的。也可以通过在指定分区内发送 DELETE 语句进行数据删除。
  • 解决数据倾斜问题:每个分区可以单独指定分桶数量。如按天分区,当每天的数据量差异很大时,可以通过指定分区的分桶数,合理划分不同分区的数据,分桶列建议选择区分度大的列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
-- 单分区
-- pv这是一个指标列, Doris内部会对指标列做聚合操作, 这个列的聚合方法是求和(SUM)
-- 分桶列为 siteid,桶数为 10
CREATE TABLE table1
(
siteid INT DEFAULT '10',
citycode SMALLINT,
username VARCHAR(32) DEFAULT '',
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

-- 复合分区
-- 使用 event_day 列作为分区列,建立3个分区: p201706, p201707, p201708 左闭右开
-- 每个分区使用 siteid 进行哈希分桶,桶数为10
CREATE TABLE table2
(
event_day DATE,
siteid INT DEFAULT '10',
citycode SMALLINT,
username VARCHAR(32) DEFAULT '',
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(event_day, siteid, citycode, username)
PARTITION BY RANGE(event_day)
(
PARTITION p201706 VALUES LESS THAN ('2017-07-01'),
PARTITION p201707 VALUES LESS THAN ('2017-08-01'),
PARTITION p201708 VALUES LESS THAN ('2017-09-01')
)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

Rollup

Rollup 可以理解为 Table 的一个物化索引结构。物化 是因为其数据在物理上独立存储,而 索引 的意思是,Rollup可以调整列顺序以增加前缀索引的命中率,也可以减少key列以增加数据的聚合度。

对于 table1 明细数据是 siteid, citycode, username 三者构成一组 key,从而对 pv 字段进行聚合;如果业务方经常有看城市 pv 总量的需求,可以建立一个只有 citycode, pv 的rollup。

1
ALTER TABLE table1 ADD ROLLUP rollup_city(citycode, pv);

Rollup 建立之后,查询不需要指定 Rollup 进行查询。还是指定原有表进行查询即可。程序会自动判断是否应该使用 Rollup。

数据表的查询

内存限制

为了防止用户的一个查询可能因为消耗内存过大。查询进行了内存控制,一个查询任务,在单个 BE 节点上默认使用不超过 2GB 内存。

1
2
3
4
5
6
SHOW VARIABLES LIKE "%mem_limit%";

-- session级别
SET exec_mem_limit = 8589934592;
-- global级别
SET GLOBAL exec_mem_limit = 8589934592;

查询超时

当前默认查询时间设置为最长为 300 秒,如果一个查询在 300 秒内没有完成,则查询会被 Doris 系统 cancel 掉。

1
2
3
SHOW VARIABLES LIKE "%query_timeout%";

SET query_timeout = 60;

Broadcast/Shuffle Join

系统默认实现 Join 的方式,是将小表进行条件过滤后,将其广播到大表所在的各个节点上,形成一个内存 Hash 表,然后流式读出大表的数据进行Hash Join。但是如果当小表过滤后的数据量无法放入内存的话,此时 Join 将无法完成,通常的报错应该是首先造成内存超限。

如果遇到上述情况,建议显式指定 Shuffle Join,也被称作 Partitioned Join。即将小表和大表都按照 Join 的 key 进行 Hash,然后进行分布式的 Join。这个对内存的消耗就会分摊到集群的所有计算节点上。

Doris会自动尝试进行 Broadcast Join,如果预估小表过大则会自动切换至 Shuffle Join。注意,如果此时显式指定了 Broadcast Join 也会自动切换至 Shuffle Join。

1
2
3
4
5
6
-- 默认
select sum(table1.pv) from table1 join table2 where table1.siteid = 2;
-- 显式broadcast
select sum(table1.pv) from table1 join [broadcast] table2 where table1.siteid = 2;
-- 显式shuffle
select sum(table1.pv) from table1 join [shuffle] table2 where table1.siteid = 2;

数据模型

聚合模型

表中的列按照是否设置了 AggregationType,分为 Key (维度列) 和 Value(指标列)。没有设置 AggregationType 的,如 user_iddateage … 等称为 Key,而设置了 AggregationType 的称为 Value

当我们导入数据时,对于 Key 列相同的行会聚合成一行,而 Value 列会按照设置的 AggregationType 进行聚合。 AggregationType 目前有以下四种聚合方式:

  1. SUM:求和,多行的 Value 进行累加。
  2. REPLACE:替代,下一批数据中的 Value 会替换之前导入过的行中的 Value。
  3. MAX:保留最大值。
  4. MIN:保留最小值。

Uniq 模型

在某些多维分析场景下,用户更关注的是如何保证 Key 的唯一性,即如何获得 Primary Key 唯一性约束。因此,我们引入了 Uniq 的数据模型。该模型本质上是聚合模型的一个特例,也是一种简化的表结构表示方式。我们举例说明。

Uniq 模型完全可以用聚合模型中的 REPLACE 方式替代。其内部的实现方式和数据存储方式也完全一样。这里不再继续举例说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
CREATE TABLE IF NOT EXISTS example_db.expamle_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`username` VARCHAR(50) NOT NULL COMMENT "用户昵称",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`phone` LARGEINT COMMENT "用户电话",
`address` VARCHAR(500) COMMENT "用户地址",
`register_time` DATETIME COMMENT "用户注册时间"
)
UNIQUE KEY(`user_id`, `username`)
... /* 省略 Partition 和 Distribution 信息 */



CREATE TABLE IF NOT EXISTS example_db.expamle_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`username` VARCHAR(50) NOT NULL COMMENT "用户昵称",
`city` VARCHAR(20) REPLACE COMMENT "用户所在城市",
`age` SMALLINT REPLACE COMMENT "用户年龄",
`sex` TINYINT REPLACE COMMENT "用户性别",
`phone` LARGEINT REPLACE COMMENT "用户电话",
`address` VARCHAR(500) REPLACE COMMENT "用户地址",
`register_time` DATETIME REPLACE COMMENT "用户注册时间"
)
AGGREGATE KEY(`user_id`, `username`)
... /* 省略 Partition 和 Distribution 信息 */

Duplicate 模型

这种数据模型区别于 Aggregate 和 Uniq 模型。数据完全按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。 而在建表语句中指定的 DUPLICATE KEY,只是用来指明底层数据按照那些列进行排序。

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE IF NOT EXISTS example_db.expamle_tbl
(
`timestamp` DATETIME NOT NULL COMMENT "日志时间",
`type` INT NOT NULL COMMENT "日志类型",
`error_code` INT COMMENT "错误码",
`error_msg` VARCHAR(1024) COMMENT "错误详细信息",
`op_id` BIGINT COMMENT "负责人id",
`op_time` DATETIME COMMENT "处理时间"
)
DUPLICATE KEY(`timestamp`, `type`)
... /* 省略 Partition 和 Distribution 信息 */

动态分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
CREATE TABLE test.test_tb
(
signalid varchar(32),
callid varchar(32),
caller varchar(32),
callee varchar(32),
day_id DATE
)
ENGINE=olap
PARTITION BY RANGE (day_id)
(
PARTITION p1 VALUES LESS THAN ("2021-05-01"),
PARTITION p2 VALUES LESS THAN ("2021-06-01"),
PARTITION p3 VALUES LESS THAN ("2021-07-01"),
PARTITION p4 VALUES LESS THAN ("2021-08-01")
)
DISTRIBUTED BY HASH(day_id) BUCKETS 31
PROPERTIES (
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10"
);

dynamic_partition.enable:默认为false,true表示开启动态分区
dynamic_partition.time_unit:按什么字时间进行分区,可选择为HOUR,DAY,WEEK,MONTH
dynamic_partition.end:用于指定提前创建的分区数量,值必须大于0
dynamic_partition.prefix:用于指定自动创建的分区名前缀,但是本文已经制定分区值了,就用不上这个参数
dynamic_partition.buckets:用于指定自动创建的分区分桶数量

前缀索引

不同于传统的数据库设计,Doris 不支持在任意列上创建索引。Doris 这类 MPP 架构的 OLAP 数据库,通常都是通过提高并发,来处理大量数据的。

本质上,Doris 的数据存储在类似 SSTable(Sorted String Table)的数据结构中。该结构是一种有序的数据结构,可以按照指定的列进行排序存储。在这种数据结构上,以排序列作为条件进行查找,会非常的高效。

在 Aggregate、Uniq 和 Duplicate 三种数据模型中。底层的数据存储,是按照各自建表语句中,AGGREGATE KEY、UNIQ KEY 和 DUPLICATE KEY 中指定的列进行排序存储的

而前缀索引,即在排序的基础上,实现的一种根据给定前缀列,快速查询数据的索引方式。

我们将一行数据的前 36 个字节 作为这行数据的前缀索引。当遇到 VARCHAR 类型时,前缀索引会直接截断。我们举例说明:

  1. 以下表结构的前缀索引为 user_id(8Byte) + age(4Bytes) + message(prefix 20 Bytes)。
ColumnName Type
user_id BIGINT
age INT
message VARCHAR(100)
max_dwell_time DATETIME
min_dwell_time DATETIME
  1. 以下表结构的前缀索引为 user_name(20 Bytes)。即使没有达到 36 个字节,因为遇到 VARCHAR,所以直接截断,不再往后继续。
ColumnName Type
user_name VARCHAR(20)
age INT
message VARCHAR(100)
max_dwell_time DATETIME
min_dwell_time DATETIME

当我们的查询条件,是前缀索引的前缀时,可以极大的加快查询速度。比如在第一个例子中,我们执行如下查询:

1
SELECT * FROM table WHERE user_id=1829239 and age=20;

该查询的效率会远高于如下查询:

1
SELECT * FROM table WHERE age=20

所以在建表时,正确的选择列顺序,能够极大地提高查询效率

因为建表时已经指定了列顺序,所以一个表只有一种前缀索引。这对于使用其他不能命中前缀索引的列作为条件进行的查询来说,效率上可能无法满足需求。因此,我们可以通过创建 ROLLUP 来人为的调整列顺序。举例说明。

Base 表结构如下:

ColumnName Type
user_id BIGINT
age INT
message VARCHAR(100)
max_dwell_time DATETIME
min_dwell_time DATETIME

我们可以在此基础上创建一个 ROLLUP 表:

ColumnName Type
age INT
user_id BIGINT
message VARCHAR(100)
max_dwell_time DATETIME
min_dwell_time DATETIME

索引

前面的查询实践中已经介绍过 Doris 的前缀索引,即 Doris 会把 Base/Rollup 表中的前 36 个字节(有 varchar 类型则可能导致前缀索引不满 36 个字节,varchar 会截断前缀索引,并且最多使用 varchar 的 20 个字节)在底层存储引擎单独生成一份排序的稀疏索引数据(数据也是排序的,用索引定位,然后在数据中做二分查找),然后在查询的时候会根据查询中的条件来匹配每个 Base/Rollup 的前缀索引,并且选择出匹配前缀索引最长的一个 Base/Rollup。

物化视图

物化视图是将预先计算(根据定义好的 SELECT 语句)好的数据集,存储在 Doris 中的一个特殊的表。

  • 对于那些经常重复的使用相同的子查询结果的查询性能大幅提升。
  • Doris自动维护物化视图的数据,无论是新的导入,还是删除操作都能保证base 表和物化视图表的数据一致性。无需任何额外的人工维护成本。
  • 查询时,会自动匹配到最优物化视图,并直接从物化视图中读取数据。

物化视图则在覆盖了 Rollup 的功能的同时,还能支持更丰富的聚合函数。所以物化视图其实是 Rollup 的一个超集

1
2
3
4
5
6
7
create table sales_records(record_id int, seller_id int, store_id int, sale_date date, sale_amt bigint) distributed by hash(record_id) properties("replication_num" = "1");

create materialized view store_amt as select store_id, sum(sale_amt) from sales_records group by store_id;

desc sales_records all;

DROP MATERIALIZED VIEW

ENGINE

本示例中,ENGINE 的类型是 olap,即默认的 ENGINE 类型。在 Doris 中,只有这个 ENGINE 类型是由 Doris 负责数据管理和存储的。其他 ENGINE 类型,如 mysql、broker、es 等等,本质上只是对外部其他数据库或系统中的表的映射,以保证 Doris 可以读取这些数据。而 Doris 本身并不创建、管理和存储任何非 olap ENGINE 类型的表和数据。


01简介
https://jiajun.xyz/2021/11/04/bigdata/12Doris/01简介/
作者
Lambda
发布于
2021年11月4日
许可协议