Flink CDC 2.0: collecting MySQL data with Flink SQL
2022-07-25 07:08:00 [Big Data Institute]
1. Dependency management
Place the following dependency jars in FLINK_HOME/lib:
flink-sql-connector-mysql-cdc-2.2.0.jar
flink-connector-jdbc_2.11-1.14.3.jar
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
2. Flink global configuration
Edit the flink-conf.yaml file:
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 10min
state.backend: filesystem
state.checkpoints.dir: hdfs://mycluster/flinkcdc-checkpoints

3. Submitting jobs with sql-client
1. Standalone mode
Start sql-client: bin/sql-client.sh embedded
Note: standalone mode requires a running Flink standalone cluster, started as follows:
bin/start-cluster.sh
2. yarn-session mode (used in this article)
First start a Flink yarn-session cluster: bin/yarn-session.sh -s 1 -jm 1024 -tm 1024
Then start sql-client: bin/sql-client.sh embedded -s yarn-session
4. Checkpoint configuration
Official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#checkpointing
# Set checkpoint parameters in sql-client
SET 'execution.checkpointing.interval' = '10s';
SET 'parallelism.default' = '3';

5. Create the source table
CREATE TABLE `cars`(
`id` BIGINT,
`owerId` BIGINT,
`carCode` STRING,
`carColor` STRING,
`type` BIGINT,
`remark` STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'hadoop1',
'port' = '3306',
'username' = 'hive',
'password' = 'hive',
'database-name' = 'sca',
'table-name' = 'cars',
'connect.timeout' = '60s'
);

6. Create the sink table
CREATE TABLE `cars_copy`(
`id` BIGINT,
`owerId` BIGINT,
`carCode` STRING,
`carColor` STRING,
`type` BIGINT,
`remark` STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop1:3306/sca?useUnicode=true&characterEncoding=utf8',
'username' = 'hive',
'password' = 'hive',
'table-name' = 'cars_copy',
'sink.parallelism' = '2'
);

7. Write from the source table to the sink table
Write the collected data into MySQL:
insert into cars_copy SELECT * FROM cars;

Check the number of records in the result table:
select count(*) from cars_copy;

Insert new test data (then check the result table again):
insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096006','10244815',' harbor T·7RONE',' Red ','1',NULL);
insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096007','10244816',' harbor T·7RONE',' yellow ','1',NULL);

Note: if the job is cancelled manually, the next run of the job will collect the table's data again from the beginning.
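The re-collection noted above is the connector's default startup behaviour: on a fresh start, mysql-cdc takes a full snapshot of the table before switching to the binlog. If re-reading history is not wanted and no savepoint is available, the startup mode can be changed. The sketch below is an assumption-laden illustration: the table name `cars_latest` is hypothetical, and `'scan.startup.mode' = 'latest-offset'` is the option name/value documented for flink-cdc 2.x, which starts reading from the current end of the binlog instead of snapshotting.

```sql
-- Hedged sketch: a source table that skips the initial snapshot and only
-- reads new binlog events ('latest-offset' is the flink-cdc 2.x option value).
CREATE TABLE `cars_latest` (
  `id` BIGINT,
  `owerId` BIGINT,
  `carCode` STRING,
  `carColor` STRING,
  `type` BIGINT,
  `remark` STRING,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop1',
  'port' = '3306',
  'username' = 'hive',
  'password' = 'hive',
  'database-name' = 'sca',
  'table-name' = 'cars',
  'scan.startup.mode' = 'latest-offset'
);
```

With this mode, rows that existed before the job started are never emitted, so it trades completeness for avoiding the duplicate snapshot.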
8. Cancel the job with a savepoint
bin/flink stop --savepointPath hdfs://mycluster/flinkcdc-savepoints -Dyarn.application.id=application_1658045078748_0001 79ce915e39fc1d18a194b6a464d7c3fd

Note: application_1658045078748_0001 is the YARN application id, and the final argument (79ce915e39fc1d18a194b6a464d7c3fd) is the Flink job id.
9. Resume the job after cancellation
# Resume the job from the last savepoint
SET 'execution.checkpointing.interval' = '10s';
SET 'parallelism.default' = '3';
SET 'execution.savepoint.path' = 'hdfs://mycluster/flinkcdc-savepoints/savepoint-79ce91-92206bcaaad2';

Note: the value of this parameter is the savepoint path.
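One caveat worth knowing: in the sql-client session, `execution.savepoint.path` stays in effect for every job submitted afterwards, so later jobs would also try to restore from the same savepoint. A minimal sketch, assuming the `RESET` statement supported by the Flink SQL client (available since Flink 1.13):

```sql
-- After the resumed job is running, clear the savepoint path so that
-- subsequently submitted jobs in this session start fresh.
RESET 'execution.savepoint.path';
```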
# Run the Flink SQL job
insert into cars_copy SELECT * FROM cars;

# Insert new test data
insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096008','10244815',' harbor T·7RONE',' Red ','1',NULL);
insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096009','10244816',' harbor T·7RONE',' yellow ','1',NULL);

# Query the record count in the result table again
select count(*) from cars_copy;

Normally only the newly inserted rows are collected at this point; historical data is not collected again.
Note: collecting MySQL data with Flink SQL is easy to use, but each job covers only a single table.