Flink: From Getting Started to Loving It (7. Sink Output: Files)
2020-11-08 12:06:00 [osc_u9mt0sus]
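A Source is the input of a Flink program, and a Sink is where the program sends its data after processing it: for example, writing it to files, sockets, or external systems, or simply printing it. (Many tools in the big data ecosystem follow a similar model; Flume, for instance, also has Source/Channel/Sink.) Flink provides several ways to output data.
Compared with writing the output directly in your own code (for example, opening a connection in a RichMapFunction's open(), writing in map(), and closing it in close()), a sink can keep some state and supports fault tolerance, retries, and so on.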
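To make the lifecycle idea concrete without pulling in Flink, here is a minimal, Flink-free sketch. The `SimpleSink` trait, `BufferingSink`, and `SinkRunner` are hypothetical names invented for illustration, not Flink APIs; they only mimic the open/invoke/close shape of a sink such as Flink's `RichSinkFunction`. The sketch shows two things a dedicated sink can add over ad-hoc writes: state kept between records (a buffer) and a simple retry on write failures. Real Flink sinks get their fault tolerance from checkpointing, which this mock does not model.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Flink-free mock of a sink lifecycle (NOT the Flink API).
// It mirrors the open/invoke/close shape of Flink's RichSinkFunction.
trait SimpleSink[T] {
  def open(): Unit = ()
  def invoke(value: T): Unit
  def close(): Unit = ()
}

// A sink that keeps state (a buffer) between records and writes in batches.
// `write` stands in for an external system; it may throw, so each flush is retried.
class BufferingSink[T](batchSize: Int, maxRetries: Int, write: Seq[T] => Unit) extends SimpleSink[T] {
  private val buffer = ArrayBuffer.empty[T]

  override def invoke(value: T): Unit = {
    buffer += value
    if (buffer.size >= batchSize) flush()
  }

  // Push out whatever is still buffered when the sink shuts down.
  override def close(): Unit = flush()

  private def flush(): Unit = {
    if (buffer.isEmpty) return
    var attempt = 0
    var done = false
    while (!done) {
      try {
        write(buffer.toList)
        done = true
      } catch {
        case e: Exception =>
          attempt += 1
          if (attempt > maxRetries) throw e // give up after maxRetries retries
      }
    }
    buffer.clear()
  }
}

// Drives a sink the way a runtime would: open once, invoke per record, close once.
object SinkRunner {
  def run[T](records: Seq[T], sink: SimpleSink[T]): Unit = {
    sink.open()
    try records.foreach(sink.invoke)
    finally sink.close()
  }
}
```

With `batchSize = 2`, the records `1, 2, 3` are written as the batches `(1, 2)` and `(3)`; a transient failure on the first write is absorbed by the retry loop instead of being lost.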
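package com.mafei.sinktest

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

case class SensorReadingTest3(id: String, timestamp: Long, temperature: Double)

object FileSink {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism before building the pipeline: operators pick up the
    // environment's parallelism when they are created, including the file reader
    env.setParallelism(1)
    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")

    // First convert each line into the case class
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") // split the line on ","
        // Build a sensor record; toLong/toDouble are needed because split() yields strings,
        // and trim guards against spaces after the commas
        SensorReadingTest3(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
      })

    dataStream.print()

    // The simple way to write to a text file; deprecated in newer Flink versions
    // dataStream.writeAsText("/opt/java2020_study/maven/flink1/src/main/resources/sink.txt")

    // The newer, recommended way.
    // The Path is treated as a base directory: StreamingFileSink creates bucket
    // subdirectories and part files under it, so "sink2.txt" ends up as a directory.
    // Part files are only finalized on successful checkpoints.
    dataStream.addSink(
      StreamingFileSink.forRowFormat(
        new Path("/opt/java2020_study/maven/flink1/src/main/resources/sink2.txt"),
        new SimpleStringEncoder[SensorReadingTest3]() // a charset name can be passed here; the default is UTF-8
      ).build()
    )

    env.execute("udf test")
  }
}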
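Because the job uses `SimpleStringEncoder`, each record is written as its `toString()` followed by a newline, so the output contains lines such as `SensorReadingTest3(sensor_1,1547718199,35.8)` rather than the original CSV. The Flink-free sketch below reproduces the parse step from `map()` and that encoding, to show what lands in the file. `LineEncoding`, `parse`, and `encode` are hypothetical names, and the sample input line assumes a `sensor.txt` format of id, timestamp, temperature.

```scala
import java.io.OutputStream
import java.nio.charset.{Charset, StandardCharsets}

case class SensorReadingTest3(id: String, timestamp: Long, temperature: Double)

object LineEncoding {
  // Same parsing as the map() step in the job above.
  def parse(line: String): SensorReadingTest3 = {
    val arr = line.split(",")
    SensorReadingTest3(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
  }

  // Mimics what SimpleStringEncoder does for each record:
  // write element.toString in the given charset, then a newline.
  def encode[T](element: T, out: OutputStream, charset: Charset = StandardCharsets.UTF_8): Unit = {
    out.write(element.toString.getBytes(charset))
    out.write('\n'.toInt)
  }
}
```

If you want the file to contain CSV again, one option is to `map` each record to a formatted `String` before the sink and use a `SimpleStringEncoder[String]`.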
Code structure and final output:

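On disk, the `sink2.txt` path ends up as a directory of time-based buckets. By default, StreamingFileSink assigns records to buckets with a date-time assigner using the format `yyyy-MM-dd--HH`, based on the processing time at which the record is written (not the record's `timestamp` field), in the system time zone. A job run around 12:00 on 2020-11-08 would therefore write into something like `sink2.txt/2020-11-08--12/`, with part files inside (named like `part-<subtaskIndex>-<partCounter>` in Flink 1.10/1.11, as far as I know). The sketch below is a hypothetical helper, `BucketLayout`, that only mimics this naming with `java.time`; it is not the Flink API.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

// Hypothetical helper mimicking the default bucket naming of StreamingFileSink
// ("yyyy-MM-dd--HH"); not the Flink API.
object BucketLayout {
  private val DefaultFormat = "yyyy-MM-dd--HH"

  // Bucket id for a given write time (epoch millis) in a given time zone.
  def bucketId(epochMillis: Long, zone: ZoneId): String =
    DateTimeFormatter.ofPattern(DefaultFormat).withZone(zone).format(Instant.ofEpochMilli(epochMillis))

  // Base path plus bucket directory; part files are created inside it.
  def bucketDir(basePath: String, epochMillis: Long, zone: ZoneId): String =
    s"$basePath/${bucketId(epochMillis, zone)}"
}
```

For example, a write at epoch millis `1547718199000` in UTC falls into the bucket `2019-01-17--09`.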
Copyright notice
This article was written by [osc_u9mt0sus]. Please include a link to the original when reposting. Thanks.
https://my.oschina.net/u/4365833/blog/4708100