15流式概念

本文最后更新于 2022-08-28 11:30:56

流式概念

动态表&连续查询

动态表 是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个 连续查询一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。

Dynamic tables

在流上定义表

为了使用关系查询处理流,必须将其转换成 Table。从概念上讲,流的每条记录都被解释为对结果表的 INSERT 操作。本质上我们正在从一个 INSERT-only 的 changelog 流构建表。Append mode

连续查询

在动态表上计算一个连续查询,并生成一个新的动态表。与批处理查询不同,连续查询从不终止,并根据其输入表上的更新更新其结果表。在任何时候,连续查询的结果在语义上与以批处理模式在输入表快照上执行的相同查询的结果相同。

查询更新先前输出的结果,即定义结果表的 changelog 流包含 INSERTUPDATE 操作。

Continuous Non-Windowed Query

查询只附加到结果表,即结果表的 changelog 流只包含 INSERT 操作。

Continuous Group-Window Query

查询限制

有些查询代价太高而无法计算,这可能是由于它们需要维护的状态大小,也可能是由于计算更新代价太高。

  • 状态大小: 连续查询在无界流上计算,通常应该运行数周或数月。因此,连续查询处理的数据总量可能非常大。必须更新先前输出的结果的查询需要维护所有输出的行,以便能够更新它们。例如,第一个查询示例需要存储每个用户的 URL 计数,以便能够增加该计数并在输入表接收新行时发送新结果。如果只跟踪注册用户,则要维护的计数数量可能不会太高。但是,如果未注册的用户分配了一个惟一的用户名,那么要维护的计数数量将随着时间增长,并可能最终导致查询失败。
1
2
3
SELECT user, COUNT(url)
FROM clicks
GROUP BY user;
  • 计算更新: 有些查询需要重新计算和更新大量已输出的结果行,即使只添加或更新一条输入记录。显然,这样的查询不适合作为连续查询执行。下面的查询就是一个例子,它根据最后一次单击的时间为每个用户计算一个 RANK。一旦 click 表接收到一个新行,用户的 lastAction 就会更新,并必须计算一个新的排名。然而,由于两行不能具有相同的排名,所以所有较低排名的行也需要更新。
1
2
3
4
SELECT user, RANK() OVER (ORDER BY lastAction)
FROM (
SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);

表到流的转换

模式 INSERT UPDATE DELETE
Append 支持 不支持 不支持
Upsert 支持 支持 支持
Retract 支持 支持 支持

Append-only 流

仅通过 INSERT 操作修改的动态表可以通过输出插入的行转换为流。

结果一旦输出以后便不会再有变更,Append 输出模式的最大特性是不可变性(immutability),而不可变性最令人向往的优势便是安全,比如线程安全或者 Event Sourcing 的可恢复性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
public class Test {
public static void main(String[] args) throws Exception{

args=new String[]{"--application","flink/src/main/java/com/bigdata/flink/tableSqlAppendUpsertRetract/application.properties"};

//1、解析命令行参数
ParameterTool fromArgs = ParameterTool.fromArgs(args);
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));

String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
String browseTopic = parameterTool.getRequired("browseTopic");
String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");

//2、设置运行环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
streamEnv.setParallelism(1);

//3、注册Kafka数据源
Properties browseProperties = new Properties();
browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
browseProperties.put("group.id",browseTopicGroupID);
DataStream<UserBrowseLog> browseStream=streamEnv
.addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
.process(new BrowseKafkaProcessFunction());
tableEnv.registerDataStream("source_kafka_browse_log",browseStream,"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");


//4、注册AppendStreamTableSink
String[] sinkFieldNames={"userID","eventTime","eventType","productID","productPrice","eventTimeTimestamp"};
DataType[] sinkFieldTypes={DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.INT(),DataTypes.BIGINT()};
TableSink<Row> myAppendStreamTableSink = new MyAppendStreamTableSink(sinkFieldNames,sinkFieldTypes);
tableEnv.registerTableSink("sink_stdout",myAppendStreamTableSink);


//5、连续查询
//将userID为user_1的记录输出到外部存储
String sql="insert into sink_stdout select userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp from source_kafka_browse_log where userID='user_1'";
tableEnv.sqlUpdate(sql);

//6、开始执行
tableEnv.execute(Test.class.getSimpleName());

}


