当前位置:网站首页>Flink 系例 之 TableAPI & SQL 與 示例模塊
Flink 系例 之 TableAPI & SQL 與 示例模塊
2022-06-21 19:38:00 【不會飛的小龍人】
官方介紹
Flink 中的 API
Flink 為流式 / 批式處理應用程序的開發提供了不同級別的抽象。

- Flink API 最底層的抽象為有狀態實時流處理。其抽象實現是 Process Function,並且 Process Function 被 Flink 框架集成到了 DataStream API 中來為我們使用。它允許用戶在應用程序中自由地處理來自單流或多流的事件(數據),並提供具有全局一致性和容錯保障的狀態。此外,用戶可以在此層抽象中注册事件時間(event time)和處理時間(processing time)回調方法,從而允許程序可以實現複雜計算。
- Flink API 第二層抽象是 Core APIs。實際上,許多應用程序不需要使用到上述最底層抽象的 API,而是可以使用 Core APIs 進行編程:其中包含 DataStream API(應用於有界 / 無界數據流場景)和 DataSet API(應用於有界數據集場景)兩部分。Core APIs 提供的流式 API(Fluent API)為數據處理提供了通用的模塊組件,例如各種形式的用戶自定義轉換(transformations)、聯接(joins)、聚合(aggregations)、窗口(windows)和狀態(state)操作等。此層 API 中處理的數據類型在每種編程語言中都有其對應的類。
Process Function 這類底層抽象和 DataStream API 的相互集成使得用戶可以選擇使用更底層的抽象 API 來實現自己的需求。DataSet API 還額外提供了一些原語,比如循環 / 迭代(loop/iteration)操作。
- Flink API 第三層抽象是 Table API。Table API 是以錶(Table)為中心的聲明式編程(DSL)API,例如在流式數據場景下,它可以錶示一張正在動態改變的錶。Table API 遵循(擴展)關系模型:即錶擁有 schema(類似於關系型數據庫中的 schema),並且 Table API 也提供了類似於關系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以聲明的方式定義應執行的邏輯操作,而不是確切地指定程序應該執行的代碼。盡管 Table API 使用起來很簡潔並且可以由各種類型的用戶自定義函數擴展功能,但還是比 Core API 的錶達能力差。此外,Table API 程序在執行之前還會使用優化器中的優化規則對用戶編寫的錶達式進行優化。
錶和 DataStream/DataSet 可以進行無縫切換,Flink 允許用戶在編寫應用程序時將 Table API 與 DataStream/DataSet API 混合使用。
- Flink API 最頂層抽象是 SQL。這層抽象在語義和程序錶達式上都類似於 Table API,但是其程序實現都是 SQL 查詢錶達式。SQL 抽象與 Table API 抽象之間的關聯是非常緊密的,並且 SQL 查詢語句可以在 Table API 中定義的錶上執行。
Table API 和 SQL
Apache Flink 有兩種關系型 API 來做流批統一處理:Table API 和 SQL。
- Table API 是用於 Scala 和 Java 語言的查詢 API,它可以用一種非常直觀的方式來組合使用選取、過濾、join 等關系型算子。Flink SQL 是基於 Apache Calcite 來實現的標准 SQL。這兩種 API 中的查詢對於批(DataSet)和流(DataStream)的輸入有相同的語義,也會產生同樣的計算結果。
- Table API 和 SQL 兩種 API 是緊密集成的,以及 DataStream 和 DataSet API。你可以在這些 API 之間,以及一些基於這些 API 的庫之間輕松的切換。比如,你可以先用 CEP 從 DataStream 中做模式匹配,然後用 Table API 來分析匹配的結果;或者你可以用 SQL 來掃描、過濾、聚合一個批式的錶,然後再跑一個 Gelly 圖算法 來處理已經預處理好的數據。
注意:Table API 和 SQL 現在還處於活躍開發階段,還沒有完全實現所有的特性。不是所有的 [Table API,SQL] 和 [流,批] 的組合都是支持的。
官方文檔
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/
TableAPI&SQL 開發
從本篇開始,增加 TableAPI&SQL 演示內容,在原有的工程基礎上,擴展一個 tableapi 模塊;此模塊會演示以下幾個組件的 TableApi 與 SQL 簡單使用;
- elasticsearch
- kafka
- jdbc (mysql)
新增 tableapi 模塊
在當前工程中,創建名稱為 tableapi 的 maven 工程模塊
pom.xml
<artifactId>tableapi</artifactId>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.11.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.11.1</version>
<!--<scope>provided</scope>-->
</dependency>
<!-- flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.1</version>
</dependency>
<!-- mysql驅動包 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- elasticsearch6依賴 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.flink</groupId>-->
<!--<artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>-->
<!--<version>${flink.version}</version>-->
<!--</dependency>-->
</dependencies>刷新工程 maven,下載相關功能依賴組件包;
工程模塊

