当前位置:网站首页>flink同步mysql数据到ES
flink同步mysql数据到ES
2022-06-22 20:50:00 【shy_snow】
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/* https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/ */
/**
 * Flink SQL job that copies the MySQL table {@code cdc.users} (read through the
 * {@code mysql-cdc} connector) into the Elasticsearch index {@code users}
 * (written through the {@code elasticsearch-6} connector).
 *
 * <p>NOTE(review): host addresses and database credentials are hard-coded in the DDL
 * strings below; consider supplying them through program arguments or configuration.
 */
public class MysqlSinkToES {

    /**
     * Registers the source and sink tables, submits the INSERT job and blocks until the
     * job terminates.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be submitted or fails while running
     */
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // executeSql() submits the INSERT job by itself, so the job name has to be set
        // through the table configuration instead of env.execute(jobName).
        tableEnv.getConfig().getConfiguration().setString("pipeline.name", "mysql-to-es");

        // Source table
        String sourceDDL =
                "CREATE TABLE users (\n" +
                " id BIGINT PRIMARY KEY NOT ENFORCED ,\n" +
                " name STRING,\n" +
                " birthday TIMESTAMP(3),\n" +
                " ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = '192.168.129.102',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = '123456',\n" +
                " 'server-time-zone' = 'Asia/Shanghai',\n" +
                " 'database-name' = 'cdc',\n" +
                " 'table-name' = 'users'\n" +
                " )";

        // Sink table
        String sinkDDL =
                "CREATE TABLE users_sink_es\n" +
                "(\n" +
                " id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                " name STRING,\n" +
                " birthday TIMESTAMP(3),\n" +
                " ts TIMESTAMP(3)\n" +
                ") \n" +
                "WITH (\n" +
                " 'connector' = 'elasticsearch-6',\n" +
                " 'hosts' = 'http://192.168.129.103:9200',\n" +
                " 'index' = 'users'\n" +
                ", 'document-type' = 'doc'\n" +
                ")";

        // Plain copy of every source row into the sink (no aggregation involved).
        String transformSQL = "INSERT INTO users_sink_es SELECT * FROM users";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);

        // Block until the submitted job terminates. No env.execute() here: this program
        // defines no DataStream operators, so env.execute() would fail with
        // "No operators defined in streaming topology".
        result.await();
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Flink MySQL-CDC to Elasticsearch example job. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink_es_test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <hadoop.version>2.6.0</hadoop.version>
        <flink.version>1.12.2</flink.version>
        <hudi.version>0.9.0</hudi.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- The legacy flink-jdbc_2.12:1.10.3 artifact was removed: it mixed a Scala 2.12 /
             Flink 1.10 build into a Scala 2.11 / Flink ${flink.version} classpath and is
             superseded by flink-connector-jdbc below. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Planner test utilities are only needed on the test classpath. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>
        <!-- Declared once; the original listed this dependency twice, which Maven reports
             as a non-unique dependency declaration. -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink-bundle_2.11</artifactId>
            <version>${hudi.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
            <!-- <scope>test</scope>-->
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <!-- <scope>test</scope>-->
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.5</version>
        </dependency>
    </dependencies>
</project>
es操作命令
# Create the index
curl --request PUT \
     --header 'Content-Type: application/json' \
     --data ' { "settings" : { "number_of_shards" : 3, "number_of_replicas" : 2 } }' \
     "192.168.129.103:9200/users"
# Query all documents in the index
curl "http://192.168.129.103:9200/users/doc/_search"
# Delete the index
curl --request DELETE "192.168.129.103:9200/users"
边栏推荐
- 2021-07-27
- 2021-04-05
- Spark RDD Programming Guide(2.4.3)
- 新捷途X70S上市8.79万起,空间安全越级,不愧是网红国民大7座SUV
- 数据库访问工具简介
- Greedy interval problem (4)
- 2020-12-20
- 2021-08-22
- Reinforcement learning weekly (issue 50): saferl kit, gmi-drl, rp-sdrl & offline meta reinforcement learning
- Task cache compilation caused by gradle build cache
猜你喜欢

2021-08-21

2021-04-05

Core and semiconductor "RF eda/ filter design platform" shines ims2022

Some shaders in AB package do not trigger the callback of ipreprocessshaders

Install the typescript environment and enable vscode to automatically monitor the compiled TS file as a JS file

Freshman girls' nonsense programming is popular! Those who understand programming are tied with Q after reading

SourceTree版本管理常用操作

Spark RDD Programming Guide(2.4.3)

Grafana report display of sentinel based high availability current limiting system
Summary of transport layer knowledge points
随机推荐
Case 2 of SQL performance degradation caused by modifying implicit parameters
SSH method 2 for adding node nodes in Jenkins
使用Redisson操作分布式队列的注意事项
Do not know how to choose the development of digital currency wallet?
A group of K overturned linked lists [disassembly / overturning / assembly of linked lists]
Total number of combinations [standard backtracking + backtracking techniques -- reducing stack depth]
Freshman girls' nonsense programming is popular! Those who understand programming are tied with Q after reading
Pycharm configuring remote connection server development environment
How to continuously improve performance| DX R & D mode
2021-05-02
In the third week of June, the main growth ranking list (BiliBili platform) of station B single feigua data up was released!
Greedy distribution problem (1)
Introduction to database access tools
Fundamentals of shell programming (Part 7: branch statement -if)
Remote access and control - SSH Remote Management and TCP wrappers access control
Greedy interval problem (2)
2021-04-14
mysql主从同步及其分库分表基本流程
Summary of transport layer knowledge points
From 11 hours to 25 seconds -- is there room for optimization?