/**
* 解析Kafka数据
* 将Kafka JSON String 解析成JavaBean: UserBrowseLog
* UserBrowseLog(String userID, String eventTime, String eventType, String productID, int productPrice, long eventTimeTimestamp)
*/
private static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
@Override
public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
try {

UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);

// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
log.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(log);
}catch (Exception ex){
log.error("解析Kafka数据异常...",ex);
}
}
}

/**
* 自定义 AppendStreamTableSink
* AppendStreamTableSink 适用于表只有Insert的场景
*/
private static class MyAppendStreamTableSink implements AppendStreamTableSink<Row> {

private TableSchema tableSchema;

public MyAppendStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames,fieldTypes).build();
}

@Override
public TableSchema getTableSchema() {
return tableSchema;
}

@Override
public DataType getConsumedDataType() {
return tableSchema.toRowDataType();
}

// 已过时
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

// 已过时
@Override
public void emitDataStream(DataStream<Row> dataStream) {}

@Override
public DataStreamSink<Row> consumeDataStream(DataStream<Row> dataStream) {
return dataStream.addSink(new SinkFunction());
}

private static class SinkFunction extends RichSinkFunction<Row> {
public SinkFunction() {
}

@Override
public void invoke(Row value, Context context) throws Exception {
System.out.println(value);
}
}
}

}
//user_1,2016-01-01 10:02:06,browse,product_5,20,1451613726000
//user_1,2016-01-01 10:02:00,browse,product_5,20,1451613720000
//user_1,2016-01-01 10:02:15,browse,product_5,20,1451613735000
//user_1,2016-01-01 10:02:02,browse,product_5,20,1451613722000
//user_1,2016-01-01 10:02:16,browse,product_5,20,1451613736000

Retract流

retract 流包含两种类型的 message: add messagesretract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。下图显示了将动态表转换为 retract 流的过程。

Retract 模式会将更新分成两条表示增减量的消息,一条是 (false, OldRow) 的撤回(Retract)操作,一条是 (true, NewRow) 的积累(Accumulate)操作。这样的好处是,在主键出现变化的情况下,Upsert 输出模式无法撤回旧主键的记录,导致数据不准确,而 Retract 模式则不存在这个问题

Dynamic tables

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
public class Test {
public static void main(String[] args) throws Exception{

args=new String[]{"--application","flink/src/main/java/com/bigdata/flink/tableSqlAppendUpsertRetract/application.properties"};

//1、解析命令行参数
ParameterTool fromArgs = ParameterTool.fromArgs(args);
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));

String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
String browseTopic = parameterTool.getRequired("browseTopic");
String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");

//2、设置运行环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
streamEnv.setParallelism(1);

//3、注册Kafka数据源
Properties browseProperties = new Properties();
browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
browseProperties.put("group.id",browseTopicGroupID);
DataStream<UserBrowseLog> browseStream=streamEnv
.addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
.process(new BrowseKafkaProcessFunction());
tableEnv.registerDataStream("source_kafka_browse_log",browseStream,"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");


//4、注册RetractStreamTableSink
String[] sinkFieldNames={"userID","browseNumber"};
DataType[] sinkFieldTypes={DataTypes.STRING(),DataTypes.BIGINT()};
RetractStreamTableSink<Row> myRetractStreamTableSink = new MyRetractStreamTableSink(sinkFieldNames,sinkFieldTypes);
tableEnv.registerTableSink("sink_stdout",myRetractStreamTableSink);


//5、连续查询
//统计每个Uid的浏览次数
String sql="insert into sink_stdout select userID,count(1) as browseNumber from source_kafka_browse_log where userID in ('user_1','user_2') group by userID ";
tableEnv.sqlUpdate(sql);


//6、开始执行
tableEnv.execute(Test.class.getSimpleName());


}


/**
* 解析Kafka数据
* 将Kafka JSON String 解析成JavaBean: UserBrowseLog
* UserBrowseLog(String userID, String eventTime, String eventType, String productID, int productPrice, long eventTimeTimestamp)
*/
private static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
@Override
public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
try {

UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);

// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
log.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(log);
}catch (Exception ex){
log.error("解析Kafka数据异常...",ex);
}
}
}

