<!-- Either... (for the old planner that was available before Flink 1.9) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.13.0</version>
  <scope>provided</scope>
</dependency>

<!-- or... (for the new Blink planner) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.13.0</version>
  <scope>provided</scope>
</dependency>
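Since the examples below mix the Table API with the Scala DataStream API, the Scala bridging module is presumably also required; a sketch for the same Flink and Scala versions:

<!-- assumed: needed for StreamTableEnvironment and DataStream conversions in Scala -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.13.0</version>
  <scope>provided</scope>
</dependency>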
val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
// or val fsTableEnv = TableEnvironment.create(fsSettings)
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
// or val bsTableEnv = TableEnvironment.create(bsSettings)
// set the current catalog and database for this environment
// get a TableEnvironment
val tEnv: TableEnvironment = ... // see "Create a TableEnvironment" section
tEnv.useCatalog("custom_catalog")
tEnv.useDatabase("custom_database")
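To double-check what is available before switching, the environment's listing methods can be used; a small illustration:

// list all catalogs, then all databases of the current catalog
tEnv.listCatalogs().foreach(println)
tEnv.listDatabases().foreach(println)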
val table: Table = ...

// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("exampleView", table)

// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.exampleView", table)

// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table)

// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)
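Once registered, such a view is addressable from SQL under the same, possibly fully qualified, identifier; a hypothetical follow-up query:

// query the view registered in another catalog and database
val result: Table = tableEnv.sqlQuery(
    "SELECT * FROM other_catalog.other_database.exampleView")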
// create environments of both APIs
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// create a DataStream
val dataStream = env.fromElements("Alice", "Bob", "John")

// interpret the insert-only DataStream as a Table
val inputTable = tableEnv.fromDataStream(dataStream)

// register the Table object as a view and query it
tableEnv.createTemporaryView("InputTable", inputTable)
val resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable")

// interpret the insert-only Table as a DataStream again
val resultStream = tableEnv.toDataStream(resultTable)

// add a printing sink and execute in DataStream API
resultStream.print()
env.execute()
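Run with parallelism 1, the printing sink would presumably emit the upper-cased names as insert-only rows, along these lines:

// expected output (order may vary with higher parallelism):
// +I[ALICE]
// +I[BOB]
// +I[JOHN]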
Mapping of Data Types to Table Schema
Position-based Mapping
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, Int)] = ...

// convert DataStream into Table with field "myLong" only
val table: Table = tableEnv.fromDataStream(stream, $"myLong")

// convert DataStream into Table with field names "myLong" and "myInt"
val table: Table = tableEnv.fromDataStream(stream, $"myLong", $"myInt")
Name-based Mapping
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, Int)] = ...

// convert DataStream into Table with field "_2" only
val table: Table = tableEnv.fromDataStream(stream, $"_2")

// convert DataStream into Table with swapped fields
val table: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")

// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myInt", $"_1" as "myLong")
//=========================================
// Row type
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, $"myName", $"myAge")

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName", $"age" as "myAge")

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name")

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName")
//===========================================
// POJO type
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as "myName")

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name")

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName")
//=============================================
// Tuple type
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, String)] = ...

// convert DataStream into Table with field names "myLong", "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")

// convert DataStream into Table with reordered fields "_2", "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")

// convert DataStream into Table with projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2")

// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myString", $"_1" as "myLong")
// define a case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// convert DataStream into Table with field names "myName", "myAge" (position-based)
val table = tableEnv.fromDataStream(streamCC, $"myName", $"myAge")

// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(streamCC, $"age" as "myAge", $"name" as "myName")
//==============================================
// Atomic type
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[Long] = ...

// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, $"myLong")
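The Schema-based fromDataStream examples that follow refer to a `dataStream` whose definition is not part of this excerpt; judging from the printed schemas (name, score, event_time), it is presumably a stream of a case class along these lines:

// assumed setup for the examples below (not shown in the original excerpt)
case class User(name: String, score: java.lang.Integer, event_time: java.time.Instant)

val dataStream = env.fromElements(
    User("Alice", 4, java.time.Instant.ofEpochMilli(1000)),
    User("Bob", 6, java.time.Instant.ofEpochMilli(1001)),
    User("Alice", 10, java.time.Instant.ofEpochMilli(1002)))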
// === EXAMPLE 2 ===

// derive all physical columns automatically
// but add computed columns (in this case for creating a proctime attribute column)

val table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByExpression("proc_time", "PROCTIME()")
        .build())
table.printSchema()
// prints:
// (
//  `name` STRING,
//  `score` INT NOT NULL,
//  `event_time` TIMESTAMP_LTZ(9),
//  `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
// )
// === EXAMPLE 3 ===
// derive all physical columns automatically
// but add computed columns (in this case for creating a rowtime attribute column)
// and a custom watermark strategy

val table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
        .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
        .build())
table.printSchema()
// prints:
// (
//  `name` STRING,
//  `score` INT,
//  `event_time` TIMESTAMP_LTZ(9),
//  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
//  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
// )
// === EXAMPLE 4 ===
// derive all physical columns automatically
// but access the stream record's timestamp for creating a rowtime attribute column
// also rely on the watermarks generated in the DataStream API

// we assume that a watermark strategy has been defined for `dataStream` before
// (not part of this example)
val table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build())
table.printSchema()
// prints:
// (
//  `name` STRING,
//  `score` INT,
//  `event_time` TIMESTAMP_LTZ(9),
//  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
//  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
// )
// === EXAMPLE 5 ===
// define physical columns manually
// in this example,
//   - we can reduce the default precision of timestamps from 9 to 3
//   - we also project the columns and put `event_time` to the beginning

val table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .column("event_time", "TIMESTAMP_LTZ(3)")
        .column("name", "STRING")
        .column("score", "INT")
        .watermark("event_time", "SOURCE_WATERMARK()")
        .build())
table.printSchema()
// prints:
// (
//  `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
//  `name` STRING,
//  `score` INT
// )
// note: the watermark strategy is not shown due to the inserted column reordering projection
// instead, declare a more useful data type for columns using the Table API's type system
// in a custom schema and rename the columns in a following `as` projection
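The comment above apparently refers to a scenario, not shown in this excerpt, where the stream's records surface as a single column `f0` of a RAW type; a sketch of declaring a structured type for that column, assuming the `User` case class sketched earlier:

// assumed reconstruction: declare a structured type for the single RAW column
val table = tableEnv
    .fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("f0", DataTypes.of(classOf[User]))
            .build())
    .as("user")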
tableEnv.executeSql(
    """
    CREATE TABLE GeneratedTable (
        name STRING,
        score INT,
        event_time TIMESTAMP_LTZ(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
    )
    WITH ('connector' = 'datagen')
    """
)
val table = tableEnv.from("GeneratedTable")
// === EXAMPLE 1 ===
// use the default conversion to instances of Row
// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated
val dataStream: DataStream[Row] = tableEnv.toDataStream(table)
// === EXAMPLE 2 ===
// a data type is extracted from class `User`,
// the planner reorders fields and inserts implicit casts where possible to convert internal
// data structures to the desired structured type

// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated
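The target class `User` is not defined in this excerpt; for the conversion below to compile, a definition mirroring the columns of GeneratedTable would be needed, presumably as sketched earlier:

// assumed definition, matching GeneratedTable's columns
case class User(name: String, score: java.lang.Integer, event_time: java.time.Instant)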
val dataStream: DataStream[User] = tableEnv.toDataStream(table, classOf[User])
// data types can be extracted reflectively as above or explicitly defined
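As an explicit alternative to the reflective extraction above, the target type can presumably be spelled out with DataTypes.STRUCTURED; a sketch under the same assumed `User` class:

// assumed reconstruction of the explicit variant
val dataStream: DataStream[User] =
    tableEnv.toDataStream(
        table,
        DataTypes.STRUCTURED(
            classOf[User],
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("score", DataTypes.INT()),
            DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))))

The fromChangelogStream walkthrough below starts from an input stream that is not shown in this excerpt; judging from the -D/+I rows in its output, the stream presumably carried explicit UPDATE_BEFORE/UPDATE_AFTER records, roughly:

// === EXAMPLE 1 ===
// interpret the stream as a retract stream

// create a changelog DataStream (assumed setup, not part of the original excerpt)
val dataStream = env.fromElements(
    Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
    Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
    Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", Int.box(12)),
    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))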
// interpret the DataStream as a Table
val table = tableEnv.fromChangelogStream(dataStream)

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table)
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print()
// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -D |                          Alice |          12 |
// | +I |                          Alice |         100 |
// +----+--------------------------------+-------------+
// === EXAMPLE 2 ===
// interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)
// create a changelog DataStream
val dataStream = env.fromElements(
    Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
    Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))

// interpret the DataStream as a Table
val table = tableEnv.fromChangelogStream(
    dataStream,
    Schema.newBuilder().primaryKey("f0").build(),
    ChangelogMode.upsert())

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table)
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print()
// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -U |                          Alice |          12 |
// | +U |                          Alice |         100 |
// +----+--------------------------------+-------------+
// === EXAMPLE 2 ===

// convert to DataStream in the simplest and most general way possible (with event-time)

val dataStream: DataStream[Row] = tableEnv.toChangelogStream(table)

// since `event_time` is a single time attribute in the schema, it is set as the
// stream record's timestamp by default; however, at the same time, it remains part of the Row

dataStream.process(new ProcessFunction[Row, Unit] {
    override def processElement(
        row: Row,
        ctx: ProcessFunction[Row, Unit]#Context,
        out: Collector[Unit]): Unit = {

        // the timestamp exists twice: as part of the Row and as the stream record's timestamp
        println(row.getField("event_time"))
        println(ctx.timestamp())
    }
})
env.execute()
// === EXAMPLE 4 ===
// for advanced users, it is also possible to use more internal data structures for better
// efficiency

// note that this is only mentioned here for completeness because using internal data structures
// adds complexity and additional type handling

// however, converting a TIMESTAMP_LTZ column to `Long` or STRING to `byte[]` might be convenient,
// also structured types can be represented as `Row` if needed
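The code for this advanced example is not part of the excerpt; based on the GeneratedTable schema above, it presumably resembled the following, emitting `name` as an internal StringData and the rowtime metadata as an epoch Long:

// assumed reconstruction: bridge some columns to internal data structures
val dataStream: DataStream[Row] = tableEnv.toChangelogStream(
    table,
    Schema.newBuilder()
        .column("name", DataTypes.STRING().bridgedTo(classOf[StringData]))
        .column("score", DataTypes.INT())
        .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3).bridgedTo(classOf[java.lang.Long]))
        .build())

The excerpt then closes with a separate end-to-end example that turns an updating aggregation into a changelog stream: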
// create environments of both APIs
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// create a DataStream
val dataStream = env.fromElements(
    Row.of("Alice", Int.box(12)),
    Row.of("Bob", Int.box(10)),
    Row.of("Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))

// interpret the insert-only DataStream as a Table
val inputTable = tableEnv.fromDataStream(dataStream).as("name", "score")

// register the Table object as a view and query it
// the query contains an aggregation that produces updates
tableEnv.createTemporaryView("InputTable", inputTable)
val resultTable = tableEnv.sqlQuery("SELECT name, SUM(score) FROM InputTable GROUP BY name")

// interpret the updating Table as a changelog DataStream
val resultStream = tableEnv.toChangelogStream(resultTable)

// add a printing sink and execute in DataStream API
resultStream.print()
env.execute()
// prints:
// +I[Alice, 12]   -- insert
// +I[Bob, 10]
// -U[Alice, 12]   -- update before
// +U[Alice, 112]  -- update after