Data Lake (19): Reading Kafka Data with the SQL API and Writing It to an Iceberg Table in Real Time
2022-07-24 03:32:00 【Lanson】
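Reading Kafka Data with the SQL API and Writing It to an Iceberg Table in Real Time
To read data from Kafka in real time and write it into an Iceberg table, follow the steps below.
1. Create the target Iceberg table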
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);
// 1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
"'type'='iceberg'," +
"'catalog-type'='hadoop'," +
"'warehouse'='hdfs://mycluster/flink_iceberg')");
// 2. Create the Iceberg table flink_iceberg_tbl3, partitioned by loc
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");
2. Write the code that reads Kafka data and writes it to Iceberg in real time
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadKafkaToIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        // Iceberg commits written data when a checkpoint completes, so checkpointing must be enabled
        env.enableCheckpointing(1000);

        /**
         * 1. The catalog and the Iceberg table must be created beforehand
         */
        // 1. Create the catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");

        // 2. Create the Iceberg table flink_iceberg_tbl3 (commented out because it already exists from step 1)
        // tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

        // 3. Create a Kafka connector table that consumes data from Kafka
        tblEnv.executeSql("create table kafka_input_table(" +
                " id int," +
                " name varchar," +
                " age int," +
                " loc varchar" +
                ") with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'flink-iceberg-topic'," +
                " 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092'," +
                " 'scan.startup.mode'='latest-offset'," +
                " 'properties.group.id' = 'my-group-id'," +
                " 'format' = 'csv'" +
                ")");

        // 4. Enable table.dynamic-table-options.enabled
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // Allows the OPTIONS hint in SQL statements
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        // 5. Write the Kafka data into the table flink_iceberg_tbl3
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 select id,name,age,loc from kafka_input_table");

        // 6. Query the table as a stream, checking for new snapshots every second
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
        tableResult.print();
    }
}
Run the code above, then produce the following data to the Kafka topic:
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
The console prints the corresponding rows in real time, and the table's Iceberg directory on HDFS shows that the data was written successfully.
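To make the mapping from the sample messages to the partitioned table concrete, here is a minimal plain-Java sketch. It does not use Flink, Kafka, or Iceberg: the class `CsvPartitionSketch` and its methods are hypothetical names introduced only for illustration, and the comma split is a simplification of the `csv` format (no quoting or escaping). It shows how each message lines up with the columns (id, name, age, loc) and how the rows group by the partition column loc.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: mimics how the sample CSV messages map to
// the (id, name, age, loc) schema and group by the partition column loc.
public class CsvPartitionSketch {

    /** One row of the (id, name, age, loc) schema shared by both tables. */
    public static final class Row {
        public final int id;
        public final String name;
        public final int age;
        public final String loc;

        public Row(int id, String name, int age, String loc) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.loc = loc;
        }
    }

    /** Splits one comma-separated message into the four columns, in DDL order. */
    public static Row parse(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + line);
        }
        return new Row(Integer.parseInt(fields[0]), fields[1],
                Integer.parseInt(fields[2]), fields[3]);
    }

    /** Groups parsed rows by loc, keeping the order in which each value first appears. */
    public static Map<String, List<Row>> groupByLoc(List<String> lines) {
        Map<String, List<Row>> byLoc = new LinkedHashMap<>();
        for (String line : lines) {
            Row row = parse(line);
            byLoc.computeIfAbsent(row.loc, k -> new ArrayList<>()).add(row);
        }
        return byLoc;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList(
                "1,zs,18,beijing", "2,ls,19,shanghai", "3,ww,20,beijing", "4,ml,21,shanghai");
        // Prints one line per distinct loc value with its row count
        groupByLoc(messages).forEach((loc, rows) ->
                System.out.println("loc=" + loc + " -> " + rows.size() + " rows"));
    }
}
```

For the four sample messages this yields two groups, beijing and shanghai, with two rows each. With a table partitioned by loc, Iceberg typically places data files in per-value directories such as loc=beijing and loc=shanghai under the table's data directory, which is what to look for when checking HDFS.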