後續關於 TableAPI&SQL 的演示示例均在此 tableapi 模塊下進行基礎上開發;
源碼下載
Gitee:flink-examples: 基於flink.1.11.1版本的工程示例,此示例包含大部份算子、窗口、中間件連接器、tables&sql的用法,適合新人學習使用;
边栏推荐
- 如何使用DevExpress WPF在WinUI中创建第一个MVVM应用?
- jvm造轮子
- 插入类排序法
- W10 add system environment variable path
- After Hongmeng, Huawei announced that it would donate to Euler again. What impact is expected to be brought to the industry by the donations of Hongmeng and Euler?
- [interval and topic prefix sum] line segment tree (dynamic open point) application problem
- Ogg-21.3 error reporting ogg-00768 failed to map database character to ulibcharaset
- vivo 容器集群监控系统架构与实践
- 力扣今日题1108. IP 地址无效化
- 【力扣10天SQL入门】Day1
猜你喜欢

MFC界面库BCGControlBar v33.0 - 桌面警报窗口、网格控件升级

动态规划【一】(背包问题)

Yolov5 trains its own data set to report error records

点歌系统也有鸿蒙版了?老王抢先体验了一把

After the 80 version of Google browser, how to deal with the problem samesite cross domain problem

Shang Silicon Valley Shang Silicon Valley | what is Clickhouse table engine memory and merge

出院小结识别api接口-医疗票据OCR识别/出院诊断记录/电子病历/理赔服务

谷歌浏览器80版本以后,如何处理出现的问题SameSite跨域问题

How to use devaxpress WPF to create the first MVVM application in winui?

企评家全面解读:【国家电网】中国电力财务有限公司企业成长性
随机推荐
文件上传漏洞靶场分析 UPLOAD_LABS
[high frequency interview questions] the difficulty is 1.5/5. Common two point double pointer interview questions
如何使用DevExpress WPF在WinUI中创建第一个MVVM应用?
R language uses GLM function to build Poisson regression model, and coef function to obtain the coefficients of Poisson regression model and analyze the effects of various variables
The R language uses the follow up The plot function visualizes the longitudinal follow-up chart of multiple ID (case) monitoring indicators, and uses line Col parameter custom curve color (color)
[comprehensive pen test] difficulty 2.5/5: "tree array" and "double tree array optimization"
Technology sharing | mysql:caching_ sha2_ Password quick Q & A
Enabling developers of shengteng scientific research innovation enabling program Huawei computing provides three dimensional support
力扣今日题1108. IP 地址无效化
Must the database primary key be self incremented? What scenarios do not suggest self augmentation?
homeassistant addons
W10 add system environment variable path
网管型全国产加固交换机如何创建网络冗余
技术分享 | MySQL:caching_sha2_password 快速问答
【区间和专题の前缀和】前缀和 + 哈希表 运用题
How many correct answers can you get to Huawei Hongmeng certification test questions?
第298场周赛
API interface for discharge summary identification - medical bill OCR identification / discharge diagnosis record / electronic medical record / claim settlement service
论文解读(USIB)《Towards Explanation for Unsupervised Graph-Level Representation Learning》
After the 80 version of Google browser, how to deal with the problem samesite cross domain problem