Flink Table & SQL (VI)
2022-07-24 13:45:00 [Huawei Cloud]
Flink Table & SQL (VI, VII)
Today's goals
- Understand the development history of Flink Table & SQL
- Understand why to use the Table API & SQL
- Master batch development with Flink Table & SQL
- Master stream-processing development with Flink Table & SQL
- Master common development cases
- Common Flink SQL operators
Flink Table & SQL
Flink Table & SQL is a higher-level abstraction over Flink; underneath, it still runs on the Flink runtime as a stream-processing pipeline.
Batch processing is a special case of stream processing: a batch is simply a bounded stream.
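To make the "batch is a bounded stream" point concrete, here is a plain-Java sketch that uses no Flink APIs; the class and method names are hypothetical. The streaming version keeps running state and emits an updated result after every record, as a continuous query would, while the batch version sums everything in one pass. Because the input is finite, the last streaming update equals the batch result. The sample amounts are the same ones used in the orders demo.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (no Flink APIs): a batch computation over a finite input
// gives the same final answer as a streaming computation that consumes the same
// input one record at a time and stops when the bounded stream ends.
final class BoundedStreamDemo {

    // "Batch" view: all records are available at once; compute the sum in one pass.
    static int batchSum(List<Integer> records) {
        return records.stream().mapToInt(Integer::intValue).sum();
    }

    // "Streaming" view: keep running state and emit an updated result after
    // every incoming record.
    static List<Integer> streamingSums(List<Integer> records) {
        List<Integer> emitted = new ArrayList<>();
        int state = 0;
        for (int record : records) {
            state += record;
            emitted.add(state);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> amounts = List.of(3, 4, 2, 3, 3, 1);
        System.out.println("streaming updates: " + streamingSums(amounts));
        System.out.println("batch result:      " + batchSum(amounts));
    }
}
```

The design choice being illustrated is that a stream engine can serve batch workloads without a separate code path: a batch job is just a streaming job whose source eventually ends.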
Flink SQL follows the ANSI SQL standard.
Before Flink 1.9, Flink SQL consisted of two separate Table APIs: the DataStream Table API (stream processing) and the DataSet Table API (batch processing).
The planner (query planner) parses a query into an abstract syntax tree, optimizes it, and runs codegen (template-based code generation), producing code that the Flink runtime executes directly.
There are two planners, the old planner and the Blink planner. The Blink planner implements unified stream and batch processing under the hood and is the default planner.

Flink Table & SQL program structure
Import the pom dependencies (jar coordinates):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Old Flink planner (the planner used before 1.9) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Blink planner, the default since 1.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
</dependency>
Ways to create tables in Flink Table & SQL
// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
The four categories of SQL statements
- DDL (Data Definition Language): create and drop databases and tables
- DML (Data Manipulation Language): insert, delete, and update data
- DCL (Data Control Language): set data access permissions (GRANT, REVOKE)
- DQL (Data Query Language): query the data in tables, including basic queries, complex queries, multi-table queries, and subqueries
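As a quick illustration of the four categories, the sketch below classifies a statement by its leading keyword. `SqlCategory` is a hypothetical helper, not a Flink or JDBC API, and a keyword heuristic like this is only approximate: a query that starts with `WITH` or a parenthesis, for example, comes back as `UNKNOWN`.

```java
import java.util.Locale;

// Hypothetical helper (not part of Flink): classify a SQL statement into one of
// the four categories DDL / DML / DCL / DQL by looking only at its first keyword.
final class SqlCategory {

    enum Category { DDL, DML, DCL, DQL, UNKNOWN }

    static Category classify(String sql) {
        String trimmed = sql.trim();
        if (trimmed.isEmpty()) {
            return Category.UNKNOWN;
        }
        // First whitespace-separated token, upper-cased so matching is case-insensitive.
        String keyword = trimmed.split("\\s+", 2)[0].toUpperCase(Locale.ROOT);
        switch (keyword) {
            case "CREATE": case "DROP": case "ALTER":
                return Category.DDL;   // define or remove databases and tables
            case "INSERT": case "UPDATE": case "DELETE":
                return Category.DML;   // change the data itself
            case "GRANT": case "REVOKE":
                return Category.DCL;   // manage permissions
            case "SELECT":
                return Category.DQL;   // read data
            default:
                return Category.UNKNOWN;
        }
    }

    public static void main(String[] args) {
        String[] samples = {
            "CREATE TABLE orders (id BIGINT)",
            "insert into orders values (1)",
            "GRANT SELECT ON orders TO alice",
            "SELECT * FROM orders WHERE amount > 2"
        };
        for (String s : samples) {
            System.out.println(classify(s) + "  <-  " + s);
        }
    }
}
```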
Requirement
Take two DataStreams and, using the Flink Table & SQL API, perform a UNION ALL on them with these conditions: rows of ds1 where amount > 2, UNION ALL rows of ds2 where amount < 2.
Development steps
package cn.itcast.flink.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Author itcast
 * Date 2021/6/22 9:45
 * Desc TODO
 */
public class FlinkTableAPIDemo {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment: a streaming environment and a stream table environment, parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // Create the stream table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        //2. Source: create the datasets
        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));
        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1)));

        //3. Register the tables
        // Convert data stream A into a Table with fromDataStream
        Table orderTableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
        // Register data stream B directly as a temporary view
        tEnv.createTemporaryView("orderTableB", orderB, $("user"), $("product"), $("amount"));

        //4. Execute the query: rows of orderTableA with amount > 2
        //   UNION ALL rows of orderTableB with amount < 2, producing a result table.
        //   Concatenating the Table object into the SQL string registers it under a generated name.
        Table result = tEnv.sqlQuery(
                "select * from " + orderTableA + " where amount > 2 " +
                "union all " +
                "select * from orderTableB where amount < 2");
        // Print the field names and types of the result table
        result.printSchema();
        //4.1 Convert the result table into an append-only DataStream
        DataStream<Row> resultDS = tEnv.toAppendStream(result, Row.class);

        //5. Print the results
        resultDS.print();

        //6. Execute the job
        env.execute();
    }

    // Entity class: user:Long, product:String, amount:int
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        public Long user;
        public String product;
        public int amount;
    }
}
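Before running the job, it helps to know what it should print. The plain-Java sketch below uses no Flink; `UnionAllCheck` and its nested `Order` are hypothetical stand-ins for the demo's classes. It applies the same two filters to the same sample data and simply concatenates the results, because UNION ALL keeps duplicates and does not sort or deduplicate. For this data it yields three rows: (1, beer, 3) and (1, diaper, 4) from orderA, and (4, beer, 1) from orderB; (3, rubber, 2) is dropped because 2 is not greater than 2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java cross-check (no Flink): the same filters and UNION ALL as in
// FlinkTableAPIDemo, applied to the same sample orders.
final class UnionAllCheck {

    // Simplified, immutable stand-in for the demo's Lombok-generated Order class.
    static final class Order {
        final long user;
        final String product;
        final int amount;

        Order(long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return user + "," + product + "," + amount;
        }
    }

    // SELECT * FROM a WHERE amount > 2 UNION ALL SELECT * FROM b WHERE amount < 2
    // UNION ALL keeps every row (no deduplication), so the filtered lists are concatenated.
    static List<Order> unionAll(List<Order> a, List<Order> b) {
        List<Order> result = new ArrayList<>();
        for (Order o : a) {
            if (o.amount > 2) {
                result.add(o);
            }
        }
        for (Order o : b) {
            if (o.amount < 2) {
                result.add(o);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Order> orderA = Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2));
        List<Order> orderB = Arrays.asList(
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1));
        unionAll(orderA, orderB).forEach(System.out::println);
    }
}
```

In the actual Flink job the two sources are independent, so the order in which rows from orderA and orderB appear in the printed output is not guaranteed, and the exact `Row` print format depends on the Flink version.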