Understanding the Flink Catalog
2022-07-23 20:30:00 【Yang Linwei】
01 Introduction
We know that Flink has the concepts of Table, View, Function, and Database, which are similar to the corresponding concepts in the relational databases we commonly use.
Compared with a relational database, Flink has one additional concept, the Catalog, which is what this article explains.

02 Catalog
2.1 Catalog Overview
One of the most critical aspects of data processing is managing metadata:
- Metadata can be temporary, such as temporary tables in Flink or UDFs registered through the TableEnvironment;
- Metadata can also be persistent, such as the metadata in the Hive Metastore.

In Flink, the Catalog provides a unified API for managing metadata and makes it accessible from Table API and SQL queries. A catalog exposes metadata information such as databases, tables, partitions, views, and functions, whether stored in Flink itself or in other external systems.
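As a minimal sketch of that unified API (assuming the default in-memory catalog with its standard names default_catalog / default_database; error handling is simplified):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;

public class CatalogOverviewSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // The catalog behind the current session: default_catalog unless configured otherwise
        Catalog catalog = tableEnv.getCatalog("default_catalog")
                .orElseThrow(() -> new IllegalStateException("catalog not found"));

        // The same Catalog API that section 2.3 walks through
        System.out.println(catalog.listDatabases());                 // [default_database]
        System.out.println(catalog.listTables("default_database"));  // empty until tables are created
    }
}

The snippets in section 2.3 can be read as continuing from a catalog object (and tableEnv) obtained this way.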
2.2 Catalog Classification
Flink currently provides the following kinds of catalogs:
| Category | Description | Limitations |
|---|---|---|
| GenericInMemoryCatalog | An in-memory implementation of Catalog | All metadata is available only for the lifetime of the session |
| JdbcCatalog | Lets Flink connect to relational databases over the JDBC protocol | PostgresCatalog is currently the only JDBC catalog implementation |
| HiveCatalog | Serves both as persistent storage for native Flink metadata and as an interface for reading and writing existing Hive metadata | The Hive Metastore stores all metadata object names in lowercase, whereas GenericInMemoryCatalog is case-sensitive |
| Custom Catalog | A user-defined catalog built by implementing the Catalog interface | - |
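To make the table concrete, here is a rough sketch of how each built-in catalog type can be instantiated. The constructor arguments follow the Flink 1.14 documentation but may differ between versions, and every connection detail below is a placeholder:

import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class CatalogTypesSketch {
    public static void main(String[] args) {
        // In-memory catalog: metadata lives only for the lifetime of the session
        Catalog inMemory = new GenericInMemoryCatalog("my_in_memory_catalog");

        // JDBC catalog: currently backed by PostgresCatalog only
        // (needs the flink-connector-jdbc and PostgreSQL driver dependencies)
        Catalog jdbc = new JdbcCatalog(
                "my_jdbc_catalog",                   // catalog name
                "postgres",                          // default database
                "username",                          // user (placeholder)
                "password",                          // password (placeholder)
                "jdbc:postgresql://localhost:5432"); // base JDBC URL (placeholder)

        // Hive catalog: persists metadata in the Hive Metastore
        // (needs the flink-connector-hive and Hive dependencies)
        Catalog hive = new HiveCatalog(
                "my_hive_catalog",                   // catalog name
                "default",                           // default database
                "/opt/hive-conf");                   // directory containing hive-site.xml (placeholder)
    }
}

Any of these can then be registered with tableEnv.registerCatalog, as shown in section 2.4.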
2.3 Catalog API
2.3.1 Database operations
// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
// drop database
catalog.dropDatabase("mydb", false);
// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
// get database
catalog.getDatabase("mydb");
// check if a database exist
catalog.databaseExists("mydb");
// list databases in a catalog
catalog.listDatabases();
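Continuing with the catalog object from the snippet above, a hedged sketch of what the elided CatalogDatabaseImpl arguments could look like: its constructor takes a property map and a comment (the property key below is purely illustrative):

// requires org.apache.flink.table.catalog.CatalogDatabaseImpl and java.util.HashMap
Map<String, String> dbProperties = new HashMap<>();
dbProperties.put("owner", "yanglinwei");  // illustrative property

// ignoreIfExists = false: throw if "mydb" already exists
catalog.createDatabase(
        "mydb",
        new CatalogDatabaseImpl(dbProperties, "my first database"),
        false);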
2.3.2 Table operations
// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);
// get table
catalog.getTable(new ObjectPath("mydb", "mytable"));
// check if a table exists or not
catalog.tableExists(new ObjectPath("mydb", "mytable"));
// list tables in a database
catalog.listTables("mydb");
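Because building a CatalogTableImpl by hand is verbose, a common alternative is to create the table with SQL DDL, which registers it in the current catalog, and then read its metadata back through the Catalog API. A hedged sketch, assuming tableEnv is the TableEnvironment that owns catalog and that the database mydb created above exists; the datagen connector and columns are illustrative:

// DDL executed through the TableEnvironment lands in the current catalog
tableEnv.executeSql(
        "CREATE TABLE mydb.mytable (" +
        "  id BIGINT," +
        "  name STRING" +
        ") WITH (" +
        "  'connector' = 'datagen'" +
        ")");

// Read the metadata back through the same Catalog API
System.out.println(catalog.getTable(new ObjectPath("mydb", "mytable")).getOptions());
System.out.println(catalog.listTables("mydb"));  // [mytable]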
2.3.3 View operations
// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);
// alter view
catalog.alterTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);
// get view
catalog.getTable(new ObjectPath("mydb", "myview"));
// check if a view exists or not
catalog.tableExists(new ObjectPath("mydb", "myview"));
// list views in a database
catalog.listViews("mydb");
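Views can likewise be created through SQL and then inspected with the same table-oriented calls, since in the Catalog API a view is just another CatalogBaseTable. A short sketch, continuing the table created in the previous sketch:

// A logical view over the table created earlier; only metadata is stored
tableEnv.executeSql("CREATE VIEW mydb.myview AS SELECT id FROM mydb.mytable");

// Views are listed separately but fetched with the same table calls
System.out.println(catalog.listViews("mydb"));                             // [myview]
System.out.println(catalog.tableExists(new ObjectPath("mydb", "myview"))); // true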
2.3.4 Partition operations
// create partition
catalog.createPartition(
new ObjectPath("mydb", "mytable"),
new CatalogPartitionSpec(...),
new CatalogPartitionImpl(...),
false);
// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
// alter partition
catalog.alterPartition(
new ObjectPath("mydb", "mytable"),
new CatalogPartitionSpec(...),
new CatalogPartitionImpl(...),
false);
// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// check if a partition exist or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));
// list partitions of a table under a given partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// list partitions of a table by expression filter
catalog.listPartitionsByFilter(new ObjectPath("mydb", "mytable"), Arrays.asList(expr1, ...));
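For the elided partition arguments, a hedged sketch: a CatalogPartitionSpec is built from a map of partition-column values, and CatalogPartitionImpl takes a property map plus a comment. The dt column is illustrative and assumes mytable is partitioned by it:

// requires org.apache.flink.table.catalog.CatalogPartitionSpec / CatalogPartitionImpl
CatalogPartitionSpec spec =
        new CatalogPartitionSpec(Collections.singletonMap("dt", "2022-07-23"));

catalog.createPartition(
        new ObjectPath("mydb", "mytable"),
        spec,
        new CatalogPartitionImpl(Collections.emptyMap(), "daily partition"),
        false);  // ignoreIfExists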
2.3.5 Function operations
// create function
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// get function
catalog.getFunction(new ObjectPath("mydb", "myfunc"));
// check if a function exists or not
catalog.functionExists(new ObjectPath("mydb", "myfunc"));
// list functions in a database
catalog.listFunctions("mydb");
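And a hedged sketch of the elided CatalogFunctionImpl argument: it is typically constructed from the fully qualified class name of a UDF. The class name below is hypothetical:

// requires org.apache.flink.table.catalog.CatalogFunctionImpl
catalog.createFunction(
        new ObjectPath("mydb", "myfunc"),
        new CatalogFunctionImpl("com.example.udf.MyScalarFunction"),  // hypothetical UDF class
        false);  // ignoreIfExists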
2.4 Catalog Example (SQL Client)
① First, register the Catalog: by default users can access the automatically created in-memory catalog default_catalog, which contains a default database named default_database. Users can also register additional catalogs into an existing Flink session; one can be created as follows (catalogs can also be loaded dynamically through Flink's internal Factory mechanism):
tableEnv.registerCatalog("myCatalog", new CustomCatalog("myCatalog"));
② Specify the catalog and database to use: Flink always looks up tables, views, and UDFs in the current catalog and current database, which can be switched as follows:
Flink SQL> USE CATALOG myCatalog;
Flink SQL> USE myDB;
A fully qualified name of the form catalog.database.object can also be used to access metadata that is not in the current catalog:
Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
③ Other common commands:
-- List the available catalogs
Flink SQL> show catalogs;
-- List the available databases
Flink SQL> show databases;
-- List the available tables
Flink SQL> show tables;
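The same switching and listing is available on the Table API side; a minimal sketch of the equivalent TableEnvironment calls:

// Table API equivalents of the SQL Client commands above
tableEnv.useCatalog("myCatalog");
tableEnv.useDatabase("myDB");

System.out.println(String.join(", ", tableEnv.listCatalogs()));
System.out.println(String.join(", ", tableEnv.listDatabases()));
System.out.println(String.join(", ", tableEnv.listTables()));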
03 Conclusion
This article has explained the concept and usage of the Flink Catalog. If you are interested, you can dig further into the official documentation; the core pages are listed here:
- catalogs:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/catalogs/
- jdbc catalog (postgres):https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/table/jdbc/#postgres-%e6%95%b0%e6%8d%ae%e5%ba%93%e4%bd%9c%e4%b8%ba-catalog
- hive catalog:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/table/hive/overview/
My next plan is to write about how to implement a custom Catalog, and about typical application scenarios for catalogs (if you are interested, you can first read 《Ververica Platform - Alibaba's Brand-New Flink Enterprise Edition Revealed》). Thank you for reading, I hope it helps, and that's the end of this article!