/**
* 自定义RetractStreamTableSink
*
* Table在内部被转换成具有Add(增加)和Retract(撤消/删除)的消息流,最终交由DataStream的SinkFunction处理。
* DataStream里的数据格式是Tuple2类型,如Tuple2<Boolean, Row>。
* Boolean是Add(增加)或Retract(删除)的flag(标识)。Row是真正的数据类型。
* Table中的Insert被编码成一条Add消息。如Tuple2<True, Row>。
* Table中的Update被编码成两条消息。一条删除消息Tuple2<False, Row>,一条增加消息Tuple2<True, Row>。
*/
private static class MyRetractStreamTableSink implements RetractStreamTableSink<Row> {

private TableSchema tableSchema;

public MyRetractStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames,fieldTypes).build();
}

@Override
public TableSchema getTableSchema() {
return tableSchema;
}

// 已过时
@Override
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

// 已过时
@Override
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {}

// 最终会转换成DataStream处理
@Override
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
return dataStream.addSink(new SinkFunction());
}

@Override
public TypeInformation<Row> getRecordType() {
return new RowTypeInfo(tableSchema.getFieldTypes(),tableSchema.getFieldNames());
}

private static class SinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> {
public SinkFunction() {
}

@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
Boolean flag = value.f0;
if(flag){
System.out.println("增加... "+value);
}else {
System.out.println("删除... "+value);
}
}
}
}
}
//增加... (true,user_1,1)
//删除... (false,user_1,1) //user_1更新时被编译成两条消息 //先是一条删除的消息
//增加... (true,user_1,2) //再是一条增加的消息
//增加... (true,user_2,1) //同理user_2
//删除... (false,user_1,2)
//增加... (true,user_1,3)
//删除... (false,user_1,3)
//增加... (true,user_1,4)
//删除... (false,user_2,1)
//增加... (true,user_2,2)
//删除... (false,user_1,4)
//增加... (true,user_1,5)

Upsert 流

upsert 流包含两种类型的 message: upsert messagesdelete messages。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将 INSERTUPDATE 操作编码为 upsert message,将 DELETE 操作编码为 delete message ,将具有唯一键的动态表转换为流。消费流的算子需要知道唯一键的属性,以便正确地应用 message。与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高。下图显示了将动态表转换为 upsert 流的过程。

支持 Append-Only 的操作和在有主键的前提下的 UPDATE 和 DELETE 操作。Upsert 模式依赖业务主键来实现输出结果的更新和删除,因此非常适合 KV 数据库,比如HBase、JDBC 的 TableSink 都使用了这种方式。

Dynamic tables

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
public class Test {
public static void main(String[] args) throws Exception{

args=new String[]{"--application","flink/src/main/java/com/bigdata/flink/tableSqlAppendUpsertRetract/application.properties"};

//1、解析命令行参数
ParameterTool fromArgs = ParameterTool.fromArgs(args);
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));

String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
String browseTopic = parameterTool.getRequired("browseTopic");
String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");

//2、设置运行环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
streamEnv.setParallelism(1);

//3、注册Kafka数据源
Properties browseProperties = new Properties();
browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
browseProperties.put("group.id",browseTopicGroupID);
DataStream<UserBrowseLog> browseStream=streamEnv
.addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
.process(new BrowseKafkaProcessFunction());
tableEnv.registerDataStream("source_kafka_browse_log",browseStream,"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

//4、注册UpsertStreamTableSink
String[] sinkFieldNames={"userID","browseNumber"};
DataType[] sinkFieldTypes={DataTypes.STRING(),DataTypes.BIGINT()};
UpsertStreamTableSink<Row> myRetractStreamTableSink = new MyUpsertStreamTableSink(sinkFieldNames,sinkFieldTypes);
tableEnv.registerTableSink("sink_stdout",myRetractStreamTableSink);

//5、连续查询
//统计每个Uid的浏览次数
String sql="insert into sink_stdout select userID,count(1) as browseNumber from source_kafka_browse_log where userID in ('user_1','user_2') group by userID ";
tableEnv.sqlUpdate(sql);

//6、开始执行
tableEnv.execute(Test.class.getSimpleName());
}


/**
* 解析Kafka数据
* 将Kafka JSON String 解析成JavaBean: UserBrowseLog
* UserBrowseLog(String userID, String eventTime, String eventType, String productID, int productPrice, long eventTimeTimestamp)
*/
private static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
@Override
public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
try {

UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);

// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
log.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(log);
}catch (Exception ex){
log.error("解析Kafka数据异常...",ex);
}
}
}

