Canal + Kafka in practice (monitoring MySQL binlog for data synchronization)
2022-07-24 05:47:00 【Qinglin emo】
A brief introduction to Alibaba's Canal
Canal is Java-based middleware that parses a database's incremental logs (the binlog) and provides incremental data subscription and consumption.
Scenario:
Canal obtains the changed data from the binlog and forwards it to the target store, with Kafka carrying the synchronization.
Step 1:
Refer to the official documentation: https://github.com/alibaba/canal/wiki/QuickStart
The relevant parts are reproduced here.
Configure MySQL:
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW format
server_id=1 # required for MySQL replication; must not collide with canal's slaveId
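After restarting MySQL you can verify that binlog is enabled and in ROW format with plain MySQL statements from any client:
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';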
Step 2:
Grant the account that Canal uses to connect to MySQL the privileges of a MySQL slave. If the account already exists, just run the GRANT statements:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
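Optionally confirm that the grants took effect:
SHOW GRANTS FOR 'canal'@'%';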
Step 3:
Download Canal on Linux:
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
Extract it:
tar -zxvf canal.deployer-1.0.17.tar.gz
After extraction there are four directories (bin, conf, lib, logs).
Go into the conf directory and change two settings in canal.properties:
1.canal.serverMode = kafka
2.kafka.bootstrap.servers = 127.0.0.1:9092
Here is my demo configuration:
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
##kafka Address
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
Go into the example directory and modify the instance.properties file:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=10.70.7.7:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=tianwanggaidihu
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=user-center.sys_dept,user-center.sys_menu,user-center.sys_post,user-center.sys_post_role,user-center.sys_role,user-center.sys_role_menu,user-center.sys_role_user,user-center.sys_user,user-center.sys_user_dept,user-center.sys_user_post
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=canalKafkaTopicUserCenter
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
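With canal.properties and instance.properties in place, start Canal and check the logs (paths as in the official quickstart; adjust to your install directory):
sh bin/startup.sh
tail -f logs/canal/canal.log
tail -f logs/example/example.log
To confirm that changes actually reach Kafka, you can also attach a console consumer to the topic (assuming a local Kafka installation at KAFKA_HOME):
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic canalKafkaTopicUserCenter --from-beginning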
Now for the code.
Add the dependency (spring-kafka, version 2.7.1):
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${kafka.version}</version>
</dependency>
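The ${kafka.version} placeholder assumes the version is declared in the POM's <properties>, matching the 2.7.1 mentioned above, e.g.:
<properties>
    <kafka.version>2.7.1</kafka.version>
</properties>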
YAML configuration:
spring:
  kafka:
    # Kafka connection
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # a value greater than 0 makes the client retry records whose send failed
      retries: 3
      # maximum size of a batch of messages, in bytes
      batch-size: 16384
      buffer-memory: 33554432
      # serializers for the message key and the message value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1
    consumer:
      # default consumer group id
      group-id: test-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      auto-commit-interval: 5000
      # deserializers for the message key and the message value
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # a custom topic name can be injected into the microservice with @Value;
    # if the topic does not exist in Kafka it will be created automatically
    listener:
      ack-mode: manual_immediate
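As the comment above notes, the topic referenced by the consumer can also be created up front. One way is a NewTopic bean built with spring-kafka's TopicBuilder; this is a minimal sketch, and the partition/replica counts are illustrative only:
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Creates the topic at application startup if it does not already exist on the broker.
    @Bean
    public NewTopic canalUserCenterTopic() {
        return TopicBuilder.name("canalKafkaTopicUserCenter")
                .partitions(1)
                .replicas(1)
                .build();
    }
}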
Consumer that consumes the messages:
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class CanalConsumer {
    @KafkaListener(topics = "canalKafkaTopicUserCenter", groupId = "canalKafkaTopic")
    public void canalKafkaTopic001(ConsumerRecord<String, String> record, Acknowledgment ack) {
        // the record value holds the change captured from the database
        String value = record.value();
        log.info("Table change: {}", value);
        // commit the offset manually (listener ack-mode is manual_immediate)
        ack.acknowledge();
    }
}
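Because canal.mq.flatMessage = true, each record value is a JSON document in Canal's FlatMessage format, carrying the source database, table, the operation type and the changed rows in a data array. A minimal parsing sketch with Jackson (the helper class is illustrative; field names follow the FlatMessage format):
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CanalMessageParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Pulls the table identity, operation type and changed rows out of a flat message.
    public static void handle(String value) throws Exception {
        JsonNode root = MAPPER.readTree(value);
        String database = root.path("database").asText(); // source schema
        String table = root.path("table").asText();       // source table
        String type = root.path("type").asText();         // INSERT / UPDATE / DELETE / DDL ...
        for (JsonNode row : root.path("data")) {          // each element maps column name -> new value
            System.out.printf("%s.%s %s: %s%n", database, table, type, row);
        }
    }
}
Inside canalKafkaTopic001 you could call CanalMessageParser.handle(value) before acknowledging the record.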
Start everything and run a test: changes to the monitored tables should now show up in the consumer log.
Done!