当前位置:网站首页>Flink Catalog解读
Flink Catalog解读
2022-07-23 19:46:00 【杨林伟】
文章目录
01 引言
我们知道 Flink 有Table(表)、View(视图)、Function(函数/算子)、Database(数据库)的概念,这都类似于我们平常使用的关系型数据库里面的概念。
相对于关系型数据库的这些概念,Flink 里还有一个 Catalog(目录) 的概念,本文来讲解下。

02 Catalog
2.1 Catalog概述
数据处理最关键的方面之一是管理元数据:
- 元数据可以是临时的,例如在
Flink中临时表、或者通过TableEnvironment注册的UDF; - 元数据也可以是持久化的,例如
Hive Metastore中的元数据。
Catalog在Flink中提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。
2.2 Catalog分类
Catalog目前分为以下几类:
| 分类 | 描述 | 缺陷 |
|---|---|---|
| GenericInMemoryCatalog | 基于内存实现的 Catalog | 所有元数据只在 session 的生命周期内可用 |
| JdbcCatalog | 可以将 Flink 通过 JDBC 协议连接到关系数据库 | JDBC Catalog只实现了PostgresCatalog |
| HiveCatalog | 作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口 | Hive Metastore 以小写形式存储所有元数据对象名称。而 GenericInMemoryCatalog 区分大小写。 |
| 自定义 Catalog | 通过实现 Catalog 接口来开发自定义 Catalog | - |
2.3 Catalog API
2.3.1 数据库操作
// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
// drop database
catalog.dropDatabase("mydb", false);
// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
// get database
catalog.getDatabase("mydb");
// check if a database exist
catalog.databaseExists("mydb");
// list databases in a catalog
catalog.listDatabases("mycatalog");
2.3.2 表操作
// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
// get table
catalog.getTable("mytable");
// check if a table exist or not
catalog.tableExists("mytable");
// list tables in a database
catalog.listTables("mydb");
2.3.3 视图操作
// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);
// alter view
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);
// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);
// get view
catalog.getTable("myview");
// check if a view exist or not
catalog.tableExists("mytable");
// list views in a database
catalog.listViews("mydb");
2.3.4 分区操作
// create view
catalog.createPartition(
new ObjectPath("mydb", "mytable"),
new CatalogPartitionSpec(...),
new CatalogPartitionImpl(...),
false);
// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
// alter partition
catalog.alterPartition(
new ObjectPath("mydb", "mytable"),
new CatalogPartitionSpec(...),
new CatalogPartitionImpl(...),
false);
// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// check if a partition exist or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));
// list partitions of a table under a give partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// list partitions of a table by expression filter
catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));
2.3.5 函数操作
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// get function
catalog.getFunction("myfunc");
// check if a function exist or not
catalog.functionExists("myfunc");
// list functions in a database
catalog.listFunctions("mydb");
2.4 Catalog 示例(SQL Client的方式)
① 首先需要注册Catalog:用户可以访问默认创建的内存 Catalog default_catalog,这个 Catalog 默认拥有一个默认数据库 default_database。 用户也可以注册其他的 Catalog 到现有的 Flink 会话中,创建方式如下(可以使用Flink里面的Factory工厂模式动态加载):
tableEnv.registerCatalog(new CustomCatalog("myCatalog"));
② 指定使用的内容:Flink 始终在当前的 Catalog 和数据库中寻找表、视图和 UDF,代码如下:
Flink SQL> USE CATALOG myCatalog;
Flink SQL> USE myDB;
也可以通过提供全限定名 catalog.database.object 来访问不在当前 Catalog 中的元数据信息,代码如下:
Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
③ 其它常规命令:
-- 列出可用的 Catalog
Flink SQL> show catalogs;
-- 列出可用的数据库
Flink SQL> show databases;
-- 列出可用的表
Flink SQL> show tables;
03 文末
本文主要讲解了Flink Catalog的概念以及用法,如果大家有兴趣可以进一步去官网查看相关的文档,这里我列出相关比较核心的文档:
- catalogs:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/catalogs/
- jdbc catalog(postgress):https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/table/jdbc/#postgres-%e6%95%b0%e6%8d%ae%e5%ba%93%e4%bd%9c%e4%b8%ba-catalog
- hive catalog:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/table/hive/overview/
接下来我的计划是编写 “如何自定义Catalog” ,以及Catalog的应用场景(有兴趣可先阅读《Ververica Platform-阿里巴巴全新Flink企业版揭秘》)相关的博客,谢谢大家的阅读,希望能帮助到大家,本文完!
边栏推荐
- 【C语言】通讯录(静态版本)
- AtCoder B - Pizza
- Energy principle and variational method note 17: generalized variational principle (identification factor method)
- Leetcode 152. product maximum subarray (brute force cracking can actually pass!)
- Uncover the working principle of solid state disk
- cJSON库的使用
- ODrive应用 #6 编码器
- 梅科爾工作室-小熊派開發筆記2
- What if there is no word document in win11? There is no word document solution tutorial in win11
- Typescript use of new data type symbol
猜你喜欢

Energy principle and variational method note 18: virtual force principle

如何合理地估算线程池大小

使用多态时,判断能否向下转型的两种思路

数仓4.0笔记——数仓环境搭建—— DataGrip准备和数据准备

小程序頭像組樣式

数组——11. 盛最多水的容器
![[development experience] development project trample pit collection [continuous update]](/img/02/7bea3bf09e9a27b6ab74399639f197.png)
[development experience] development project trample pit collection [continuous update]

安装Win11找不到固态硬盘如何解决?

Osgearth2.8 compiling silvering cloud effect
![Relevant interfaces of [asp.net core] option mode](/img/2e/847e7541cfc49fd69794089dce2df2.jpg)
Relevant interfaces of [asp.net core] option mode
随机推荐
How to add to-do items for win11 widgets? Win11 method of adding to-do widget
百度地图数据可视化
How to solve the problem that the solid state disk cannot be found when installing win11?
【无标题】
Training log on July 22, 2022
海通证券股票开户怎么样安全吗
Osgearth uses sundog's Triton ocean and silverlining cloud effects
Atelier macoll - notes de développement de la secte de l'ours 2
next数值型数据类型()出现输入错误后,下次依然能正常输入
AtCoder——Subtree K-th Max
2022/7/21 training summary
The numerical sequence caused by the PostgreSQL sequence cache parameter is discontinuous with interval gap
QT 设置缓存和编译输出路径
【AR学习】-- 二、 环境搭建
Set asp Net MVC site default page is the specified page
千呼万唤,5G双卡双通到底有多重要?
Configure MySQL master-slave replication with mysqldump or mydumper
不用MQTT C库就能实现MQTT连接、订阅和发布
【Unity项目实践】关卡解锁
2022/7/22 训练日志