当前位置:网站首页>Flink从入门到真香(7、Sink数据输出-文件)
Flink从入门到真香(7、Sink数据输出-文件)
2020-11-08 12:06:00 【osc_u9mt0sus】
Source 是 Flink 程序的输入,Sink 就是 Flink 程序处理完Source后数据的输出,比如将输出写到文件、sockets、外部系统、或者仅仅是显示(在大数据生态中,很多类似的,比如Flume里也是对应的Source/Channel/Sink),Flink 提供了多种数据输出方式
跟在代码中直接写不同(比如可以在RickMap中open、close、map中直接写)他可以保存一些状态,容错重试机制等等
package com.mafei.sinktest
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
case class SensorReadingTest3(id: String,timestamp: Long, temperature: Double)
object FileSink {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
//先转换成样例类类型
val dataStream = inputStream
.map(data =>{
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest3(arr(0), arr(1).toLong,arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
})
dataStream.print()
//简单的输出到txt中的方法,已被flink弃用
// dataStream.writeAsText("/opt/java2020_study/maven/flink1/src/main/resources/sink.txt")
//新的输出方式-推荐
dataStream.addSink(
StreamingFileSink.forRowFormat(
new Path("/opt/java2020_study/maven/flink1/src/main/resources/sink2.txt"),
new SimpleStringEncoder[SensorReadingTest3]() //可以在括号中传入编码,默认是udf-8
).build()
)
env.execute("udf test")
}
}
代码结构及最终输出效果:

版权声明
本文为[osc_u9mt0sus]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4365833/blog/4708100
边栏推荐
- Adobe media encoder /Me 2021软件安装包(附安装教程)
- python基本语法 变量
- Xamarin 从零开始部署 iOS 上的 Walterlv.CloudKeyboard 应用
- Tidb performance competition 11.02-11.06
- 你搞不懂与别人的差距,永远成不了架构师!月薪15K和月薪65K,你差在那了?
- Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
- Bccoin tells you: what is the most reliable investment project at the end of the year!
- Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
- The young generation of winner's programming life, the starting point of changing the world is hidden around
- 2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
猜你喜欢

Enabling education innovation and reconstruction with science and technology Huawei implements education informatization

TiDB 性能竞赛 11.02-11.06
![[data structure Python description] use hash table to manually implement a dictionary class based on Python interpreter](/img/3b/00bc81122d330c9d59909994e61027.jpg)
[data structure Python description] use hash table to manually implement a dictionary class based on Python interpreter

Xamarin 从零开始部署 iOS 上的 Walterlv.CloudKeyboard 应用

Major changes in Huawei's cloud: Cloud & AI rises to Huawei's fourth largest BG with full fire

Written interview questions: find the smallest positive integer missing

Close to the double 11, he made up for two months and successfully took the offer from a large factory and transferred to Alibaba

laravel8更新之速率限制改进

Flink's sink: a preliminary study

2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
随机推荐
Ali! Visual computing developer's series of manuals (with internet disk link)
用科技赋能教育创新与重构 华为将教育信息化落到实处
Or talk No.19 | Facebook Dr. Tian Yuandong: black box optimization of hidden action set based on Monte Carlo tree search
Rust : 性能测试criterion库
不多不少,大学里必做的五件事(从我的大一说起)
Introduction to mongodb foundation of distributed document storage database
软件测试培训班出来好找工作么
C语言I博客作业03
OR Talk NO.19 | Facebook田渊栋博士:基于蒙特卡洛树搜索的隐动作集黑盒优化 - 知乎
Oops, the system is under attack again
为什么 Schnorr 签名被誉为比特币 Segwit 后的最大技术更新
阿里教你深入浅出玩转物联网平台!(附网盘链接)
这次,快手终于比抖音'快'了!
Written interview topic: looking for the lost pig
2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
Istio流量管理--Ingress Gateway
Xamarin 从零开始部署 iOS 上的 Walterlv.CloudKeyboard 应用
Bccoin tells you: what is the most reliable investment project at the end of the year!
The container with the most water
What can your cloud server do? What is the purpose of cloud server?