Flink Table & SQL (VI)
2022-07-24 13:45:00 【Hua Weiyun】
Flink Table & SQL (Parts 6 and 7)
Today's goals
- Understand the development history of Flink Table & SQL
- Understand why to use the Table API & SQL
- Master batch development with Flink Table & SQL
- Master stream-processing development with Flink Table & SQL
- Master common development cases
- Know the common operators of Flink SQL
Flink Table & SQL
Flink Table & SQL is a higher-level abstraction; underneath, the Flink runtime executes everything as a stream.
Batch processing is a special case of stream processing.
Flink SQL follows the ANSI SQL standard.
Before Flink 1.9, Flink SQL comprised two Table APIs: the DataStream Table API (stream processing) and the DataSet Table API (batch processing).
The Planner is the query planner: it builds an abstract syntax tree and runs the parser, optimizer, and codegen (template code generation), producing code that the Flink runtime executes directly.
There are two planners, the old Planner and the Blink Planner. The Blink Planner unifies stream and batch processing in its underlying implementation and is the default planner (since Flink 1.11).
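The planner is chosen when building the table environment. A minimal sketch of selecting the Blink planner (assumes Flink 1.11.x on the classpath; the class name `PlannerChoice` is illustrative):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PlannerChoice {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Blink planner: stream/batch unified, the default since Flink 1.11
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()   // or inBatchMode() with the Blink planner
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        // the legacy planner remains selectable via useOldPlanner()
    }
}
```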

Flink Table & SQL program structure
Import the pom dependencies (jar coordinates):
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- old execution planner, used before Flink 1.9 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Blink execution planner, the default since Flink 1.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
</dependency>
```
Ways to create tables in Flink Table & SQL
```java
// the Table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
```
The four categories of SQL statements
- DDL (Data Definition Language): create and drop databases and tables
- DML (Data Manipulation Language): insert, delete, and update data
- DCL (Data Control Language): set data access permissions, e.g. GRANT and REVOKE
- DQL (Data Query Language): query data from tables; basic queries, complex queries, multi-table queries, subqueries
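In Flink SQL, DDL and DML look much like they do in a classic database. A minimal sketch (the table names are illustrative; `datagen` and `print` are Flink's built-in test connectors):

```sql
-- DDL: define a source table backed by the built-in datagen connector
CREATE TABLE orders (
    `user`  BIGINT,
    product STRING,
    amount  INT
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '10'
);

-- DDL: define a sink table that prints rows to stdout
CREATE TABLE order_sink (
    `user`  BIGINT,
    product STRING,
    amount  INT
) WITH ('connector' = 'print');

-- DML: insert the query result into the sink (DQL is the SELECT part)
INSERT INTO order_sink
SELECT * FROM orders WHERE amount > 2;
```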
Requirement
Connect two DataStreams through the Flink Table & SQL API and perform a union all: rows from ds1 with amount > 2, union all rows from ds2 with amount < 2.
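The expected union-all semantics can be sketched with plain Java collections before involving Flink: filter each side, then concatenate, keeping duplicates. The `Order` record and `unionAll` helper here are hypothetical, for illustration only:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnionAllSketch {
    // hypothetical POJO mirroring the Order entity in the demo below
    record Order(long user, String product, int amount) {}

    // union all keeps duplicates: filter each side, then concatenate in order
    static List<Order> unionAll(List<Order> a, List<Order> b) {
        return Stream.concat(
                a.stream().filter(o -> o.amount() > 2),
                b.stream().filter(o -> o.amount() < 2))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Order> ds1 = List.of(new Order(1, "beer", 3),
                                  new Order(1, "diaper", 4),
                                  new Order(3, "rubber", 2));
        List<Order> ds2 = List.of(new Order(2, "pen", 3),
                                  new Order(2, "rubber", 3),
                                  new Order(4, "beer", 1));
        // with the demo's data this yields three rows:
        // (1, beer, 3), (1, diaper, 4), (4, beer, 1)
        unionAll(ds1, ds2).forEach(System.out::println);
    }
}
```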
Development steps
```java
package cn.itcast.flink.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Author itcast
 * Date 2021/6/22 9:45
 * Desc Union two DataStreams through the Table & SQL API
 */
public class FlinkTableAPIDemo {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment: create a streaming environment and a stream table environment, parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // create the stream table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 2. Source: create the datasets
        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));
        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1)));

        // 3. Register tables: convert the data streams into tables
        // via fromDataStream
        Table orderTableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
        // via a temporary view
        tEnv.createTemporaryView("orderTableB", orderB, $("user"), $("product"), $("amount"));

        // 4. Execute the query: rows of orderA with amount > 2, union all rows of orderB with amount < 2
        Table result = tEnv.sqlQuery(
                "select * from " + orderTableA + " where amount > 2 " +
                "union all " +
                "select * from orderTableB where amount < 2");

        // 4.1 Print the schema (field names and types) and convert the result table to an append stream
        result.printSchema();
        DataStream<Row> resultDS = tEnv.toAppendStream(result, Row.class);

        // 5. Print the results
        resultDS.print();

        // 6. Execute the environment
        env.execute();
    }

    // entity class: user:Long, product:String, amount:int
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        public Long user;
        public String product;
        public int amount;
    }
}
```