/**
* 自定义UpsertStreamTableSink
* Table在内部被转换成具有Add(增加)和Retract(撤消/删除)的消息流,最终交由DataStream的SinkFunction处理。
* Boolean是Add(增加)或Retract(删除)的flag(标识)。Row是真正的数据类型。
* Table中的Insert被编码成一条Add消息。如Tuple2<True, Row>。
* Table中的Update被编码成一条Add消息。如Tuple2<True, Row>。
* 在SortLimit(即order by ... limit ...)的场景下,被编码成两条消息。一条删除消息Tuple2<False, Row>,一条增加消息Tuple2<True, Row>。
*/
private static class MyUpsertStreamTableSink implements UpsertStreamTableSink<Row> {

private TableSchema tableSchema;

public MyUpsertStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames,fieldTypes).build();
}

@Override
public TableSchema getTableSchema() {
return tableSchema;
}


// 设置Unique Key
// 如上SQL中有GroupBy,则这里的唯一键会自动被推导为GroupBy的字段
@Override
public void setKeyFields(String[] keys) {}

// 是否只有Insert
// 如上SQL场景,需要Update,则这里被推导为isAppendOnly=false
@Override
public void setIsAppendOnly(Boolean isAppendOnly) {}

@Override
public TypeInformation<Row> getRecordType() {
return new RowTypeInfo(tableSchema.getFieldTypes(),tableSchema.getFieldNames());
}

// 已过时
@Override
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {}

// 最终会转换成DataStream处理
@Override
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
return dataStream.addSink(new SinkFunction());
}

@Override
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

private static class SinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> {
public SinkFunction() {
}

@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
Boolean flag = value.f0;
if(flag){
System.out.println("增加... "+value);
}else {
System.out.println("删除... "+value);
}
}
}
}
}
//增加... (true,user_1,1)
//增加... (true,user_1,2) //user_1更新时被编译成一条消息
//增加... (true,user_1,3) //user_1更新时被编译成一条消息
//增加... (true,user_1,4)
//增加... (true,user_2,1) //同理user_2更新
//增加... (true,user_2,2)
//增加... (true,user_1,5)

时间属性

每种类型的表都可以有时间属性,可以在用CREATE TABLE DDL创建表的时候指定、也可以在 DataStream 中指定、也可以在定义 TableSource 时指定。一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。

只要时间属性没有被修改,而是简单地从一个表传递到另一个表,它就仍然是一个有效的时间属性。时间属性可以像普通的时间戳的列一样被使用和计算。

1
2
3
4
5
6
7
val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default

// 或者:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

处理时间

在DDL中定义

PROCTIME() 就可以定义处理时间,函数 PROCTIME() 的返回类型是 TIMESTAMP_LTZ

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (
...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

DataStream 到 Table 转换时定义

1
2
3
4
5
6
val stream: DataStream[(String, String)] = ...

// 声明一个额外的字段作为时间属性字段
val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"data", $"user_action_time".proctime)

val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

TableSource 定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 定义一个由处理时间属性的 table source
class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {

override def getReturnType = {
val names = Array[String]("user_name" , "data")
val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
Types.ROW(names, types)
}

override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
// create stream
val stream = ...
stream
}

override def getProctimeAttribute = {
// 这个名字的列会被追加到最后,作为第三列
"user_action_time"
}
}

// register table source
tEnv.registerTableSource("user_actions", new UserActionSource)

val windowedTable = tEnv
.from("user_actions")
.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

事件时间

DDL中定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

-- 源数据中的时间戳数据表示为一个纪元 (epoch) 时间,通常是一个 long 值,例如 1618989564564,建议将事件时间属性定义在 TIMESTAMP_LTZ 列上:
CREATE TABLE user_actions (
user_name STRING,
data STRING,
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
-- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);

SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);

DataStream 到 Table 转换时定义

事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark在这之前一定是在 DataStream 上已经定义好了。 在从 DataStream 转换到 Table 时,由于 DataStream 没有时区概念,因此 Flink 总是将 rowtime 属性解析成 TIMESTAMP WITHOUT TIME ZONE 类型,并且将所有事件时间的值都视为 UTC 时区的值。

在从 DataStreamTable 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:

  • 在 schema 的结尾追加一个新的字段
  • 替换一个已经存在的字段。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Option 1:

// 基于 stream 中的事件产生时间戳和 watermark
val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// 声明一个额外的逻辑字段作为事件时间属性
val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime)


