14 Table API & SQL
Last updated: 2022-08-28 11:30:56
Apache Flink offers two relational APIs for unified stream and batch processing: the Table API and SQL. The Table API is a language-integrated query API for Scala and Java that lets you compose relational operators such as select, filter, and join in a very intuitive way. Flink SQL is standard SQL implemented on top of Apache Calcite.
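As a quick sketch (assuming a table Orders with an amount column is already registered in a tableEnv and the Scala expression DSL is in scope), the same filter can be written with either API:
import org.apache.flink.table.api._ // enables the $"..." expression syntax
// Table API: the query is composed operator by operator in the host language
val viaTableApi = tableEnv.from("Orders").filter($"amount" > 10)
// SQL: the whole query is a string that is parsed and validated via Calcite
val viaSql = tableEnv.sqlQuery("SELECT * FROM Orders WHERE amount > 10")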
Dependencies
<!-- Either... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.13.0</version>
<scope>provided</scope>
</dependency>
<!-- or... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.13.0</version>
<scope>provided</scope>
</dependency>
<!-- Either... (for the old planner that was available before Flink 1.9) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.13.0</version>
<scope>provided</scope>
</dependency>
<!-- or.. (for the new Blink planner) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.13.0</version>
<scope>provided</scope>
</dependency>
<!-- extension dependency: needed for custom functions or custom formats/parsing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
Differences between the two planners
- Blink treats batch jobs as a special case of streaming. Consequently, conversions between Table and DataSet are not supported, and batch jobs are translated into DataStream programs rather than DataSet programs; the same applies to streaming jobs.
- The Blink planner does not support BatchTableSource; it uses a bounded StreamTableSource instead.
- The implementations of FilterableTableSource in the old planner and the Blink planner are incompatible: the old planner pushes PlannerExpressions down into a FilterableTableSource, while the Blink planner pushes Expressions down.
- String-based key-value configuration options are used only by the Blink planner.
- The implementation of PlannerConfig (CalciteConfig) differs between the two planners.
- The Blink planner optimizes multiple sinks (multiple-sinks) into a single directed acyclic graph (DAG); both TableEnvironment and StreamTableEnvironment support this (see the sketch after this list). The old planner always optimizes each sink into a separate DAG, with all graphs independent of each other.
- The old planner currently does not support catalog statistics, while Blink does.
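A minimal sketch of the multiple-sinks behaviour (assuming a source table Orders and sink tables SinkA and SinkB have already been registered, e.g. via DDL):
// with the Blink planner, both INSERT statements buffered below are optimized into a
// single DAG, so the shared scan of Orders runs only once; the old planner would
// build two independent graphs
val stmtSet = tableEnv.createStatementSet()
stmtSet.addInsertSql("INSERT INTO SinkA SELECT id, amount FROM Orders WHERE amount > 10")
stmtSet.addInsertSql("INSERT INTO SinkB SELECT id, amount FROM Orders WHERE amount <= 10")
stmtSet.execute()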
Blink planner
Regardless of whether the input source is streaming or batch, Table API and SQL queries are translated into DataStream programs. A query is represented internally as a logical query plan and is translated in two phases:
- optimization of the logical plan
- translation into a DataStream program
A Table API or SQL query is translated when one of the following happens:
- When TableEnvironment.executeSql() is called. This method executes a single SQL statement; the statement is translated as soon as the method is called (see the sketch after this list).
- When Table.executeInsert() is called. This method inserts the content of a table into the given sink table; the Table API program is translated as soon as the method is called.
- When Table.execute() is called. This method collects the content of a table to the local client; the Table API program is translated as soon as the method is called.
- When StatementSet.execute() is called. Tables (emitted to a sink via StatementSet.addInsert()) and INSERT statements (added via StatementSet.addInsertSql()) are first buffered in the StatementSet; when StatementSet.execute() is called, all sinks are optimized into one directed acyclic graph.
- When a Table is converted into a DataStream. Once converted, it is a regular DataStream program and is executed when StreamExecutionEnvironment.execute() is called.
- Since version 1.11, the sqlUpdate() and insertInto() methods are deprecated. A Table program built from these two methods must be executed through StreamTableEnvironment.execute() and cannot be executed through StreamExecutionEnvironment.execute().
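A small sketch of eager vs. deferred translation (assuming env and tableEnv as created in the next section, a registered SourceTable with a column word, and the built-in blackhole connector):
// executeSql() translates and submits the statement immediately
tableEnv.executeSql("CREATE TABLE SinkTable (word STRING) WITH ('connector' = 'blackhole')")
// executeInsert() also translates and submits right away; no extra execute() call is needed
val words = tableEnv.from("SourceTable").select($"word")
words.executeInsert("SinkTable")
// a Table converted to a DataStream, in contrast, only runs once
// StreamExecutionEnvironment.execute() is called
tableEnv.toDataStream(words).print()
env.execute()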
Old planner
Table API and SQL queries are translated into DataStream or DataSet programs depending on whether their input is a streaming or a batch source. A query is represented internally as a logical query plan and is translated in two phases:
- optimization of the logical plan
- translation into a DataStream or DataSet program
A Table API or SQL query is translated when one of the following happens:
- When TableEnvironment.executeSql() is called. This method executes a single SQL statement; the statement is translated as soon as the method is called.
- When Table.executeInsert() is called. This method inserts the content of a table into the given sink table; the Table API program is translated as soon as the method is called.
- When Table.execute() is called. This method collects the content of a table to the local client; the Table API program is translated as soon as the method is called.
- When StatementSet.execute() is called. Tables (emitted to a sink via StatementSet.addInsert()) and INSERT statements (added via StatementSet.addInsertSql()) are first buffered in the StatementSet; when StatementSet.execute() is called, all sinks are optimized into one directed acyclic graph.
- For streaming, translation is triggered when a Table is converted into a DataStream; once converted, it is a regular DataStream program and is executed when StreamExecutionEnvironment.execute() is called. For batch, translation is triggered when a Table is converted into a DataSet; once converted, it is a regular DataSet program and is executed when ExecutionEnvironment.execute() is called (a minimal batch sketch follows after this list).
- Since version 1.11, the sqlUpdate() and insertInto() methods are deprecated. For streaming, a Table program built from these two methods must be executed through StreamTableEnvironment.execute() and cannot be executed through StreamExecutionEnvironment.execute(); for batch, it must be executed through BatchTableEnvironment.execute() and cannot be executed through ExecutionEnvironment.execute().
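A minimal old-planner batch sketch (assuming a registered Orders table with cID and revenue columns):
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.types.Row
val fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)
val result = fbTableEnv.from("Orders").groupBy($"cID").select($"cID", $"revenue".sum)
// the Table program is translated when it is converted into a DataSet ...
val ds: DataSet[Row] = fbTableEnv.toDataSet[Row](result)
// ... print() on a DataSet triggers execution by itself
ds.print()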
Creating a TableEnvironment
The TableEnvironment is the central concept of the Table API and SQL. It is responsible for:
- registering Tables in the internal catalog
- registering external catalogs
- loading pluggable modules
- executing SQL queries
- registering user-defined functions (scalar, table, or aggregation)
- converting a DataStream or DataSet into a Table
- holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment
A Table is always bound to a specific TableEnvironment. Tables from different TableEnvironments cannot be combined in the same query, e.g. joined or unioned.
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
// or val fsTableEnv = TableEnvironment.create(fsSettings)
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
val fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)
// **********************
// BLINK STREAMING QUERY
// **********************
// as of 1.13.0, the Blink planner in streaming mode is the default if no settings are specified
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
// or val bsTableEnv = TableEnvironment.create(bsSettings)
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)
// if only one planner jar is on the classpath, useAnyPlanner can be used and the planner is discovered automatically
A TableEnvironment maintains a map of catalogs of tables which are created with an identifier. An identifier consists of three parts: the catalog name, the database name, and the object name. If the catalog or database is not specified, the current default values are used.
A Table can be virtual (a VIEW) or regular (a TABLE). A VIEW can be created from an existing Table, usually the result of a Table API or SQL query. A TABLE describes external data, such as a file, a database table, or a message queue.
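For example (a minimal sketch; the Kafka connector options are only illustrative and assume the corresponding connector and format jars are on the classpath, and that Orders is a registered table with a cCountry column):
// a VIEW is derived from an existing Table, e.g. a query result, and is purely logical
val french = tableEnv.from("Orders").filter($"cCountry" === "FRANCE")
tableEnv.createTemporaryView("FrenchOrders", french)
// a TABLE describes external data, here a Kafka topic declared via DDL
tableEnv.executeSql(
  """
    |CREATE TABLE OrderEvents (
    |  cID BIGINT,
    |  amount DOUBLE
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'orders',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'scan.startup.mode' = 'earliest-offset',
    |  'format' = 'json'
    |)
  """.stripMargin)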
Temporary tables and permanent tables
Tables may be temporary, tied to the lifecycle of a single Flink session, or permanent, visible across multiple Flink sessions and clusters.
Permanent tables require a catalog (such as the Hive Metastore) to maintain their metadata. Once created, a permanent table is visible to every Flink session connected to that catalog and continues to exist until it is explicitly dropped.
Temporary tables are usually kept in memory and exist only for the duration of the Flink session that created them. They are not visible to other sessions. They are not bound to any catalog or database, but can be created within a namespace. Temporary tables are not dropped even if their corresponding database is removed.
A temporary table can be registered with the same identifier as an existing permanent table. The temporary table then shadows the permanent one: as long as the temporary table exists, the permanent table cannot be accessed. The sketch below illustrates this.
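A minimal sketch of this shadowing (assuming a permanent table Orders with an amount column already exists in the current catalog and database):
// derive a "patched" version of the permanent table
val patched = tableEnv.from("Orders").filter($"amount" > 0)
// registering a temporary view under the same identifier shadows the permanent table
tableEnv.createTemporaryView("Orders", patched)
// every lookup of "Orders" now resolves to the temporary view ...
val q = tableEnv.sqlQuery("SELECT * FROM Orders")
// ... until the temporary view is dropped and the permanent table becomes visible again
tableEnv.dropTemporaryView("Orders")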
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// table is the result of a simple projection query
val projTable: Table = tableEnv.from("X").select(...)
// register the Table projTable as table "projectedTable"
// create a temporary view
tableEnv.createTemporaryView("projectedTable", projTable)
// create a temporary table via a connector descriptor
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")
// set the current catalog and database for the session
// get a TableEnvironment
val tEnv: TableEnvironment = ...;
tEnv.useCatalog("custom_catalog")
tEnv.useDatabase("custom_database")
val table: Table = ...;
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("exampleView", table)
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.exampleView", table)
// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table)
// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)从传统数据库系统的角度来看,Table 对象与 VIEW 视图非常像。也就是,定义了 Table 的查询是没有被优化的, 而且会被内嵌到另一个引用了这个注册了的 Table的查询中。如果多个查询都引用了同一个注册了的Table,那么它会被内嵌每个查询中并被执行多次, 也就是说注册了的Table的结果不会被共享(注:Blink 计划器的TableEnvironment会优化成只执行一次)。
Querying a table
The Table API is a language-integrated query API for Scala and Java. In contrast to SQL, queries are not specified as strings but are composed step by step in the host language.
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
val orders = tableEnv.from("Orders")
// compute revenue for all customers from France
// the $ syntax requires the implicit expression conversions (import org.apache.flink.table.api._)
val revenue = orders
.filter($"cCountry" === "FRANCE")
.groupBy($"cID", $"cName")
.select($"cID", $"cName", $"revenue".sum AS "revSum")
// the same query using SQL
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
Emitting a table
A Table is emitted by writing it to a TableSink. A TableSink is a generic interface that supports various file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g. JDBC, Apache HBase, Apache Cassandra, Elasticsearch), and messaging systems (e.g. Apache Kafka, RabbitMQ).
A batch Table can only be written to a BatchTableSink, while a streaming Table requires an AppendStreamTableSink, a RetractStreamTableSink, or an UpsertStreamTableSink.
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// create an output Table
val schema = new Schema()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.BIGINT())
tableEnv.connect(new FileSystem().path("/path/to/file"))
.withFormat(new Csv().fieldDelimiter('|').deriveSchema())
.withSchema(schema)
.createTemporaryTable("CsvSinkTable")
// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...
// emit the result Table to the registered TableSink
result.executeInsert("CsvSinkTable")
Explaining a table
The Table API provides a mechanism to explain the logical and optimized query plans used to compute a Table. This is done through the Table.explain() method or the StatementSet.explain() method.
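For a StatementSet, a minimal sketch could look like this (assuming sink tables SinkA and SinkB are registered and table1, table2 are the Tables from the example below); the Table.explain() variant follows right after:
val stmtSet = tableEnv.createStatementSet()
stmtSet.addInsert("SinkA", table1.where($"word".like("F%")))
stmtSet.addInsert("SinkB", table2)
// explains the combined, optimized plan of all statements buffered so far
println(stmtSet.explain())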
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val table1 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
val table2 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
val table = table1
.where($"word".like("F%"))
.unionAll(table2)
println(table.explain())
Converting between DataStream and Table
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
// create environments of both APIs
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// create a DataStream
val dataStream = env.fromElements("Alice", "Bob", "John")
// interpret the insert-only DataStream as a Table
val inputTable = tableEnv.fromDataStream(dataStream)
// register the Table object as a view and query it
tableEnv.createTemporaryView("InputTable", inputTable)
val resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable")
// interpret the insert-only Table as a DataStream again
val resultStream = tableEnv.toDataStream(resultTable)
// add a printing sink and execute in DataStream API
resultStream.print()
env.execute()
Mapping of data types to a Table schema
Position-based mapping
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, Int)] = ...
// convert DataStream into Table with field "myLong" only
val table: Table = tableEnv.fromDataStream(stream, $"myLong")
// convert DataStream into Table with field names "myLong" and "myInt"
val table: Table = tableEnv.fromDataStream(stream, $"myLong", $"myInt")基于名称映射
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, Int)] = ...
// convert DataStream into Table with field "_2" only
val table: Table = tableEnv.fromDataStream(stream, $"_2")
// convert DataStream into Table with swapped fields
val table: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")
// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myInt", $"_1" as "myLong")//=========================================
// Row type
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...
// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, $"myName", $"myAge")
// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName", $"age" as "myAge")
// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name")
// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName")
//===========================================
// POJO type
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...
// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as "myName")
// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name")
// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName")
//=============================================
//Tuple
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, String)] = ...
// convert DataStream into Table with field names "myLong", "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")
// convert DataStream into Table with reordered fields "_2", "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")
// convert DataStream into Table with projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2")
// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myString", $"_1" as "myLong")
// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...
// convert DataStream into Table with field names 'myName, 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, $"myName", $"myAge")
// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as "myName")
//==============================================
// atomic type
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[Long] = ...
// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, $"myLong")fromDataStream
import org.apache.flink.api.scala._
import java.time.Instant;
// some example case class
case class User(name: String, score: java.lang.Integer, event_time: java.time.Instant)
// create a DataStream
val dataStream = env.fromElements(
User("Alice", 4, Instant.ofEpochMilli(1000)),
User("Bob", 6, Instant.ofEpochMilli(1001)),
User("Alice", 10, Instant.ofEpochMilli(1002)))
// === EXAMPLE 1 ===
// derive all physical columns automatically
val table = tableEnv.fromDataStream(dataStream)
table.printSchema()
// prints:
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9)
// )
// === EXAMPLE 2 ===
// derive all physical columns automatically
// but add computed columns (in this case for creating a proctime attribute column)
val table = tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("proc_time", "PROCTIME()")
.build())
table.printSchema()
// prints:
// (
// `name` STRING,
// `score` INT NOT NULL,
// `event_time` TIMESTAMP_LTZ(9),
// `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
//)
// === EXAMPLE 3 ===
// derive all physical columns automatically
// but add computed columns (in this case for creating a rowtime attribute column)
// and a custom watermark strategy
val table =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
.build())
table.printSchema()
// prints:
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9),
// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
// )
// === EXAMPLE 4 ===
// derive all physical columns automatically
// but access the stream record's timestamp for creating a rowtime attribute column
// also rely on the watermarks generated in the DataStream API
// we assume that a watermark strategy has been defined for `dataStream` before
// (not part of this example)
val table =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.watermark("rowtime", "SOURCE_WATERMARK()")
.build())
table.printSchema()
// prints:
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9),
// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
// )
// === EXAMPLE 5 ===
// define physical columns manually
// in this example,
// - we can reduce the default precision of timestamps from 9 to 3
// - we also project the columns and put `event_time` to the beginning
val table =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.column("event_time", "TIMESTAMP_LTZ(3)")
.column("name", "STRING")
.column("score", "INT")
.watermark("event_time", "SOURCE_WATERMARK()")
.build())
table.printSchema()
// prints:
// (
// `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
// `name` VARCHAR(200),
// `score` INT
// )
// note: the watermark strategy is not shown due to the inserted column reordering projection
public static class User {
public final String name;
public final Integer score;
public User(String name, Integer score) {
this.name = name;
this.score = score;
}
}
// create a DataStream
DataStream<User> dataStream = env.fromElements(
new User("Alice", 4),
new User("Bob", 6),
new User("Alice", 10));
// since fields of a RAW type cannot be accessed, every stream record is treated as an atomic type
// leading to a table with a single column `f0`
Table table = tableEnv.fromDataStream(dataStream);
table.printSchema();
// prints:
// (
// `f0` RAW('User', '...')
// )
// instead, declare a more useful data type for columns using the Table API's type system
// in a custom schema and rename the columns in a following `as` projection
Table table = tableEnv
.fromDataStream(
dataStream,
Schema.newBuilder()
.column("f0", DataTypes.of(User.class))
.build())
.as("user");
table.printSchema();
// prints:
// (
// `user` *User<`name` STRING,`score` INT>*
// )
// data types can be extracted reflectively as above or explicitly defined
Table table3 = tableEnv
.fromDataStream(
dataStream,
Schema.newBuilder()
.column(
"f0",
DataTypes.STRUCTURED(
User.class,
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("score", DataTypes.INT())))
.build())
.as("user");
table3.printSchema();
// prints:
// (
// `user` *User<`name` STRING,`score` INT>*
// )
// create some DataStream
val dataStream: DataStream[(Long, String)] = env.fromElements(
(12L, "Alice"),
(0L, "Bob"))
// === EXAMPLE 1 ===
// register the DataStream as view "MyView" in the current session
// all columns are derived automatically
tableEnv.createTemporaryView("MyView", dataStream)
tableEnv.from("MyView").printSchema()
// prints:
// (
// `_1` BIGINT NOT NULL,
// `_2` STRING
// )
// === EXAMPLE 2 ===
// register the DataStream as view "MyView" in the current session,
// provide a schema to adjust the columns similar to `fromDataStream`
// in this example, the derived NOT NULL information has been removed
tableEnv.createTemporaryView(
"MyView",
dataStream,
Schema.newBuilder()
.column("_1", "BIGINT")
.column("_2", "STRING")
.build())
tableEnv.from("MyView").printSchema()
// prints:
// (
// `_1` BIGINT,
// `_2` STRING
// )
// === EXAMPLE 3 ===
// use the Table API before creating the view if it is only about renaming columns
tableEnv.createTemporaryView(
"MyView",
tableEnv.fromDataStream(dataStream).as("id", "name"))
tableEnv.from("MyView").printSchema()
// prints:
// (
// `id` BIGINT NOT NULL,
// `name` STRING
// )
toDataStream
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.DataTypes
case class User(name: String, score: java.lang.Integer, event_time: java.time.Instant)
tableEnv.executeSql(
"""
CREATE TABLE GeneratedTable (
name STRING,
score INT,
event_time TIMESTAMP_LTZ(3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
)
WITH ('connector'='datagen')
"""
)
val table = tableEnv.from("GeneratedTable")
// === EXAMPLE 1 ===
// use the default conversion to instances of Row
// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated
val dataStream: DataStream[Row] = tableEnv.toDataStream(table)
// === EXAMPLE 2 ===
// a data type is extracted from class `User`,
// the planner reorders fields and inserts implicit casts where possible to convert internal
// data structures to the desired structured type
// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated
val dataStream: DataStream[User] = tableEnv.toDataStream(table, classOf[User])
// data types can be extracted reflectively as above or explicitly defined
val dataStream: DataStream[User] =
tableEnv.toDataStream(
table,
DataTypes.STRUCTURED(
classOf[User],
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("score", DataTypes.INT()),
DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))))fromChangelogStream
// === EXAMPLE 1 ===
// interpret the stream as a retract stream
// create a changelog DataStream
val dataStream = env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", Int.box(12)),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))
// interpret the DataStream as a Table
val table = tableEnv.fromChangelogStream(dataStream)
// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table)
tableEnv
.executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
.print()
// prints:
// +----+--------------------------------+-------------+
// | op | name | score |
// +----+--------------------------------+-------------+
// | +I | Bob | 5 |
// | +I | Alice | 12 |
// | -D | Alice | 12 |
// | +I | Alice | 100 |
// +----+--------------------------------+-------------+
// === EXAMPLE 2 ===
// interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)
// create a changelog DataStream
val dataStream = env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))
// interpret the DataStream as a Table
val table =
tableEnv.fromChangelogStream(
dataStream,
Schema.newBuilder().primaryKey("f0").build(),
ChangelogMode.upsert())
// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table)
tableEnv
.executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
.print()
// prints:
// +----+--------------------------------+-------------+
// | op | name | score |
// +----+--------------------------------+-------------+
// | +I | Bob | 5 |
// | +I | Alice | 12 |
// | -U | Alice | 12 |
// | +U | Alice | 100 |
// +----+--------------------------------+-------------+
toChangelogStream
// create Table with event-time
tableEnv.executeSql(
"""
CREATE TABLE GeneratedTable (
name STRING,
score INT,
event_time TIMESTAMP_LTZ(3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
)
WITH ('connector'='datagen')
"""
)
val table = tableEnv.from("GeneratedTable")
// === EXAMPLE 1 ===
// convert to DataStream in the simplest and most general way possible (no event-time)
val simpleTable = tableEnv
.fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
.as("name", "score")
.groupBy($"name")
.select($"name", $"score".sum())
tableEnv
.toChangelogStream(simpleTable)
.executeAndCollect()
.foreach(println)
// prints:
// +I[Bob, 12]
// +I[Alice, 12]
// -U[Alice, 12]
// +U[Alice, 14]
// === EXAMPLE 2 ===
// convert to DataStream in the simplest and most general way possible (with event-time)
val dataStream: DataStream[Row] = tableEnv.toChangelogStream(table)
// since `event_time` is a single time attribute in the schema, it is set as the
// stream record's timestamp by default; however, at the same time, it remains part of the Row
dataStream.process(new ProcessFunction[Row, Unit] {
override def processElement(
row: Row,
ctx: ProcessFunction[Row, Unit]#Context,
out: Collector[Unit]): Unit = {
// prints: [name, score, event_time]
println(row.getFieldNames(true))
// timestamp exists twice
assert(ctx.timestamp() == row.getFieldAs[Instant]("event_time").toEpochMilli)
}
})
env.execute()
// === EXAMPLE 3 ===
// convert to DataStream but write out the time attribute as a metadata column which means
// it is not part of the physical schema anymore
val dataStream: DataStream[Row] = tableEnv.toChangelogStream(
table,
Schema.newBuilder()
.column("name", "STRING")
.column("score", "INT")
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.build())
// the stream record's timestamp is defined by the metadata; it is not part of the Row
dataStream.process(new ProcessFunction[Row, Unit] {
override def processElement(
row: Row,
ctx: ProcessFunction[Row, Unit]#Context,
out: Collector[Unit]): Unit = {
// prints: [name, score]
println(row.getFieldNames(true))
// timestamp exists once
println(ctx.timestamp())
}
})
env.execute()
// === EXAMPLE 4 ===
// for advanced users, it is also possible to use more internal data structures for better
// efficiency
// note that this is only mentioned here for completeness because using internal data structures
// adds complexity and additional type handling
// however, converting a TIMESTAMP_LTZ column to `Long` or STRING to `byte[]` might be convenient,
// also structured types can be represented as `Row` if needed
val dataStream: DataStream[Row] = tableEnv.toChangelogStream(
table,
Schema.newBuilder()
.column(
"name",
DataTypes.STRING().bridgedTo(classOf[StringData]))
.column(
"score",
DataTypes.INT())
.column(
"event_time",
DataTypes.TIMESTAMP_LTZ(3).bridgedTo(classOf[java.lang.Long]))
.build())
// leads to a stream of Row(name: StringData, score: Integer, event_time: Long)
Updating tables
changelog
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
// create environments of both APIs
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// create a DataStream
val dataStream = env.fromElements(
Row.of("Alice", Int.box(12)),
Row.of("Bob", Int.box(10)),
Row.of("Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))
// interpret the insert-only DataStream as a Table
val inputTable = tableEnv.fromDataStream(dataStream).as("name", "score")
// register the Table object as a view and query it
// the query contains an aggregation that produces updates
tableEnv.createTemporaryView("InputTable", inputTable)
val resultTable = tableEnv.sqlQuery("SELECT name, SUM(score) FROM InputTable GROUP BY name")
// interpret the updating Table as a changelog DataStream
val resultStream = tableEnv.toChangelogStream(resultTable)
// add a printing sink and execute in DataStream API
resultStream.print()
env.execute()
// prints:
// +I[Alice, 12] --insert
// +I[Bob, 10]
// -U[Alice, 12] --update before
// +U[Alice, 112] --update after