
Flink synchronizes MySQL data to Elasticsearch

2022-06-23 00:44:00 shy_snow

The program below uses the Flink Table API to define a MySQL CDC source table and an Elasticsearch 6 sink table, then runs a continuous INSERT ... SELECT that streams every change from MySQL into ES.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/* https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/ */
public class MysqlSinkToES {
    
    public static void main(String[] args) throws Exception {
    
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Source table: MySQL CDC
        String sourceDDL =
                "CREATE TABLE users (\n" +
                        " id BIGINT PRIMARY KEY NOT ENFORCED ,\n" +
                        " name STRING,\n" +
                        " birthday TIMESTAMP(3),\n" +
                        " ts TIMESTAMP(3)\n" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = '192.168.129.102',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'root',\n" +
                        " 'password' = '123456',\n" +
                        " 'server-time-zone' = 'Asia/Shanghai',\n" +
                        " 'database-name' = 'cdc',\n" +
                        " 'table-name' = 'users'\n" +
                        " )";
        // Sink table: Elasticsearch 6
        String sinkDDL =
                "CREATE TABLE users_sink_es\n" +
                        "(\n" +
                        " id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                        " name STRING,\n" +
                        " birthday TIMESTAMP(3),\n" +
                        " ts TIMESTAMP(3)\n" +
                        ") \n" +
                        "WITH (\n" +
                        " 'connector' = 'elasticsearch-6',\n" +
                        " 'hosts' = 'http://192.168.129.103:9200',\n" +
                        " 'index' = 'users'\n" +
                        ", 'document-type' = 'doc'\n" +
                        ")";
        // No aggregation, just a straight INSERT ... SELECT from source to sink
        String transformSQL = "INSERT INTO users_sink_es SELECT * FROM users";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        // executeSql() submits the INSERT job on its own; calling env.execute()
        // afterwards would fail with "No operators defined in streaming topology",
        // since nothing was added via the DataStream API.
        TableResult result = tableEnv.executeSql(transformSQL);
        result.await(); // block so the streaming job keeps running
    }
}
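
The job assumes binlog-based CDC is enabled on the MySQL server and that the cdc.users table matches the Flink source DDL above. A minimal sketch of the MySQL side (the DDL and VARCHAR length are assumptions, not from the original post):

-- Source table sketch, mirroring the Flink DDL above (types are assumptions).
CREATE DATABASE IF NOT EXISTS cdc;
CREATE TABLE cdc.users (
    id       BIGINT      NOT NULL PRIMARY KEY,
    name     VARCHAR(255),
    birthday DATETIME(3),
    ts       DATETIME(3)
);
-- The mysql-cdc connector reads the binlog, so row-based logging must be on:
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW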

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink_es_test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <hadoop.version>2.6.0</hadoop.version>
        <flink.version>1.12.2</flink.version>
        <hudi.version>0.9.0</hudi.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- flink-connector-jdbc supersedes the deprecated flink-jdbc artifact;
             keep the Scala suffix consistent (_2.11) across all Flink modules
             to avoid mixing Scala 2.11 and 2.12 jars on the classpath. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <type>test-jar</type>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink-bundle_2.11</artifactId>
            <version>${hudi.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
<!-- <scope>test</scope>-->
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
<!-- <scope>test</scope>-->
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.5</version>
        </dependency>
    </dependencies>
</project>

Elasticsearch operation commands

# Create the index
curl -X PUT "192.168.129.103:9200/users" -H 'Content-Type: application/json' -d' { "settings" : { "number_of_shards" : 3, "number_of_replicas" : 2 } }'

# Query all documents in the index
curl "http://192.168.129.103:9200/users/doc/_search"
# Delete the index
curl -X DELETE "192.168.129.103:9200/users"
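
To verify the pipeline end to end, change some rows in MySQL and re-run the _search request above; the CDC job should propagate inserts, updates, and deletes to the users index. A sketch with made-up sample values:

-- Hypothetical test rows; any values matching the schema will do.
INSERT INTO cdc.users VALUES (1, 'alice', '2000-01-01 00:00:00.000', NOW(3));
UPDATE cdc.users SET name = 'alice2' WHERE id = 1;
DELETE FROM cdc.users WHERE id = 1;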

Copyright notice
This article was written by shy_snow. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/173/202206222050014021.html