当前位置:网站首页>Flink from introduction to Zhenxiang (7. Sink data output file)
Flink from introduction to Zhenxiang (7. Sink data output file)
2020-11-08 12:06:00 【osc_u9mt0sus】
Source yes Flink Program input ,Sink Namely Flink The program is finished Source After the data output , For example, writing output to a file 、sockets、 An external system 、 Or just show ( In the big data Ecology , A lot of similar , such as Flume It's also corresponding to Source/Channel/Sink),Flink Provides a variety of data output methods
It's not like writing directly in code ( For example, you can RickMap in open、close、map Direct writing ) He can save some state , Fault tolerant retrial mechanism and so on
package com.mafei.sinktest
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
case class SensorReadingTest3(id: String,timestamp: Long, temperature: Double)
object FileSink {
def main(args: Array[String]): Unit = {
// Create an execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
// First convert to sample class type
val dataStream = inputStream
.map(data =>{
val arr = data.split(",") // according to , Split data , To get the results
SensorReadingTest3(arr(0), arr(1).toLong,arr(2).toDouble) // Generate data for a sensor class , Parameters are passed in the middle toLong and toDouble Because the default split is string category
})
dataStream.print()
// Simple output to txt The method in , Has been flink Abandoning
// dataStream.writeAsText("/opt/java2020_study/maven/flink1/src/main/resources/sink.txt")
// New output mode - recommend
dataStream.addSink(
StreamingFileSink.forRowFormat(
new Path("/opt/java2020_study/maven/flink1/src/main/resources/sink2.txt"),
new SimpleStringEncoder[SensorReadingTest3]() // You can pass in the encoding in parentheses , The default is udf-8
).build()
)
env.execute("udf test")
}
}
Code structure and final output effect :
版权声明
本文为[osc_u9mt0sus]所创,转载请带上原文链接,感谢
边栏推荐
- 原创 | 数据资产确权浅议
- Entry level! Teach you how to develop small programs without asking for help (with internet disk link)
- Implementation of verification code recognition in Python opencv pytesseract
- YGC troubleshooting, let me rise again!
- This paper analyzes the top ten Internet of things applications in 2020!
- The young generation of winner's programming life, the starting point of changing the world is hidden around
- 2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
- 渤海银行百万级罚单不断:李伏安却称治理完善,增速呈下滑趋势
- Don't look! Full interpretation of Alibaba cloud's original data lake system! (Internet disk link attached)
- Service architecture and transformation optimization process of e-commerce trading platform in mogujie (including ppt)
猜你喜欢
Win10 terminal + WSL 2 installation and configuration guide, exquisite development experience
Implementation of verification code recognition in Python opencv pytesseract
2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
阿里出品!视觉计算开发者系列手册(附网盘链接)
Mozi college SQL injection solution
为 Docsify 自动生成 RSS 订阅
华为云重大变革:Cloud&AI 升至华为第四大 BG ,火力全开
应届生年薪35w+ !倒挂老员工,互联网大厂薪资为何越来越高?
Flink从入门到真香(10、Sink数据输出-Elasticsearch)
笔试面试题目:求丢失的猪
随机推荐
IQKeyboardManager 源代码看看
Introduction to mongodb foundation of distributed document storage database
python基本语法 变量
Personal current technology stack
This time Kwai tiktok is faster than shaking.
Flink的sink实战之一:初探
Ali! Visual computing developer's series of manuals (with internet disk link)
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
YGC troubleshooting, let me rise again!
android基础-CheckBox(复选框)
Python basic syntax
2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
Can you do it with only six characters?
Iqkeyboardmanager source code to see
蘑菇街电商交易平台服务架构及改造优化历程(含PPT)
YGC问题排查,又让我涨姿势了!
Don't look! Full interpretation of Alibaba cloud's original data lake system! (Internet disk link attached)
用 Python 写出来的进度条,竟如此美妙~
A scheme to improve the memory utilization of flutter
笔试面试题目:求缺失的最小正整数