Oceanus in practice - developing a MySQL CDC to ES SQL job from 0 to 1
2022-06-24 05:43:00 【Wuyuntao】
Real-time processing is the future. I have recently been working with Tencent Cloud's Oceanus real-time computing service, and what follows is a hands-on walkthrough of a MySQL to Flink to ES pipeline, shared here for reference.
1. Environment setup
1.1 Create an Oceanus cluster
In the Oceanus console, go to 【Cluster management】->【New cluster】 to create a cluster: choose the region, availability zone, VPC, logging, and storage, and set the initial password.
If you have not used VPC, logging, or storage before, create those components first.
The VPC and subnet should be the same ones used by the MySQL and ES clusters below; otherwise the networks have to be connected manually (for example, with a peering connection).
1.2 Create a MySQL cluster
From the Tencent Cloud home page, go to 【Products】->【Database】->【Cloud Database MySQL】 to purchase a MySQL cluster.
In the MySQL console, find the newly created MySQL cluster, then on the 【Database management】->【Parameter settings】 page set the following parameter:
binlog_row_image=FULL
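Before moving on, the setting can be checked from any MySQL client. The sketch below also prepares a dedicated account for the CDC job, using the privilege list that the source table comment in section 2.2 asks for. The account name `flink_cdc` and its password are placeholders, not values from this walkthrough, and on a managed instance the console's account management page may be the required way to create the account instead.

```sql
-- Check the binlog settings that a CDC reader relies on
SHOW VARIABLES LIKE 'log_bin';           -- binary logging should be ON
SHOW VARIABLES LIKE 'binlog_format';     -- row-based logging (ROW) is expected
SHOW VARIABLES LIKE 'binlog_row_image';  -- should report FULL after the change above

-- Hypothetical dedicated account for the CDC job; name and password are placeholders
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'yourpassword';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'flink_cdc'@'%';
```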
1.3 Create a table in the MySQL database
Run the following SQL, or create the table through the visual console.
-- Using a student score table as the example
CREATE TABLE `cdc_source4es` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Student ID',
  `score` int(11) NOT NULL COMMENT 'Score',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8 COMMENT='create for student score';
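To give the job something to read during its initial snapshot, a few rows can be seeded now. This is only a sketch: the scores are made-up sample values, and `id` is left to the table's AUTO_INCREMENT.

```sql
-- Seed a few sample rows; the score values are arbitrary illustrations
INSERT INTO `cdc_source4es` (`score`) VALUES (85), (92), (78);

-- Confirm what the CDC source will see
SELECT `id`, `score` FROM `cdc_source4es` ORDER BY `id`;
```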
1.4 Create an Elasticsearch cluster
From the Tencent Cloud home page, go to 【Products】->【Big data】->【Elasticsearch】 to purchase an ES cluster. For simplicity, the same region and availability zone as Oceanus were chosen, and the network is the same VPC as above.
One ES 6 cluster was created this time; it can be viewed in the ES console.
After creation, Kibana can be used to inspect the ES cluster, for example by running the following command in the Dev Tools panel:
# View the cluster nodes
GET _cat/nodes
# The nodes are returned as expected
172.28.1.1 43 99 1 0.06 0.06 0.12 dilm - 1627027760001130832
172.28.1.2 65 99 3 0.03 0.12 0.13 dilm - 1627027760001130732
172.28.1.3 29 99 3 0.08 0.08 0.12 dilm * 1627027760001130632
Note: unlike a database table, an ES index does not need to be created in advance.
At this point, the environment is ready.
2. Job creation
2.1 Create the SQL job
In the Oceanus console, go to 【Job management】->【New job】-> SQL job, and create the job on the cluster created above. Then, under the job's 【Development and debugging】->【Job parameters】, add the required connectors, such as the mysql-cdc connector and the elasticsearch6/7 connector.
Note: the ES connector version must match the version of the purchased ES component.
2.2 Create the source
MySQL is used as the data source here, and subsequent changes are continuously synced to ES.
-- mysql-cdc connector
CREATE TABLE `mysql_source` (
    `id` int,
    `score` int,
    PRIMARY KEY (`id`) NOT ENFORCED -- If the table being synced defines a primary key, it must be declared here as well
) WITH (
    'connector' = 'mysql-cdc',       -- Must be 'mysql-cdc'
    'hostname' = '172.28.28.213',    -- Database IP
    'port' = '3306',                 -- Database access port
    'username' = 'youruser',         -- Database user name (requires the SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD privileges)
    'password' = 'yourpassword',     -- Database password
    'database-name' = 'test',        -- Database to sync
    'table-name' = 'cdc_source4es'   -- Table to sync
);
2.3 Create the sink
The sink does not require the ES cluster to be initialized in advance; data can be written directly.
-- Note: if Elasticsearch user name and password authentication is enabled, only the old Flink 1.10 syntax can be used for now. If authentication is not required, the new Flink 1.11 syntax can be used.
-- See https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
CREATE TABLE es_old_sink (
    `id` INT,
    `score` INT
) WITH (
    'connector.type' = 'elasticsearch',                  -- Output to Elasticsearch
    'connector.version' = '6',                           -- Elasticsearch version, for example '6' or '7'; must match the built-in connector version selected for the job
    'connector.hosts' = 'http://172.28.1.175:9200',      -- Elasticsearch connection address
    'connector.index' = 'connector-test-index',          -- Elasticsearch index name
    'connector.document-type' = '_doc',                  -- Elasticsearch document type
    'connector.username' = 'elastic',                    -- Optional: Elasticsearch user name
    'connector.password' = 'yourpassword',               -- Optional: Elasticsearch password
    'update-mode' = 'upsert',                            -- 'append' mode (no primary key) or 'upsert' mode (with primary key)
    'connector.key-delimiter' = '$',                     -- Optional: delimiter for composite primary keys (default is '_', for example key1_key2_key3)
    'connector.key-null-literal' = 'n/a',                -- Optional: substitute string for a null primary key (default is 'null')
    'connector.failure-handler' = 'retry-rejected',      -- Optional error handling: 'fail' (throw an exception), 'ignore' (ignore any error), or 'retry-rejected' (retry rejected requests)
    'connector.flush-on-checkpoint' = 'true',            -- Optional: whether to flush pending bulk requests on checkpoint (default is true)
    'connector.bulk-flush.max-actions' = '42',           -- Optional: maximum number of records per batch
    'connector.bulk-flush.max-size' = '42 mb',           -- Optional: maximum accumulated size per batch (only mb is supported)
    'connector.bulk-flush.interval' = '60000',           -- Optional: interval between batch writes (ms)
    'connector.connection-max-retry-timeout' = '1000',   -- Optional: maximum retry timeout (ms)
    --'connector.connection-path-prefix' = '/v1'         -- Optional: path prefix appended to each request
    'format.type' = 'json'                               -- Output data format; currently only 'json' is supported
);
2.4 Operator logic
Only a simple insert is performed here; no complex computation is involved.
INSERT INTO es_old_sink SELECT id, score FROM mysql_source;
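The sink comments in 2.3 mention that the newer Flink 1.11 syntax can be used when ES authentication is not required. A minimal sketch of that variant follows, using the option names from the open-source Flink 1.11 Elasticsearch SQL connector. The table name `es_new_sink` is hypothetical, the host and index are copied from the sink above, and whether Oceanus accepts exactly these options for a given connector version is an assumption to verify.

```sql
-- Hypothetical sink using the Flink 1.11 option names; assumes no ES authentication
CREATE TABLE es_new_sink (
    `id` INT,
    `score` INT,
    PRIMARY KEY (`id`) NOT ENFORCED          -- a declared primary key puts the sink in upsert mode
) WITH (
    'connector' = 'elasticsearch-6',         -- 'elasticsearch-7' for an ES 7 cluster
    'hosts' = 'http://172.28.1.175:9200',    -- same address as the sink in 2.3
    'index' = 'connector-test-index',
    'document-type' = '_doc',                -- needed for ES 6
    'sink.bulk-flush.max-actions' = '42',    -- maximum number of records per batch
    'sink.bulk-flush.interval' = '60s',      -- interval between batch writes
    'format' = 'json'
);

INSERT INTO es_new_sink SELECT id, score FROM mysql_source;
```

In this form the document key comes from the declared primary key rather than from a separate 'update-mode' setting.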
3. Verification
In Kibana's Dev Tools, query the data in ES to check whether it was inserted successfully.
# Query all data under the index
GET connector-test-index/_search
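To check that later changes also propagate, not just the initial snapshot, a row can be inserted, updated, and deleted on the MySQL side while the job is running, re-running the search above after each statement. This is a sketch under assumptions: the id 1001 and the scores are arbitrary test values, and with the sink in 'upsert' mode the matching document should appear, change its score, and then disappear.

```sql
-- Hypothetical test row; id 1001 is an arbitrary value chosen for illustration
INSERT INTO `cdc_source4es` (`id`, `score`) VALUES (1001, 60);

-- The document for id 1001 should now show score 95
UPDATE `cdc_source4es` SET `score` = 95 WHERE `id` = 1001;

-- The document for id 1001 should be removed from the index
DELETE FROM `cdc_source4es` WHERE `id` = 1001;
```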