Flink from Getting Started to Loving It (10. Sink Data Output: Elasticsearch)
2020-11-08 12:06:00 【osc_lqb3vmrs】
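Goal: read data from a txt file and write it to Elasticsearch (ES). I am using ES 7.9 here; on versions before ES 7 you also need to set the document type, which is the commented-out .type("_doc") line in the code below.
If you do not have an ES and (optionally) Kibana environment yet, install them first.
Install ES 7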
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm.sha512
shasum -a 512 -c elasticsearch-7.9.3-x86_64.rpm.sha512
sudo rpm --install elasticsearch-7.9.3-x86_64.rpm
systemctl restart elasticsearch
Install Kibana (optional; skip it if you do not need a web UI)
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.9.3-x86_64.rpm
sudo rpm --install kibana-7.9.3-x86_64.rpm
systemctl start kibana
First, add the Elasticsearch connector dependency to the pom
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
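The post builds with Maven. For readers whose project uses sbt instead (an assumption, not something the original covers), the same artifact could be declared roughly as follows in build.sbt. The coordinates are copied from the pom snippet above, and the full artifact name is spelled out so the Scala 2.12 suffix stays explicit.

```scala
// build.sbt fragment: same coordinates as the Maven dependency above.
// Assumes the project itself is compiled with Scala 2.12.
libraryDependencies += "org.apache.flink" % "flink-connector-elasticsearch7_2.12" % "1.10.1"
```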
Create a new file, ElasticsearchSinkTest.scala
package com.mafei.sinktest

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

// Note: SensorReadingTest5 is not defined in this file. It is assumed to be a case class
// in the same package with the fields id, timestamp and temperature.
object ElasticsearchSinkTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    env.setParallelism(1)
    inputStream.print()

    // Convert each line into the case class first
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") // split the line on commas
        // Build one sensor reading; toLong and toDouble are needed because split returns strings
        SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble)
      })

    // Define the ES connection info
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("127.0.0.1", 9200))

    // Custom ElasticsearchSinkFunction that writes each record to ES
    val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReadingTest5] {
      override def process(t: SensorReadingTest5, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        // Build a map that serves as the document source
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id", t.id)
        dataSource.put("temperature", t.temperature.toString)
        dataSource.put("ts", t.timestamp.toString)

        // Create the index request and set the target index
        val indexRequest = Requests.indexRequest()
        indexRequest.index("sensors") // which index to write to
          .source(dataSource) // the data to write
        // .`type`("_doc") // not needed on ES 7, which I use here; `type` is a Scala keyword, hence the backticks

        // Queue the request so the document gets added
        requestIndexer.add(indexRequest)
      }
    }

    dataStream.addSink(
      new ElasticsearchSink.Builder[SensorReadingTest5](httpHosts, myEsSinkFunc)
        .build()
    )

    env.execute()
  }
}
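The map step above indexes into the split array directly, so a short or malformed line in sensor.txt would throw and fail the job. Below is a minimal sketch of a more defensive parser, using only the Scala standard library. SensorReading and SensorParser are hypothetical names used for illustration; SensorReading stands in for the post's SensorReadingTest5, and the "id,timestamp,temperature" line layout is inferred from the code above.

```scala
import scala.util.Try

// Hypothetical stand-in for the post's SensorReadingTest5 case class.
final case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SensorParser {
  // Returns Some(reading) for a well-formed "id,timestamp,temperature" line,
  // and None for lines with the wrong number of fields or non-numeric values.
  def parseLine(line: String): Option[SensorReading] =
    line.split(",").map(_.trim) match {
      case Array(id, ts, temp) if id.nonEmpty =>
        Try(SensorReading(id, ts.toLong, temp.toDouble)).toOption
      case _ => None
    }
}
```

In the job, something like inputStream.flatMap(line => SensorParser.parseLine(line).toList) could then replace the map call, silently dropping bad lines. Whether dropping is the right policy depends on the use case; logging or routing them elsewhere may be preferable.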
Code structure: [screenshot not preserved]
Check the data on the server; the sensors index holds the data we just wrote.
List all indices
[root@localhost ~]# curl http://127.0.0.1:9200/_cat/indices
green open .kibana-event-log-7.9.3-000001 NvnP2SI9Q_i-z5bNvsgWhA 1 0 1 0 5.5kb 5.5kb
yellow open sensors PGTeT0MZRJ-4hmYkDQnqIw 1 1 6 0 5.4kb 5.4kb
green open .apm-custom-link IdxoOaP9Sh6ssBd0Q9kPsw 1 0 0 0 208b 208b
green open .kibana_task_manager_1 -qAi_8LmTc2eJsWUQwugtw 1 0 6 3195 434.2kb 434.2kb
green open .apm-agent-configuration FG9PE8CARdyKWrdsAg4gbA 1 0 0 0 208b 208b
green open .kibana_1 uVmly8KaQ5uIXZ-IkArnVg 1 0 18 4 10.4mb 10.4mb
View the data that was written
[root@localhost ~]# curl http://127.0.0.1:9200/sensors/_search
{"took":0,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"sensors","_type":"_doc","_id":"h67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"41.0","id":"sensor1","ts":"1603766281"}},{"_index":"sensors","_type":"_doc","_id":"iK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"42.0","id":"sensor2","ts":"1603766282"}},{"_index":"sensors","_type":"_doc","_id":"ia7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"43.0","id":"sensor3","ts":"1603766283"}},{"_index":"sensors","_type":"_doc","_id":"iq7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.1","id":"sensor4","ts":"1603766240"}},{"_index":"sensors","_type":"_doc","_id":"i67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"20.0","id":"sensor4","ts":"1603766284"}},{"_index":"sensors","_type":"_doc","_id":"jK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.2","id":"sensor4","ts":"1603766249"}}]}}
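In the response above, temperature and ts come back as JSON strings ("41.0", "1603766281") because the sink fills a HashMap[String, String]. If numeric fields are wanted, one option is to put boxed numbers into the map instead. The sketch below uses only the standard library; SensorReading and SensorDocument are hypothetical names, with SensorReading standing in for SensorReadingTest5.

```scala
import java.util

// Hypothetical stand-in for the post's SensorReadingTest5 case class.
final case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SensorDocument {
  // Builds the document source with numeric values kept as numbers,
  // so they serialize as JSON numbers rather than strings.
  def toSource(r: SensorReading): util.Map[String, AnyRef] = {
    val source = new util.HashMap[String, AnyRef]()
    source.put("id", r.id)
    source.put("temperature", Double.box(r.temperature))
    source.put("ts", Long.box(r.timestamp))
    source
  }
}
```

The resulting map can be passed to indexRequest.source(...) in place of dataSource. Note that an existing index keeps the field mappings it already has, so the numeric types would only show up in a newly created index.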
Copyright notice
This article was written by [osc_lqb3vmrs]. Please include a link to the original when reposting. Thank you.
https://my.oschina.net/u/4338498/blog/4708102