当前位置:网站首页>FlinkTable&SQL(六)
FlinkTable&SQL(六)
2022-07-24 13:37:00 【华为云】
FlinkTable&SQL(六、七)
今日目标
- 了解Flink Table&SQL发展历史
- 了解为什么要使用Table API & SQL
- 掌握Flink Table&SQL进行批处理开发
- 掌握Flink Table&SQL进行流处理开发
- 掌握常用的开发案例
- Flink-SQL的常用算子
Flink Table & SQL
FlinkTable & SQL 是抽象级别更高的操作, 底层Flink Runtime => Stream 流程
批处理是流处理的一种特殊形态
FlinkSQL 遵循ANSI的SQL规范
Flink1.9之前, FlinkSQL包括两套Table api , DataStream Table API(流处理) ,DataSet Table API(批处理)
Planner 查询器, 抽象语法树,parser、optimizer、codegen(模板代码生成),最终生成 Flink Runtime 直接进行执行的代码
Planner包括old Planner 和 Blink Planner ,Blink Planner 底层实现了 流批一体(默认的Planner)

FlinkTable & SQL 程序结构
导入 pom 依赖, jar包坐标
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.12</artifactId> <version>${flink.version}</version> </dependency> <!-- flink执行计划,这是1.9版本之前的--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>${flink.version}</version> </dependency> <!-- blink执行计划,1.11+默认的--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency>
创建 FlinkTable FlinkSQL的 表的方式
// table is the result of a simple projection query Table projTable = tableEnv.from("X").select(...);// register the Table projTable as table "projectedTable"tableEnv.createTemporaryView("projectedTable", projTable);SQL的四种语句
- DDL 数据定义语言, 创建数据库、表,删除数据库、表
- DML 数据操作语言, 对数据进行增、删、改操作
- DCL 数据控制语言, 对数据的操作权限进行设置 grant revoke
- DQL 数据查询语言,对数据表中的数据进行查询,基础查询,复杂查询,多表查询,子查询
需求
将两个数据流 DataStream 通过 FlinkTable & SQL API 进行 union all 操作,条件ds1 amount>2 union all ds2 amount<2
开发步骤
package cn.itcast.flink.sql;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;import java.util.Arrays;import static org.apache.flink.table.api.Expressions.$;/** * Author itcast * Date 2021/6/22 9:45 * Desc TODO */public class FlinkTableAPIDemo { public static void main(String[] args) throws Exception { //1.准备环境 创建流环境 和 流表环境,并行度设置为1 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //创建流表环境 StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,settings); //2.Source 创建数据集 DataStream<Order> orderA = env.fromCollection(Arrays.asList( new Order(1L, "beer", 3), new Order(1L, "diaper", 4), new Order(3L, "rubber", 2))); DataStream<Order> orderB = env.fromCollection(Arrays.asList( new Order(2L, "pen", 3), new Order(2L, "rubber", 3), new Order(4L, "beer", 1))); //3.注册表 将数据流转换成表 // 通过fromDataStream将数据流转换成表 Table orderTableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount")); // 将数据流转换成 创建临时视图 tEnv.createTemporaryView("orderTableB",orderB,$("user"), $("product"), $("amount")); //4.执行查询,查询order1的amount>2并union all 上 order2的amoun<2的数据生成表 Table result = tEnv.sqlQuery("" + "select * from " + orderTableA + " where amount>2 " + "union all " + "select * from orderTableB where amount<2"); //4.1 将结果表转换成toAppendStream数据流 //字段的名称和类型 result.printSchema(); DataStream<Row> resultDS = tEnv.toAppendStream(result, Row.class); //5.打印结果 resultDS.print(); //6.执行环境 env.execute(); // 创建实体类 user:Long product:String amount:int } @Data @NoArgsConstructor @AllArgsConstructor public static class Order { public Long user; public String product; public int amount; }}
边栏推荐
- EAS environment structure directory
- Modification of EAS login interface
- Dtcloud uses custom fonts
- 网络安全——Web信息收集
- Atcoder beginer contest 261 f / / tree array
- How to generate expected data? Emory University and others' latest "deep learning controllable data generation" review, 52 page PDF, covering 346 documents, comprehensively expounds the controllable g
- 数据修改修改
- 网络安全——过滤绕过注入
- CSDN垃圾的没有底线!
- Editor formula
猜你喜欢

Explain the edge cloud in simple terms | 2. architecture
![[paper reading] temporary binding for semi-superior learning](/img/59/846d868cccad238267bd984f085218.png)
[paper reading] temporary binding for semi-superior learning

How to draw Bezier curve and spline curve?

汉字风格迁移篇---无监督排版传输

脑注意力机制启发的群体智能协同避障方法

网络安全——使用Exchange SSRF 漏洞结合NTLM中继进行渗透测试

【论文阅读】Mean teachers are better role models

支持鹏程系列开源大模型应用生态演化的可持续学习能力探索

汉字风格迁移篇---用于汉字多字体生成的多样性正则化StarGAN

Win10 log in with Microsoft account and open all programs by default with administrator privileges: 2020-12-14
随机推荐
CSDN garbage has no bottom line!
Browser type judgment
三层交换机配置MSTP协议详解【华为eNSP实验】
Cmake basic grammar (1)
Odoo+ test
The scroll bar in unity ugui is not displayed from the top when launching the interface in the game
position: -webkit-sticky; /* for Safari */ position: sticky;
Network security - file upload penetration test
开放环境下的群智决策:概念、挑战及引领性技术
Simple order management system small exercise
Game thinking 04 summary: a summary of frame, state and physical synchronization (it was too long before, and now it's brief)
Paper notes: swing UNET: UNET like pure transformer for medicalimage segmentation
Notes on Linear Algebra -- lesson 25 -- projection of vectors on axes
Sort method -- bubble sort (use an array to sort a string of numbers from large to small or from small to large)
Selenium environment configuration and eight elements positioning
如何生成预期数据?埃默里大学等最新《深度学习可控数据生成》综述,52页pdf涵盖346篇文献全面阐述可控生成技术体系
使用activiti创建数据库表报错
Why are there "two abstract methods" in the functional interface comparator?
[机缘参悟-51]:既然人注定要死亡,为什么还要活着?
Network security - Web penetration testing
