Canal + Kafka in practice (listening to the MySQL binlog for data synchronization)
2022-07-24 05:47:00 【Qinglin emo】
A brief introduction to Alibaba Canal
Canal is Java-based middleware that parses a database's incremental logs (the binlog) and provides incremental data subscription and consumption.
Scenario:
Canal picks up the changed data from the binlog and forwards it to the storage destination; here the data is shipped through Kafka.
Step 1:
Refer to the official documentation: https://github.com/alibaba/canal/wiki/QuickStart
The settings below are lifted straight from there.
Configure MySQL (my.cnf):
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW format
server_id=1 # required for MySQL replication; must not clash with canal's slaveId
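After restarting MySQL you can quickly confirm the binlog settings took effect (a sanity check of my own, not part of the original steps):

SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW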
Step 2:
Grant the MySQL account that Canal connects with the privileges of a MySQL slave. If the account already exists, just run the GRANT statements:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
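Optionally verify the grants afterwards (an extra check of my own):

SHOW GRANTS FOR 'canal'@'%';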
Step 3:
Download Canal on Linux:
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
Extract it:
tar -zxvf canal.deployer-1.0.17.tar.gz
After extraction there are four directories: bin, conf, lib, and logs.
Go into the conf directory and change two settings in canal.properties:
1.canal.serverMode = kafka
2.kafka.bootstrap.servers = 127.0.0.1:9092
Here is my demo configuration:
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
## Kafka address
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
Go into the example directory and modify the instance.properties file:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=10.70.7.7:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=tianwanggaidihu
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=user-center.sys_dept,user-center.sys_menu,user-center.sys_post,user-center.sys_post_role,user-center.sys_role,user-center.sys_role_menu,user-center.sys_role_user,user-center.sys_user,user-center.sys_user_dept,user-center.sys_user_post
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=canalKafkaTopicUserCenter
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
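Once canal.properties and instance.properties are edited, start the deployer from the directory you extracted it into. The commands below follow the official QuickStart linked above; the log paths assume the default layout of the package:

sh bin/startup.sh
# server log
tail -f logs/canal/canal.log
# instance log for the "example" destination
tail -f logs/example/example.log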
Now for the code.
Add the Kafka dependency (spring-kafka, version 2.7.1 here):
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${kafka.version}</version>
</dependency>
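The ${kafka.version} placeholder assumes a Maven property is defined elsewhere in the pom. A minimal declaration matching the 2.7.1 version mentioned above could look like this (my own sketch; adjust or drop it if a parent POM already manages spring-kafka):

<properties>
    <kafka.version>2.7.1</kafka.version>
</properties>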
application.yml configuration:
spring:
  kafka:
    # Kafka configuration
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # a value greater than 0 lets the client retry failed sends
      retries: 3
      # producer batch size in bytes
      batch-size: 16384
      buffer-memory: 33554432
      # serializers for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1
    consumer:
      # default consumer group id
      group-id: test-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      auto-commit-interval: 5000
      # deserializers for the message key and value
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # a custom topic name can be injected into the service with @Value;
      # if the topic does not yet exist in Kafka, the broker can auto-create it
    listener:
      ack-mode: manual_immediate
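If you would rather not rely on broker-side topic auto-creation, the topic can also be declared as a bean so that Spring Boot's auto-configured KafkaAdmin creates it at startup. This is a sketch of my own, not from the original article; the topic name matches canal.mq.topic above, and the partition/replica counts are assumptions for a single-broker dev setup:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // pre-create the topic canal publishes to (single-broker dev defaults)
    @Bean
    public NewTopic canalTopic() {
        return TopicBuilder.name("canalKafkaTopicUserCenter")
                .partitions(1)
                .replicas(1)
                .build();
    }
}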
The consumer that handles the messages:
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class CanalConsumer {

    @KafkaListener(topics = "canalKafkaTopicUserCenter", groupId = "canalKafkaTopic")
    public void canalKafkaTopic001(ConsumerRecord<String, String> record, Acknowledgment ack) {
        // the change payload pushed by canal
        String value = record.value();
        log.info("table change: {}", value);
        // manual commit (matches listener.ack-mode: manual_immediate)
        ack.acknowledge();
    }
}
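Since canal.mq.flatMessage = true is configured above, record.value() is a JSON string in canal's flat-message shape (fields such as database, table, type, data and old). Below is a minimal parsing sketch of my own using Jackson, assuming jackson-databind is on the classpath; the DTO only maps the fields used here and is not canal's client API:

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;

@JsonIgnoreProperties(ignoreUnknown = true)
public class CanalFlatMessage {

    // subset of canal's flat message fields; everything else is ignored
    public String database;
    public String table;
    public String type;                    // INSERT / UPDATE / DELETE / DDL types
    public Boolean isDdl;
    public List<Map<String, String>> data; // row images after the change
    public List<Map<String, String>> old;  // changed columns before an UPDATE

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static CanalFlatMessage parse(String json) throws Exception {
        return MAPPER.readValue(json, CanalFlatMessage.class);
    }
}

In the listener above you could then call CanalFlatMessage.parse(value) and route on database/table/type instead of only logging the raw string.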
Start everything up and test:
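For a quick end-to-end check (my own smoke test, not part of the original article), change a row in one of the tables listed in canal.instance.filter.regex and watch the consumer log print the JSON change message. The column and id below are placeholders; use whatever exists in your sys_user table:

UPDATE `user-center`.sys_user SET update_time = NOW() WHERE id = 1;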
Done!