当前位置:网站首页>Flink synchronizes MySQL data to es
Flink synchronizes MySQL data to es
2022-06-23 00:44:00 【shy_ snow】
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/* https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/ */
/**
 * Continuously replicates the MySQL table {@code cdc.users} into the Elasticsearch index
 * {@code users} using Flink SQL: a {@code mysql-cdc} source table is copied row-for-row into an
 * {@code elasticsearch-6} sink table.
 *
 * <p>Connection settings (hosts, credentials) are hard-coded for demonstration purposes only.
 */
public class MysqlSinkToES {

    /**
     * Registers the source and sink tables, submits the INSERT job and blocks while it runs.
     *
     * @param args unused
     * @throws Exception if a DDL statement is rejected, the job cannot be submitted, or the
     *     running job fails
     */
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // executeSql() submits the job itself, so the job name has to be supplied through the
        // configuration instead of through env.execute(name).
        tableEnv.getConfig().getConfiguration().setString("pipeline.name", "mysql-to-es");

        // Data source table (change stream of MySQL cdc.users)
        String sourceDDL =
                "CREATE TABLE users (\n" +
                " id BIGINT PRIMARY KEY NOT ENFORCED ,\n" +
                " name STRING,\n" +
                " birthday TIMESTAMP(3),\n" +
                " ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = '192.168.129.102',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = '123456',\n" +
                " 'server-time-zone' = 'Asia/Shanghai',\n" +
                " 'database-name' = 'cdc',\n" +
                " 'table-name' = 'users'\n" +
                " )";

        // Output target table (Elasticsearch index "users", document type "doc")
        String sinkDDL =
                "CREATE TABLE users_sink_es\n" +
                "(\n" +
                " id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                " name STRING,\n" +
                " birthday TIMESTAMP(3),\n" +
                " ts TIMESTAMP(3)\n" +
                ") \n" +
                "WITH (\n" +
                " 'connector' = 'elasticsearch-6',\n" +
                " 'hosts' = 'http://192.168.129.103:9200',\n" +
                " 'index' = 'users'\n" +
                ", 'document-type' = 'doc'\n" +
                ")";

        // Plain pass-through copy: every column of the source goes to the sink unchanged.
        String transformSQL = "INSERT INTO users_sink_es SELECT * FROM users";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);

        // The INSERT statement is submitted as a Flink job by executeSql(). Calling
        // env.execute() afterwards would fail with "No operators defined in streaming
        // topology" because no DataStream operators were registered on env.
        TableResult result = tableEnv.executeSql(transformSQL);
        result.print();

        // Block until the job terminates so a local (IDE / mini-cluster) run is not torn down
        // when main() returns.
        result.await();
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink_es_test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<hadoop.version>2.6.0</hadoop.version>
<flink.version>1.12.2</flink.version>
<hudi.version>0.9.0</hudi.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- NOTE(review): legacy JDBC connector pinned to 1.10.3 with a Scala 2.12 suffix, while every
     other Flink artifact here uses ${flink.version} and the _2.11 suffix, and
     flink-connector-jdbc_2.11 is also declared below. Presumably redundant; confirm nothing
     depends on it before removing. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.10.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
</dependency>
<!-- MySQL CDC source connector used by the 'mysql-cdc' table; declared once only, since
     duplicate declarations trigger Maven's "dependency must be unique" model warning. -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink-bundle_2.11</artifactId>
<version>${hudi.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
<!-- <scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<!-- <scope>test</scope>-->
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.5</version>
</dependency>
</dependencies>
</project>
Elasticsearch commands
# Create the index
curl -X PUT "192.168.129.103:9200/users" -H 'Content-Type: application/json' -d' { "settings" : { "number_of_shards" : 3, "number_of_replicas" : 2 } }'
# Query all documents in the index
curl "http://192.168.129.103:9200/users/doc/_search"
# Delete index
curl -X DELETE "192.168.129.103:9200/users"
边栏推荐
- Ros2 summer school 2022 transfer-
- Tidb monitoring upgrade: a long way to solve panic
- Es5 object extension methods //call, apply and bind
- 你踩过这些坑吗?谨慎在时间类型列上创建索引
- 62. different paths
- How to set the power-off auto start of easycvr hardware box
- 一文读懂基于Redis的Amazon MemoryDB数据库
- Hierarchy selector
- 【UVM】别再说你的 VIP 用不了 RAL Model
- SwiftUI Swift 教程之 14 个有用的数组运算符
猜你喜欢

cadence SPB17.4 - allegro - 优化指定单条电气线折线连接角度 - 折线转圆弧

SAP UI5 应用开发教程之一百零二 - SAP UI5 应用的打印(Print)功能实现详解

SAP UI5 应用开发教程之一百零三 - 如何在 SAP UI5 应用中消费第三方库试读版
因为我说:volatile 是轻量级的 synchronized,面试官让我回去等通知!

SAP UI5 应用开发教程之一百零三 - 如何在 SAP UI5 应用中消费第三方库

Database daily question - day 20: selling products by date

How to calculate the position of gold ETF

详解openGauss多线程架构启动过程

#yyds干货盘点# 解决剑指offer:把二叉树打印成多行

Typecho仿盧松松博客主題模板/科技資訊博客主題模板
随机推荐
Ansible learning summary (8) -- Summary of ansible control right raising related knowledge
Which brokerage platform is better and safer for a brokerage to open an account on a mobile phone? What if you need a low commission
Yyds dry inventory solution sword finger offer: print the binary tree into multiple lines
數據庫中數據的儲存結構和方式是什麼?
Because I said: volatile is a lightweight synchronized, the interviewer asked me to go back and wait for the notice!
OpenCvSharp (C# OpenCV) 微信QRCode解码功能使用介绍(附源码)
層次選擇器
Ansible learning summary (7) -- ansible state management related knowledge summary
SAP ui5 application development tutorial 102 - detailed explanation of the print function of SAP ui5 applications
手机上券商开户哪个券商平台更好更安全,如果需要佣金低的怎么办
SwiftUI Swift 教程之 14 个有用的数组运算符
How to solve the problem that easycvr does not display the interface when RTMP streaming is used?
How to get started with machine learning?
TIDB监控升级解决panic的漫漫探索之路
How to calculate the position of gold ETF
华为云如何实现实时音视频全球低时延网络架构【上】
Swiftui swift tutorial 14 useful array operators
Mysql8.0 easily completes gtid master-slave replication
OLAP - Druid introduction
详解openGauss多线程架构启动过程