This article was last updated on 2022-08-28 11:30:56.
Streaming Concepts

Dynamic Tables & Continuous Queries

Dynamic tables are the core concept of Flink's Table API and SQL support for streaming data. Unlike the static tables that represent batch data, dynamic tables change over time. They can be queried just like static batch tables. Querying a dynamic table yields a continuous query. A continuous query never terminates, and its result is again a dynamic table: the query continuously updates its (dynamic) result table to reflect the changes on its (dynamic) input table.
Defining a Table on a Stream

To process a stream with a relational query, it must first be converted into a Table. Conceptually, each record of the stream is interpreted as an INSERT operation on the result table. In essence, we are building a table from an INSERT-only changelog stream.
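As a minimal sketch of this idea — the clicks table and its user/url/cTime fields mirror the example queries below, and imports are omitted as in the larger examples of this article — an insert-only DataStream can be registered as a table:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// an insert-only stream of (user, url, cTime-in-millis) click events
DataStream<Tuple3<String, String, Long>> clickStream = env.fromElements(
        Tuple3.of("Mary", "./home", 1000L),
        Tuple3.of("Bob", "./cart", 2000L));

// every element of the stream is interpreted as an INSERT into the "clicks" table
tableEnv.registerDataStream("clicks", clickStream, "user, url, cTime");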
Continuous Queries

A continuous query is evaluated on a dynamic table and produces a new dynamic table as its result. Unlike a batch query, a continuous query never terminates and keeps updating its result table according to the updates on its input table. At any point in time, the result of a continuous query is semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input table. A continuous query produces one of two kinds of result table:
The query updates previously emitted results, i.e., the changelog stream that defines the result table contains INSERT and UPDATE operations.
The query only appends to the result table, i.e., the changelog stream of the result table contains only INSERT operations. A minimal sketch of both cases follows.
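The sketch below reuses tableEnv and the clicks table registered above and assumes that cTime has been declared as an event-time attribute; both queries are illustrative only:

// Case 1: the result is continuously updated, so the changelog of the
// result table contains INSERT and UPDATE messages.
Table updating = tableEnv.sqlQuery(
        "SELECT user, COUNT(url) AS cnt FROM clicks GROUP BY user");

// Case 2: each hourly window result is emitted once and never changed again,
// so the changelog of the result table is INSERT-only.
Table appendOnly = tableEnv.sqlQuery(
        "SELECT user, TUMBLE_END(cTime, INTERVAL '1' HOUR) AS endT, COUNT(url) AS cnt " +
        "FROM clicks GROUP BY user, TUMBLE(cTime, INTERVAL '1' HOUR)");

Whether a query's result table is append-only or updating determines which of the table-to-stream conversion modes described below can be used for its output.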
Query Restrictions

Some queries are too expensive to compute, either because of the size of the state they need to maintain or because computing the updates is too expensive.
State size: A continuous query is evaluated on an unbounded stream and often needs to run for weeks or months, so the total amount of data it processes can be very large. A query that has to update previously emitted results must maintain all emitted rows in order to be able to update them. For example, the query below needs to store the URL count for every user so that it can increment the count and emit a new result whenever the input table receives a new row. If only registered users are tracked, the number of counts to maintain may not be too large. However, if non-registered users are each assigned a unique user name, the number of counts to maintain grows over time and may eventually cause the query to fail.
SELECT user, COUNT(url)
FROM clicks
GROUP BY user;
Computing updates: Some queries require recomputing and updating a large number of already emitted result rows even when only a single input record is added or updated. Clearly, such queries are not well suited to be executed as continuous queries. The query below is an example: it computes a RANK for every user based on the time of the last click. As soon as the clicks table receives a new row, the user's lastAction is updated and a new rank must be computed. However, since two rows cannot share the same rank, all rows with a lower rank also need to be updated.
SELECT user, RANK() OVER (ORDER BY lastAction)
FROM (
  SELECT user, MAX(cTime) AS lastAction
  FROM clicks
  GROUP BY user
);
Table-to-Stream Conversion

+---------+-----------+---------------+---------------+
| Mode    | INSERT    | UPDATE        | DELETE        |
+---------+-----------+---------------+---------------+
| Append  | supported | not supported | not supported |
| Upsert  | supported | supported     | supported     |
| Retract | supported | supported     | supported     |
+---------+-----------+---------------+---------------+
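How these modes surface in code is shown at length in the three full examples that follow. As a shorter, hedged sketch using the legacy bridge API (the table variables here are placeholders for previously defined tables):

// Append: only valid for an insert-only table; Flink throws an exception otherwise.
DataStream<Row> appendStream = tableEnv.toAppendStream(appendOnlyTable, Row.class);

// Retract: every change arrives as a (flag, row) pair — true encodes an add
// message, false a retract message.
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(updatingTable, Row.class);

// There is no toUpsertStream counterpart here; upsert output is produced by
// registering an UpsertStreamTableSink, as the last example in this section shows.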
Append-only Stream

A dynamic table that is modified only by INSERT operations can be converted into a stream by emitting the inserted rows.
Once a result has been emitted, it never changes again. The defining property of the Append output mode is immutability, and the most attractive benefit of immutability is safety, for example thread safety or the recoverability offered by Event Sourcing.
public class Test {
    public static void main(String[] args) throws Exception {
        // Job configuration is read from a properties file passed via --application.
        args = new String[]{"--application", "flink/src/main/java/com/bigdata/flink/tableSqlAppendUpsertRetract/application.properties"};
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));
        String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
        String browseTopic = parameterTool.getRequired("browseTopic");
        String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        // Consume browse logs from Kafka and parse them into UserBrowseLog objects.
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers", kafkaBootstrapServers);
        browseProperties.put("group.id", browseTopicGroupID);
        DataStream<UserBrowseLog> browseStream = streamEnv
                .addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction());

        // Register the insert-only stream as a table.
        tableEnv.registerDataStream("source_kafka_browse_log", browseStream,
                "userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

        // Register a custom AppendStreamTableSink that prints every inserted row.
        String[] sinkFieldNames = {"userID", "eventTime", "eventType", "productID", "productPrice", "eventTimeTimestamp"};
        DataType[] sinkFieldTypes = {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT(), DataTypes.BIGINT()};
        TableSink<Row> myAppendStreamTableSink = new MyAppendStreamTableSink(sinkFieldNames, sinkFieldTypes);
        tableEnv.registerTableSink("sink_stdout", myAppendStreamTableSink);

        // A simple filter query: its result table is append-only.
        String sql = "insert into sink_stdout select userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp from source_kafka_browse_log where userID='user_1'";
        tableEnv.sqlUpdate(sql);

        tableEnv.execute(Test.class.getSimpleName());
    }

    private static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
        @Override
        public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
            try {
                UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);
                java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
                long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
                log.setEventTimeTimestamp(eventTimeTimestamp);
                out.collect(log);
            } catch (Exception ex) {
                // assumes a class-level logger named "log" (e.g. Lombok @Slf4j), not shown here
                log.error("Failed to parse Kafka record...", ex);
            }
        }
    }

    private static class MyAppendStreamTableSink implements AppendStreamTableSink<Row> {
        private TableSchema tableSchema;

        public MyAppendStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
            this.tableSchema = TableSchema.builder().fields(fieldNames, fieldTypes).build();
        }

        @Override
        public TableSchema getTableSchema() {
            return tableSchema;
        }

        @Override
        public DataType getConsumedDataType() {
            return tableSchema.toRowDataType();
        }

        @Override
        public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            return null;
        }

        @Override
        public void emitDataStream(DataStream<Row> dataStream) {}

        @Override
        public DataStreamSink<Row> consumeDataStream(DataStream<Row> dataStream) {
            return dataStream.addSink(new SinkFunction());
        }

        // Prints every appended row to stdout.
        private static class SinkFunction extends RichSinkFunction<Row> {
            public SinkFunction() {
            }

            @Override
            public void invoke(Row value, Context context) throws Exception {
                System.out.println(value);
            }
        }
    }
}
Retract Stream

A retract stream contains two types of messages: add messages and retract messages. A dynamic table is converted into a retract stream by encoding an INSERT operation as an add message, a DELETE operation as a retract message, and an UPDATE operation as a retract message for the updated (previous) row plus an add message for the updating (new) row. The figure below shows the conversion of a dynamic table into a retract stream.
Retract mode splits an update into two messages representing a decrement and an increment: a (false, OldRow) retract message and a (true, NewRow) accumulate message. The benefit is that when the primary key changes, Upsert mode cannot retract the record under the old key, which leads to incorrect data, whereas Retract mode does not have this problem.
public class Test {
    public static void main(String[] args) throws Exception {
        // Job configuration is read from a properties file passed via --application.
        args = new String[]{"--application", "flink/src/main/java/com/bigdata/flink/tableSqlAppendUpsertRetract/application.properties"};
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));
        String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
        String browseTopic = parameterTool.getRequired("browseTopic");
        String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        // Consume browse logs from Kafka and parse them into UserBrowseLog objects.
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers", kafkaBootstrapServers);
        browseProperties.put("group.id", browseTopicGroupID);
        DataStream<UserBrowseLog> browseStream = streamEnv
                .addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction());

        // Register the insert-only stream as a table.
        tableEnv.registerDataStream("source_kafka_browse_log", browseStream,
                "userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

        // Register a custom RetractStreamTableSink; each change arrives as a Tuple2<Boolean, Row>.
        String[] sinkFieldNames = {"userID", "browseNumber"};
        DataType[] sinkFieldTypes = {DataTypes.STRING(), DataTypes.BIGINT()};
        RetractStreamTableSink<Row> myRetractStreamTableSink = new MyRetractStreamTableSink(sinkFieldNames, sinkFieldTypes);
        tableEnv.registerTableSink("sink_stdout", myRetractStreamTableSink);

        // An aggregation query: its result table is continuously updated.
        String sql = "insert into sink_stdout select userID,count(1) as browseNumber from source_kafka_browse_log where userID in ('user_1','user_2') group by userID ";
        tableEnv.sqlUpdate(sql);

        tableEnv.execute(Test.class.getSimpleName());
    }

    private static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
        @Override
        public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
            try {
                UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);
                java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
                long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
                log.setEventTimeTimestamp(eventTimeTimestamp);
                out.collect(log);
            } catch (Exception ex) {
                // assumes a class-level logger named "log" (e.g. Lombok @Slf4j), not shown here
                log.error("Failed to parse Kafka record...", ex);
            }
        }
    }

    private static class MyRetractStreamTableSink implements RetractStreamTableSink<Row> {
        private TableSchema tableSchema;

        public MyRetractStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
            this.tableSchema = TableSchema.builder().fields(fieldNames, fieldTypes).build();
        }

        @Override
        public TableSchema getTableSchema() {
            return tableSchema;
        }

        @Override
        public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            return null;
        }

        @Override
        public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {}

        @Override
        public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            return dataStream.addSink(new SinkFunction());
        }

        @Override
        public TypeInformation<Row> getRecordType() {
            return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
        }

        // The boolean flag encodes the message type: true = add message, false = retract message.
        private static class SinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> {
            public SinkFunction() {
            }

            @Override
            public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
                Boolean flag = value.f0;
                if (flag) {
                    System.out.println("add... " + value);
                } else {
                    System.out.println("delete... " + value);
                }
            }
        }
    }
}
Upsert Stream

An upsert stream contains two types of messages: upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. A dynamic table with a unique key is converted into a stream by encoding INSERT and UPDATE operations as upsert messages and DELETE operations as delete messages. The operator consuming the stream needs to be aware of the unique key attribute in order to apply the messages correctly. The main difference to a retract stream is that UPDATE operations are encoded with a single message, which makes them more efficient. The figure below shows the conversion of a dynamic table into an upsert stream.
Upsert mode supports append-only operations as well as UPDATE and DELETE operations when a primary key is available. It relies on the business primary key to update and delete previously emitted results, which makes it a natural fit for key-value stores; for example, the HBase and JDBC TableSinks use this mode.
public class Test {
    public static void main(String[] args) throws Exception {
        // Job configuration is read from a properties file passed via --application.
        args = new String[]{"--application", "flink/src/main/java/com/bigdata/flink/tableSqlAppendUpsertRetract/application.properties"};
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));
        String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
        String browseTopic = parameterTool.getRequired("browseTopic");
        String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        // Consume browse logs from Kafka and parse them into UserBrowseLog objects.
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers", kafkaBootstrapServers);
        browseProperties.put("group.id", browseTopicGroupID);
        DataStream<UserBrowseLog> browseStream = streamEnv
                .addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction());

        // Register the insert-only stream as a table.
        tableEnv.registerDataStream("source_kafka_browse_log", browseStream,
                "userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

        // Register a custom UpsertStreamTableSink; each change arrives as a Tuple2<Boolean, Row>.
        String[] sinkFieldNames = {"userID", "browseNumber"};
        DataType[] sinkFieldTypes = {DataTypes.STRING(), DataTypes.BIGINT()};
        UpsertStreamTableSink<Row> myUpsertStreamTableSink = new MyUpsertStreamTableSink(sinkFieldNames, sinkFieldTypes);
        tableEnv.registerTableSink("sink_stdout", myUpsertStreamTableSink);

        // An aggregation query: its result table is continuously updated by userID (the key).
        String sql = "insert into sink_stdout select userID,count(1) as browseNumber from source_kafka_browse_log where userID in ('user_1','user_2') group by userID ";
        tableEnv.sqlUpdate(sql);

        tableEnv.execute(Test.class.getSimpleName());
    }

    private static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
        @Override
        public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
            try {
                UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);
                java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
                long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
                log.setEventTimeTimestamp(eventTimeTimestamp);
                out.collect(log);
            } catch (Exception ex) {
                // assumes a class-level logger named "log" (e.g. Lombok @Slf4j), not shown here
                log.error("Failed to parse Kafka record...", ex);
            }
        }
    }

    private static class MyUpsertStreamTableSink implements UpsertStreamTableSink<Row> {
        private TableSchema tableSchema;

        public MyUpsertStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
            this.tableSchema = TableSchema.builder().fields(fieldNames, fieldTypes).build();
        }

        @Override
        public TableSchema getTableSchema() {
            return tableSchema;
        }

        @Override
        public void setKeyFields(String[] keys) {}

        @Override
        public void setIsAppendOnly(Boolean isAppendOnly) {}

        @Override
        public TypeInformation<Row> getRecordType() {
            return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
        }

        @Override
        public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {}

        @Override
        public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            return dataStream.addSink(new SinkFunction());
        }

        @Override
        public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            return null;
        }

        // The boolean flag encodes the message type: true = upsert message, false = delete message.
        private static class SinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> {
            public SinkFunction() {
            }

            @Override
            public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
                Boolean flag = value.f0;
                if (flag) {
                    System.out.println("add... " + value);
                } else {
                    System.out.println("delete... " + value);
                }
            }
        }
    }
}
Time Attributes

Every kind of table can have time attributes. They can be specified in the CREATE TABLE DDL, during a DataStream-to-Table conversion, or when defining a TableSource. Once a time attribute is defined, it can be used like a regular column and in time-based operations.
As long as a time attribute is not modified and is simply passed from one table to another, it remains a valid time attribute. Time attributes can be used and computed like regular timestamp columns.
val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
Processing Time

Defining in DDL

A processing-time attribute is defined in DDL with the PROCTIME() function; the return type of PROCTIME() is TIMESTAMP_LTZ.
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME()
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
Defining during DataStream-to-Table Conversion

val stream: DataStream[(String, String)] = ...

// declare an additional logical field as a processing-time attribute
val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"data", $"user_action_time".proctime)

val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")
Defining with a TableSource

class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {

  override def getReturnType = {
    val names = Array[String]("user_name", "data")
    val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    Types.ROW(names, types)
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // create the stream
    val stream = ...
    stream
  }

  override def getProctimeAttribute = {
    // a field with this name will be appended as the processing-time attribute
    "user_action_time"
  }
}

// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource)

val windowedTable = tEnv
  .from("user_actions")
  .window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")
Event Time

Defining in DDL

-- the event-time attribute is an existing TIMESTAMP(3) column with a 5-second watermark delay
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

-- the source provides an epoch-millisecond BIGINT; derive a TIMESTAMP_LTZ(3) column from it
-- and declare that column as the event-time attribute
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  ts BIGINT,
  time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
Defining during DataStream-to-Table Conversion

The event-time attribute is defined with the .rowtime suffix when defining the DataStream schema. Timestamps and watermarks must already have been assigned on the DataStream before the conversion. Because a DataStream has no notion of time zone, Flink always interprets the rowtime attribute as TIMESTAMP WITHOUT TIME ZONE and treats all event-time values as values in the UTC time zone.
There are two ways to define an event-time attribute when converting a DataStream into a Table. Depending on whether the field name carrying the .rowtime suffix already exists, the event-time field either:
appends a new field to the end of the schema, or
replaces an existing field.
// Option 1: append a new field at the end of the schema

// extract timestamps and assign watermarks based on knowledge of the stream
val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// declare an additional logical field as an event-time attribute
val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime)


// Option 2: replace an existing field

// extract timestamps and assign watermarks based on knowledge of the stream
val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// the first field has already been used for timestamp extraction; replace it with a logical event-time attribute
val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")

// usage
val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")
Defining with a TableSource

class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  override def getReturnType = {
    val names = Array[String]("user_name", "data", "user_action_time")
    val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
    Types.ROW(names, types)
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // assign watermarks based on the "user_action_time" attribute
    val stream = inputStream.assignTimestampsAndWatermarks(...)
    stream
  }

  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
    // mark the "user_action_time" field as the event-time attribute
    val rowtimeAttrDescr = new RowtimeAttributeDescriptor(
      "user_action_time",
      new ExistingField("user_action_time"),
      new AscendingTimestamps)
    val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr)
    listRowtimeAttrDescr
  }
}

// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource)

val windowedTable = tEnv
  .from("user_actions")
  .window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")
Time Zone

TIMESTAMP
TIMESTAMP(p) is an abbreviation for TIMESTAMP(p) WITHOUT TIME ZONE; the precision p ranges from 0 to 9 and defaults to 6.
TIMESTAMP describes a timestamp consisting of year, month, day, hour, minute, second, and fractional seconds.
A TIMESTAMP value can be specified with a string literal, for example:
Flink SQL> SELECT TIMESTAMP '1970-01-01 00:00:04.001';
+--------------------------+
|  1970-01-01 00:00:04.001 |
+--------------------------+
TIMESTAMP_LTZ

TIMESTAMP_LTZ(p) is an abbreviation for TIMESTAMP(p) WITH LOCAL TIME ZONE; the precision p ranges from 0 to 9 and defaults to 6.
During computation and visualization, every TIMESTAMP_LTZ value is rendered in the time zone configured for the session.
TIMESTAMP_LTZ has no string literal representation and therefore cannot be specified with a string; it can instead be derived from a long epoch value, for example:
Flink SQL> CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);

Flink SQL> SET table.local-time-zone=UTC;
Flink SQL> SELECT * FROM T1;
+---------------------------+
| TO_TIMESTAMP_LTZ(4001, 3) |
+---------------------------+
|   1970-01-01 00:00:04.001 |
+---------------------------+

Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT * FROM T1;
+---------------------------+
| TO_TIMESTAMP_LTZ(4001, 3) |
+---------------------------+
|   1970-01-01 08:00:04.001 |
+---------------------------+
Setting the Time Zone

-- set to UTC time zone
Flink SQL> SET table.local-time-zone=UTC;

-- set to Shanghai time zone
Flink SQL> SET table.local-time-zone=Asia/Shanghai;

-- set to Los_Angeles time zone
Flink SQL> SET table.local-time-zone=America/Los_Angeles;
val envSetting = EnvironmentSettings.newInstance.build
val tEnv = TableEnvironment.create(envSetting)

// set to UTC time zone
tEnv.getConfig.setLocalTimeZone(ZoneId.of("UTC"))

// set to Shanghai time zone
tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

// set to Los_Angeles time zone
tEnv.getConfig.setLocalTimeZone(ZoneId.of("America/Los_Angeles"))
Functions Affected by the Time Zone

The values returned by the following time functions depend on the configured session time zone:
+-------------------------+-----------------------------+-------+-----+--------+-----------+
| name                    | type                        | null  | key | extras | watermark |
+-------------------------+-----------------------------+-------+-----+--------+-----------+
| LOCALTIME               | TIME(0)                     | false |     |        |           |
| LOCALTIMESTAMP          | TIMESTAMP(3)                | false |     |        |           |
| CURRENT_DATE            | DATE                        | false |     |        |           |
| CURRENT_TIME            | TIME(0)                     | false |     |        |           |
| CURRENT_TIMESTAMP       | TIMESTAMP_LTZ(3)            | false |     |        |           |
| CURRENT_ROW_TIMESTAMP() | TIMESTAMP_LTZ(3)            | false |     |        |           |
| NOW()                   | TIMESTAMP_LTZ(3)            | false |     |        |           |
| PROCTIME()              | TIMESTAMP_LTZ(3) *PROCTIME* | false |     |        |           |
+-------------------------+-----------------------------+-------+-----+--------+-----------+
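As a minimal sketch of this effect (assuming a Flink version where executeSql and TableResult.print are available, i.e. 1.11+; imports omitted as elsewhere in this article), the same functions can be evaluated under two different session time zones:

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
tEnv.executeSql("SELECT NOW(), CURRENT_TIMESTAMP, PROCTIME()").print();

// same instant, but the TIMESTAMP_LTZ values are now rendered 8 hours ahead of UTC
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
tEnv.executeSql("SELECT NOW(), CURRENT_TIMESTAMP, PROCTIME()").print();

Both result sets describe the same instant; only the rendering of the TIMESTAMP_LTZ values changes with the session time zone.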