当前位置:网站首页>Flinktable & SQL (VI)
Flinktable & SQL (VI)
2022-07-24 13:45:00 【Hua Weiyun】
FlinkTable&SQL( 6、 ... and 、 7、 ... and )
Today's goal
- understand Flink Table&SQL The development history
- Understand why to use Table API & SQL
- master Flink Table&SQL Do batch development
- master Flink Table&SQL Develop stream processing
- Master common development cases
- Flink-SQL Common operators of
Flink Table & SQL
FlinkTable & SQL Is a higher level of abstraction operation , Bottom Flink Runtime => Stream technological process
Batch processing is a special form of stream processing
FlinkSQL follow ANSI Of SQL standard
Flink1.9 Before , FlinkSQL Including two sets Table api , DataStream Table API( Stream processing ) ,DataSet Table API( The batch )
Planner Inquire device , Abstract syntax tree ,parser、optimizer、codegen( Template code generation ), The resulting Flink Runtime Code for direct execution
Planner Include old Planner and Blink Planner ,Blink Planner The underlying implementation Stream batch integration ( default Planner)

FlinkTable & SQL Program structure
Import pom rely on , jar Package coordinates
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.12</artifactId> <version>${flink.version}</version> </dependency> <!-- flink Implementation plan , This is a 1.9 pre-release --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>${flink.version}</version> </dependency> <!-- blink Implementation plan ,1.11+ default --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency>
establish FlinkTable FlinkSQL Of The way to watch
// table is the result of a simple projection query Table projTable = tableEnv.from("X").select(...);// register the Table projTable as table "projectedTable"tableEnv.createTemporaryView("projectedTable", projTable);SQL Four statements of
- DDL Data definition language , Create database 、 surface , Delete database 、 surface
- DML Data operation language , Add to the data 、 Delete 、 Change operation
- DCL Data control language , Set the operation permission of data grant revoke
- DQL Data query language , Query the data in the data table , Basic query , Complex queries , Multi-table query , Subquery
demand
Connect the two data streams DataStream adopt FlinkTable & SQL API Conduct union all operation , Conditions ds1 amount>2 union all ds2 amount<2
Development steps
package cn.itcast.flink.sql;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;import java.util.Arrays;import static org.apache.flink.table.api.Expressions.$;/** * Author itcast * Date 2021/6/22 9:45 * Desc TODO */public class FlinkTableAPIDemo { public static void main(String[] args) throws Exception { //1. Prepare the environment Create a streaming environment and Flow table environment , The parallelism is set to 1 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); // Create a flow table environment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,settings); //2.Source Create a dataset DataStream<Order> orderA = env.fromCollection(Arrays.asList( new Order(1L, "beer", 3), new Order(1L, "diaper", 4), new Order(3L, "rubber", 2))); DataStream<Order> orderB = env.fromCollection(Arrays.asList( new Order(2L, "pen", 3), new Order(2L, "rubber", 3), new Order(4L, "beer", 1))); //3. The registry Convert data streams into tables // adopt fromDataStream Convert data streams into tables Table orderTableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount")); // Convert the data stream into Create a temporary view tEnv.createTemporaryView("orderTableB",orderB,$("user"), $("product"), $("amount")); //4. Execute the query , Inquire about order1 Of amount>2 and union all On order2 Of amoun<2 Data generation table Table result = tEnv.sqlQuery("" + "select * from " + orderTableA + " where amount>2 " + "union all " + "select * from orderTableB where amount<2"); //4.1 Convert the result table to toAppendStream Data flow // The name and type of the field result.printSchema(); DataStream<Row> resultDS = tEnv.toAppendStream(result, Row.class); //5. Print the results resultDS.print(); //6. execution environment env.execute(); // Create entity class user:Long product:String amount:int } @Data @NoArgsConstructor @AllArgsConstructor public static class Order { public Long user; public String product; public int amount; }}
边栏推荐
- 数据类型二进制字符串类型
- Simple order management system small exercise
- 使用Activiti创建数据库表报错,
- How can the easycvr platform access special devices without authentication?
- Paper notes: swing UNET: UNET like pure transformer for medicalimage segmentation
- Rhcsa sixth note
- The scroll bar in unity ugui is not displayed from the top when launching the interface in the game
- rhcsa第六次笔记
- Go redis pipeline application
- 2021-07-09
猜你喜欢
随机推荐
R语言使用epiDisplay包的summ函数计算dataframe中指定变量在不同分组变量下的描述性统计汇总信息并可视化有序点图、自定义cex.main参数配置标题文本字体的大小
Realize a JS lottery?
Soft link, hard link
群体知识图谱:分布式知识迁移与联邦式图谱推理
FlinkTable&SQL(六)
Network security - Cookie injection
Go redis pipeline application
Aike AI frontier promotion (7.24)
XSS white list
How can the easycvr platform access special devices without authentication?
Kunyu installation details
支持鹏程系列开源大模型应用生态演化的可持续学习能力探索
三层交换机配置MSTP协议详解【华为eNSP实验】
Statistical table of competition time and host school information of 2022 national vocational college skills competition (the second batch)
SQL Server 启停作业脚本
网络安全——过滤绕过注入
Packet switching and label switching in MPLS
交换机链路聚合详解【华为eNSP】
基于典型相关分析的多视图学习方法综述
2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二.五)v2