// Option 2:

// 从第一个字段获取事件时间,并且产生 watermark
val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")

// Usage:

val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

TableSource 定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 定义一个有事件时间属性的 table source
class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

override def getReturnType = {
val names = Array[String]("user_name" , "data", "user_action_time")
val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
Types.ROW(names, types)
}

override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
// 构造 DataStream
// ...
// 基于 "user_action_time" 定义 watermark
val stream = inputStream.assignTimestampsAndWatermarks(...)
stream
}

override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
// 标记 "user_action_time" 字段是事件时间字段
// 给 "user_action_time" 构造一个时间属性描述符
val rowtimeAttrDescr = new RowtimeAttributeDescriptor(
"user_action_time",
new ExistingField("user_action_time"),
new AscendingTimestamps)
val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr)
listRowtimeAttrDescr
}
}

// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource)

val windowedTable = tEnv
.from("user_actions")
.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

时区

TIMESTAMP

  • TIMESTAMP(p)TIMESTAMP(p) WITHOUT TIME ZONE 的简写, 精度 p 支持的范围是0-9, 默认是6。
  • TIMESTAMP 用于描述年, 月, 日, 小时, 分钟, 秒 和 小数秒对应的时间戳。
  • TIMESTAMP 可以通过一个字符串来指定,例如:
1
2
3
4
Flink SQL> SELECT TIMESTAMP '1970-01-01 00:00:04.001';
+-------------------------+
| 1970-01-01 00:00:04.001 |
+-------------------------+

TIMESTAMP_LTZ

TIMESTAMP_LTZ(p)TIMESTAMP(p) WITH LOCAL TIME ZONE 的简写, 精度 p 支持的范围是0-9, 默认是6

在计算和可视化时, 每个 TIMESTAMP_LTZ 类型的数据都是使用的 session (会话)中配置的时区。

TIMESTAMP_LTZ 没有字符串表达形式因此无法通过字符串来指定, 可以通过一个 long 类型的 epoch 时间来转化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Flink SQL> CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);
Flink SQL> SET table.local-time-zone=UTC;
Flink SQL> SELECT * FROM T1;
+---------------------------+
| TO_TIMESTAMP_LTZ(4001, 3) |
+---------------------------+
| 1970-01-01 00:00:04.001 |
+---------------------------+

Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT * FROM T1;
+---------------------------+
| TO_TIMESTAMP_LTZ(4001, 3) |
+---------------------------+
| 1970-01-01 08:00:04.001 |

时区设置

1
2
3
4
5
6
7
8
-- 设置为 UTC 时区
Flink SQL> SET table.local-time-zone=UTC;

-- 设置为上海时区
Flink SQL> SET table.local-time-zone=Asia/Shanghai;

-- 设置为Los_Angeles时区
Flink SQL> SET table.local-time-zone=America/Los_Angeles;
1
2
3
4
5
6
7
8
9
10
11
val envSetting = EnvironmentSettings.newInstance.build
val tEnv = TableEnvironment.create(envSetting)

// 设置为 UTC 时区
tEnv.getConfig.setLocalTimeZone(ZoneId.of("UTC"))

// 设置为上海时区
tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

// 设置为 Los_Angeles 时区
tEnv.getConfig.setLocalTimeZone(ZoneId.of("America/Los_Angeles"))

时区影响的函数

1
2
3
4
5
6
7
8
9
10
11
12
+------------------------+-----------------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+------------------------+-----------------------------+-------+-----+--------+-----------+
| LOCALTIME | TIME(0) | false | | | |
| LOCALTIMESTAMP | TIMESTAMP(3) | false | | | |
| CURRENT_DATE | DATE | false | | | |
| CURRENT_TIME | TIME(0) | false | | | |
| CURRENT_TIMESTAMP | TIMESTAMP_LTZ(3) | false | | | |
|CURRENT_ROW_TIMESTAMP() | TIMESTAMP_LTZ(3) | false | | | |
| NOW() | TIMESTAMP_LTZ(3) | false | | | |
| PROCTIME() | TIMESTAMP_LTZ(3) *PROCTIME* | false | | | |
+------------------------+-----------------------------+-------+-----+--------+-----------+

15流式概念
https://jiajun.xyz/2021/09/08/bigdata/11Flink/01flink_study1/15流式概念/
作者
Lambda
发布于
2021年9月8日
更新于
2022年8月28日
许可协议