Implementation principles of the Flink MongoDB CDC connector
2022-06-24 05:04:00 【Substitute】
1. CDC overview
CDC stands for Change Data Capture. Any technique that can capture data changes is usually called CDC. As the term is commonly used today, CDC targets databases: it is a technique for capturing changes to the data in a database. Typical application scenarios are data synchronization, data distribution, and data integration.
2. Introduction to Debezium
3. How Flink SQL CDC works
Flink SQL CDC embeds the Debezium engine to drive the corresponding Debezium source connector and relies on its ability to extract changes from database logs. The change data that the Debezium engine obtains from the database (SourceRecord) is converted into RowData, which Flink SQL understands, and sent downstream. For this purpose Flink provides a Changelog JSON format.
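The conversion from a Debezium change record to Flink row kinds can be sketched as follows. This is a minimal illustration, not the connector's code: it assumes a Debezium-style envelope with `op`, `before`, and `after` fields, and the function name `envelope_to_changelog` and the sample values are made up for the example.

```python
from typing import Any, Dict, List, Tuple

# Flink row kinds in their short string form.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"

Row = Dict[str, Any]


def envelope_to_changelog(envelope: Dict[str, Any]) -> List[Tuple[str, Row]]:
    """Map one Debezium-style change envelope to (row kind, row) pairs."""
    op = envelope["op"]
    before, after = envelope.get("before"), envelope.get("after")
    if op in ("c", "r"):  # create, or read during the initial snapshot
        return [(INSERT, after)]
    if op == "u":  # an update becomes a retraction plus the new image
        return [(UPDATE_BEFORE, before), (UPDATE_AFTER, after)]
    if op == "d":  # a delete retracts the last known image
        return [(DELETE, before)]
    raise ValueError(f"unknown op: {op!r}")


# Illustrative update event (values are invented for the example).
update = {
    "op": "u",
    "before": {"id": 1004, "name": "Anne"},
    "after": {"id": 1004, "name": "Anne Marie"},
}
rows = envelope_to_changelog(update)
```

Note that the update case needs both the `before` and the `after` image; a source that cannot supply `before` can only produce an upsert-style stream.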
The Changelog JSON format that Flink provides can be understood simply as Flink wrapping the incoming RowData in one more layer and adding an operation type. For example, suppose the original data looks like this:
{
"id": 1004,
"name": "Anne"
}
After Changelog JSON formatting, it becomes:
{
"data": {
"id": 1004,
"name": "Anne"
},
"op": "+I"
}
4. How the Flink MongoDB CDC connector works
The connector uses the Debezium Embedded Engine to drive the MongoDB Kafka Connector. The MongoDB Kafka Connector is MongoDB's official Kafka connector implementation; it captures change data by subscribing to ChangeStreamEvents.
4.1 Change Stream vs. tailing the oplog
Before version 3.6, the only way to obtain CDC data from MongoDB was to tail the oplog continuously and pull incremental entries, setting filter conditions manually and managing resumption yourself. Starting with version 3.6, MongoDB offers the Change Stream feature, which provides a real-time incremental data stream. After you call watch to start listening on a database or collection, Change Stream pushes one event for every matching change (insert, delete, or update). Every Change Stream event includes a resumeToken that is used to resume from a breakpoint.
| Comparison item | Change Stream | Tailing oplog |
|---|---|---|
| Ease of use | Simple and easy to use, with a friendly API | High barrier to entry; you need to understand the various oplog formats and how they change |
| Fault recovery | Simple: the server manages progress uniformly, and recovery is done through the resumeToken | Relatively complex: you manage incremental resumption yourself, and after a failure you must turn the ts field of the last pulled oplog entry into the filter of the next query |
| Update events | Can return the full document by specifying fullDocument | Cannot return the full document; for an update you must query again by the _id in the oplog entry to get it |
| Sharded cluster support | A single change stream subscribes to the whole cluster and is globally ordered | A separate pulling process is needed for each shard |
| Durability | Every returned event has been committed to a majority of nodes, so durability holds even across a primary/secondary failover | No guarantee that an oplog entry has been committed to a majority of nodes |
| Security | Users can subscribe only to changes on databases they are allowed to access | Requires read access to the local database |
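Fault recovery with a resumeToken can be illustrated with a small in-memory stand-in for a change stream. Everything here is a simulation under stated assumptions: `LOG`, `watch`, and `consume` are hypothetical names, and an integer `token` plays the role of the resumeToken, which in MongoDB is an opaque document.

```python
from typing import Any, Dict, Iterator, List, Optional, Tuple

Event = Dict[str, Any]

# Ordered events as a change stream would deliver them (simulated).
LOG: List[Event] = [
    {"token": 1, "op": "insert", "_id": "a"},
    {"token": 2, "op": "update", "_id": "a"},
    {"token": 3, "op": "delete", "_id": "a"},
]


def watch(log: List[Event], resume_after: Optional[int] = None) -> Iterator[Event]:
    """Yield events strictly after the given token, or all events if it is None."""
    for event in log:
        if resume_after is None or event["token"] > resume_after:
            yield event


def consume(
    log: List[Event], resume_after: Optional[int], limit: int
) -> Tuple[List[Event], Optional[int]]:
    """Read up to `limit` events and return them with the token to save."""
    seen: List[Event] = []
    token = resume_after
    for event in watch(log, resume_after):
        if len(seen) == limit:
            break
        seen.append(event)
        token = event["token"]  # persist this to resume later
    return seen, token


first, saved = consume(LOG, None, limit=2)  # the consumer stops after two events
rest, _ = consume(LOG, saved, limit=10)     # a restart continues from the saved token
```

With the PyMongo driver the corresponding call would be `collection.watch(resume_after=token, full_document="updateLookup")`; that needs a running replica set, so it is not part of the sketch.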
4.2 MongoDB Kafka Connector
The Debezium Connector for MongoDB is implemented on top of the oplog. In MongoDB's oplog, an UPDATE event does not keep the document state before the change; it keeps only the changed fields. MongoDB change records therefore cannot be converted directly into Flink's standard changelog stream (+I, -U, +U, -D). They can only be converted into an upsert stream (+I, +U, -D), which a ChangelogNormalize operator then turns into a standard changelog stream. An update-after (+U) record needs the complete RowData after the change, and Debezium's native connector, which dumps the oplog, does not support this well.
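The idea behind normalizing an upsert stream can be sketched by keeping the last full row per key. This is a simplified model and not Flink's ChangelogNormalize operator; the function name `normalize`, the `_id` key, and the sample rows are assumptions for illustration.

```python
from typing import Any, Dict, Iterable, List, Tuple

Row = Dict[str, Any]
Change = Tuple[str, Row]


def normalize(upserts: Iterable[Change], key: str = "_id") -> List[Change]:
    """Turn an upsert stream (+I, +U, -D) into a full changelog (+I, -U, +U, -D)."""
    state: Dict[Any, Row] = {}  # last known full row per key
    out: List[Change] = []
    for kind, row in upserts:
        k = row[key]
        previous = state.get(k)
        if kind == "-D":
            if previous is not None:
                out.append(("-D", previous))  # emit the full row being deleted
                del state[k]
            continue
        if previous is None:
            out.append(("+I", row))  # first time this key is seen
        elif previous != row:
            out.append(("-U", previous))  # retract the old image from state
            out.append(("+U", row))
        state[k] = row
    return out


upsert_stream = [
    ("+I", {"_id": 1, "name": "Anne"}),
    ("+U", {"_id": 1, "name": "Anna"}),
    ("-D", {"_id": 1}),  # a delete may carry only the key
]
changelog = normalize(upsert_stream)
```

The -U record comes from state rather than from the source, which is why each +U record must carry the complete row after the change.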
The official MongoDB Kafka Connector subscribes to ChangeStreamEvents instead and can enable the fullDocument option, so it captures the latest complete state of the changed document.
If copy.existing=true is set on the MongoDB Kafka Connector, MongoSourceTask copies the data that already exists in the database when it starts (Debezium calls this the database snapshot phase):
1. Get the latest resumeToken from MongoDB.
2. Create insert events for all configured namespaces using multiple threads. This step is completed only after all collections are successfully copied.
3. Start a change stream cursor from the saved resumeToken.
If data changes while the copy is running, those changes are applied after the copy completes. The copied data and the change events may overlap, because clients can modify data in MongoDB during the copy; but since the change stream events are idempotent, consistency is still guaranteed.
If copy.existing=true is not set, the task only watches change events that occur in the database after the task starts.
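Why overlapping events do not break consistency can be shown with a small simulation of the copy-then-stream sequence. This is a sketch under assumptions: events are applied downstream as upserts or deletes keyed by `_id`, each insert or update carries the full document, and the names `apply`, `materialize`, `copied`, and `changes_since_token` are hypothetical.

```python
from typing import Any, Dict, List

Doc = Dict[str, Any]


def apply(target: Dict[Any, Doc], event: Doc) -> None:
    """Apply one event as an upsert or delete keyed by _id (idempotent)."""
    if event["op"] == "delete":
        target.pop(event["_id"], None)
    else:  # insert and update both carry the full document here
        target[event["_id"]] = event["doc"]


def materialize(events: List[Doc]) -> Dict[Any, Doc]:
    """Replay events in order and return the resulting state."""
    state: Dict[Any, Doc] = {}
    for event in events:
        apply(state, event)
    return state


# Changes recorded after the saved resumeToken (step 1). A client updated
# document 1 while the copy was running, so the copy already saw the new value.
changes_since_token: List[Doc] = [
    {"op": "update", "_id": 1, "doc": {"_id": 1, "name": "Anna"}},
    {"op": "insert", "_id": 2, "doc": {"_id": 2, "name": "Bob"}},
]
# Insert events produced by the copy (step 2).
copied: List[Doc] = [
    {"op": "insert", "_id": 1, "doc": {"_id": 1, "name": "Anna"}},
]

# Step 3: the change stream is replayed after the copy.
once = materialize(copied + changes_since_token)
twice = materialize(copied + changes_since_token + changes_since_token)
```

Replaying the same changes a second time leaves the state unchanged, which is the property the paragraph above relies on.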
4.3 MongoSourceTask proxy (MongoDBConnectorSourceTask)
DebeziumSourceFunction implements CheckpointedFunction, so a checkpoint is taken periodically to guarantee exactly-once semantics for the Flink job. During the database snapshot phase, however, there is generally no offset that a checkpoint could record, so Flink checkpoints must be prevented during this phase.
How, then, do we know that the database snapshot phase has ended? MongoDBConnectorSourceTask proxies the original MongoSourceTask: in its poll method it temporarily holds back the latest snapshot record, and it marks the end of the snapshot phase by setting that record's snapshot field to last.
The snapshot phase ends in either of the following two cases:
- A change event (a non-snapshot record) is received
- No change event is received and the isCopying flag of the proxied MongoSourceTask is false
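The hold-back logic can be sketched as a wrapper around a poll function. This is a simplified model, not the connector's code: `SnapshotAwareTask`, `poll_target`, and `is_copying` are hypothetical names, records are plain dicts with a `snapshot` field, and the second exit is checked here when a poll returns nothing and copying has finished, which is an assumption about the implementation.

```python
from typing import Any, Callable, Dict, List, Optional

Record = Dict[str, Any]


class SnapshotAwareTask:
    """Wraps a poll function and marks the final snapshot record as "last"."""

    def __init__(
        self,
        poll_target: Callable[[], List[Record]],  # hypothetical wrapped poll
        is_copying: Callable[[], bool],           # hypothetical copy-state check
    ) -> None:
        self._poll_target = poll_target
        self._is_copying = is_copying
        self._held: Optional[Record] = None
        self.in_snapshot_phase = True

    def _release_held(self, out: List[Record]) -> None:
        if self._held is not None:
            self._held["snapshot"] = "last"  # marks the end of the snapshot
            out.append(self._held)
            self._held = None

    def poll(self) -> List[Record]:
        records = self._poll_target()
        if not self.in_snapshot_phase:
            return records
        out: List[Record] = []
        for record in records:
            if not self.in_snapshot_phase:
                out.append(record)
            elif record.get("snapshot"):
                # Emit the record held so far and hold back the newest one.
                if self._held is not None:
                    out.append(self._held)
                self._held = record
            else:
                # Exit 1: a non-snapshot record arrived.
                self._release_held(out)
                self.in_snapshot_phase = False
                out.append(record)
        if self.in_snapshot_phase and not records and not self._is_copying():
            # Exit 2 (assumed): nothing to read and copying has finished.
            self._release_held(out)
            self.in_snapshot_phase = False
        return out


batches = iter([
    [{"_id": 1, "snapshot": "true"}, {"_id": 2, "snapshot": "true"}],
    [],
])
copy_state = {"copying": True}
task = SnapshotAwareTask(lambda: next(batches), lambda: copy_state["copying"])
first = task.poll()            # emits _id 1 and holds back _id 2
copy_state["copying"] = False
second = task.poll()           # releases _id 2 marked as "last"
```

A caller could then allow checkpoints only once `task.in_snapshot_phase` is false, or once it has seen the record marked last.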