Flink: From Getting Started to Loving It (10. Sink Data Output: Elasticsearch)
2020-11-08 12:06:00 【osc_lqb3vmrs】
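Goal: read data from a txt file and write it to Elasticsearch (ES). I am using ES 7.9 here. On versions before ES 7, the document type has to be set with the .type("_doc") call that is commented out in the code below.
If you do not have an ES and Kibana (optional) environment yet, install them first.
Install ES 7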
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm.sha512
shasum -a 512 -c elasticsearch-7.9.3-x86_64.rpm.sha512
sudo rpm --install elasticsearch-7.9.3-x86_64.rpm
systemctl restart elasticsearch
Install Kibana (optional; skip it if you do not need a UI)
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.9.3-x86_64.rpm
sudo rpm --install kibana-7.9.3-x86_64.rpm
systemctl start kibana
First add the Flink Elasticsearch connector dependency to the pom
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Create a new file ElasticsearchSinkTest.scala
package com.mafei.sinktest

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

// SensorReadingTest5 is not defined in this file. It is assumed to be a case class in this
// package with the fields (id: String, timestamp: Long, temperature: Double).
object ElasticsearchSinkTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the default parallelism first so it is in effect for all operators created below
    env.setParallelism(1)

    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    inputStream.print()

    // Convert each line into the case class type
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") // Split the line on commas
        // Build a sensor reading; toLong and toDouble are needed because split returns strings
        SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble)
      })

    // Define the ES connection information
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("127.0.0.1", 9200))

    // Custom ElasticsearchSinkFunction that writes each reading to ES
    val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReadingTest5] {
      override def process(t: SensorReadingTest5, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        // Build a map that serves as the document source
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id", t.id)
        dataSource.put("temperature", t.temperature.toString)
        dataSource.put("ts", t.timestamp.toString)

        // Create the index request and set the target index
        val indexRequest = Requests.indexRequest()
        indexRequest.index("sensors") // Index to write to
          .source(dataSource) // Document data to write
        // .type("_doc") // Not needed on ES 7, which I am using here

        // Queue the request so the sink sends it to ES
        requestIndexer.add(indexRequest)
      }
    }

    dataStream.addSink(
      new ElasticsearchSink.Builder[SensorReadingTest5](httpHosts, myEsSinkFunc).build()
    )

    env.execute()
  }
}
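The map step above assumes that every line of sensor.txt has exactly three comma-separated fields with a numeric timestamp and temperature. A malformed line would throw inside the map function and fail the job. Below is a minimal sketch of a more defensive parser. The names SensorReading (a stand-in for SensorReadingTest5, whose definition is not shown in this post) and SensorLineParser are hypothetical, and the field layout is inferred from the code above.

```scala
import scala.util.Try

// Hypothetical stand-in for SensorReadingTest5; the shape is inferred from the job code.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SensorLineParser {
  // Returns Some(reading) for a well-formed "id,timestamp,temperature" line,
  // and None when the field count is wrong or a number cannot be parsed.
  def parse(line: String): Option[SensorReading] =
    line.split(",").map(_.trim) match {
      case Array(id, ts, temp) if id.nonEmpty =>
        Try(SensorReading(id, ts.toLong, temp.toDouble)).toOption
      case _ => None
    }
}
```

In the job, this could replace the map step with something like inputStream.flatMap(line => SensorLineParser.parse(line).toList), so that bad lines are dropped instead of stopping the pipeline. Silently dropping input is a design choice. Logging or counting the rejected lines may be preferable in a real job.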
Check the data on the server. The sensors index holds the data we just wrote.
List all indices
[root@localhost ~]# curl http://127.0.0.1:9200/_cat/indices
green open .kibana-event-log-7.9.3-000001 NvnP2SI9Q_i-z5bNvsgWhA 1 0 1 0 5.5kb 5.5kb
yellow open sensors PGTeT0MZRJ-4hmYkDQnqIw 1 1 6 0 5.4kb 5.4kb
green open .apm-custom-link IdxoOaP9Sh6ssBd0Q9kPsw 1 0 0 0 208b 208b
green open .kibana_task_manager_1 -qAi_8LmTc2eJsWUQwugtw 1 0 6 3195 434.2kb 434.2kb
green open .apm-agent-configuration FG9PE8CARdyKWrdsAg4gbA 1 0 0 0 208b 208b
green open .kibana_1 uVmly8KaQ5uIXZ-IkArnVg 1 0 18 4 10.4mb 10.4m
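If the sensors index does not show up in a listing like this when the job reads from an unbounded source (for example a socket or Kafka instead of a file), one possible cause is that the sink is still buffering. As far as I know, the connector batches index requests through a bulk processor and only sends them once a threshold is reached, at a checkpoint, or when the job finishes. The sketch below lowers the threshold to one action so that each record is sent right away. The helper name buildEagerSink is hypothetical. setBulkFlushMaxActions is a setter on ElasticsearchSink.Builder in the connector version used here.

```scala
import java.util

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.http.HttpHost

object EsSinkConfigSketch {
  // Hypothetical helper: builds a sink that flushes after every single request.
  // Handy while testing; larger batches are usually better for throughput.
  def buildEagerSink[T](hosts: util.List[HttpHost],
                        sinkFunc: ElasticsearchSinkFunction[T]): ElasticsearchSink[T] = {
    val builder = new ElasticsearchSink.Builder[T](hosts, sinkFunc)
    builder.setBulkFlushMaxActions(1) // send each index request immediately
    builder.build()
  }
}
```

With the job above, the call would look like dataStream.addSink(EsSinkConfigSketch.buildEagerSink(httpHosts, myEsSinkFunc)).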
View the inserted data
[root@localhost ~]# curl http://127.0.0.1:9200/sensors/_search
{"took":0,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"sensors","_type":"_doc","_id":"h67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"41.0","id":"sensor1","ts":"1603766281"}},{"_index":"sensors","_type":"_doc","_id":"iK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"42.0","id":"sensor2","ts":"1603766282"}},{"_index":"sensors","_type":"_doc","_id":"ia7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"43.0","id":"sensor3","ts":"1603766283"}},{"_index":"sensors","_type":"_doc","_id":"iq7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.1","id":"sensor4","ts":"1603766240"}},{"_index":"sensors","_type":"_doc","_id":"i67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"20.0","id":"sensor4","ts":"1603766284"}},{"_index":"sensors","_type":"_doc","_id":"jK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.2","id":"sensor4","ts":"1603766249"}}]}}
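Note that temperature and ts come back as JSON strings in the result above, because the sink function calls toString on both before putting them into a HashMap[String, String]. If numeric fields are wanted (for range queries or aggregations, say), one option is to keep the values typed when building the source map. A minimal sketch, where SensorReading and SensorDocument are hypothetical names and the case class shape is inferred from the job code:

```scala
import java.util

// Hypothetical stand-in for SensorReadingTest5; the shape is inferred from the job code.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SensorDocument {
  // Builds the document source with boxed numeric values instead of strings,
  // so they are serialized as JSON numbers.
  def toSource(r: SensorReading): util.Map[String, AnyRef] = {
    val source = new util.HashMap[String, AnyRef]()
    source.put("id", r.id)
    source.put("temperature", Double.box(r.temperature))
    source.put("ts", Long.box(r.timestamp))
    source
  }
}
```

In the sink function, the resulting map could be passed to .source(...) in place of dataSource. An index whose fields are already mapped as text keeps that mapping, so trying this would probably need a fresh index.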
Copyright notice
This article was written by [osc_lqb3vmrs]. Please include a link to the original when reposting. Thank you.
https://my.oschina.net/u/4338498/blog/4708102