Flink: From Getting Started to Loving It (7: Sink Data Output to Files)
2020-11-08 12:06:00 【osc_u9mt0sus】
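Source is the input of a Flink program, and Sink is where the data goes once the program has processed what came in from the Source: it can be written to files, sockets, or external systems, or simply printed. (Many tools in the big-data ecosystem follow the same pattern; Flume, for example, has the matching Source/Channel/Sink.) Flink provides several ways to output data.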
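A sink differs from writing the data out directly in your own code (for example, inside the open, close, and map methods of a RichMapFunction): it can keep state and comes with fault tolerance, retry mechanisms, and so on.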
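For contrast, the direct-write approach mentioned above might look roughly like the sketch below. It is not from the original article: the class name `DirectFileWriter` and its `path` parameter are hypothetical, and it assumes a Flink 1.x release in which `RichMapFunction.open` takes a `Configuration`. Nothing in it takes part in checkpointing, so a failure and restart can lose or duplicate lines.

```scala
import java.io.{BufferedWriter, FileWriter}

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical helper for illustration only: writes every record to a local
// file from inside map(). It has no state, no checkpoint hooks and no retries.
class DirectFileWriter(path: String) extends RichMapFunction[String, String] {

  // Created in open() on the worker, so it is excluded from serialization.
  @transient private var writer: BufferedWriter = _

  // Called once per parallel instance before any record is processed.
  override def open(parameters: Configuration): Unit = {
    writer = new BufferedWriter(new FileWriter(path, true)) // true = append
  }

  // Called for every record: write it out, then pass it on unchanged.
  override def map(value: String): String = {
    writer.write(value)
    writer.newLine()
    value
  }

  // Called once on shutdown; closing the writer flushes buffered lines.
  override def close(): Unit = {
    if (writer != null) writer.close()
  }
}
```

It would be attached with something like `stream.map(new DirectFileWriter("/tmp/direct.txt"))`, where the path is again just a placeholder. With a parallelism above 1, every parallel instance would open the same file, which is one more reason to prefer a real sink. The program that follows uses the sink approach instead.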
package com.mafei.sinktest

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

case class SensorReadingTest3(id: String, timestamp: Long, temperature: Double)

object FileSink {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")

    // First convert each line into the case class
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") // split the line on commas
        // Build a sensor reading; toLong and toDouble are needed because
        // every field is still a String after the split
        SensorReadingTest3(arr(0), arr(1).toLong, arr(2).toDouble)
      })

    dataStream.print()

    // The simple way of writing to a text file, now deprecated by Flink
    // dataStream.writeAsText("/opt/java2020_study/maven/flink1/src/main/resources/sink.txt")

    // The newer, recommended way. Note that StreamingFileSink treats the path
    // as a base directory and writes its part files into bucket subdirectories under it
    dataStream.addSink(
      StreamingFileSink.forRowFormat(
        new Path("/opt/java2020_study/maven/flink1/src/main/resources/sink2.txt"),
        new SimpleStringEncoder[SensorReadingTest3]() // a charset name can be passed here; the default is UTF-8
      ).build()
    )

    env.execute("udf test")
  }
}
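One thing the listing above does not show: StreamingFileSink only finalizes its part files on successful checkpoints, so without checkpointing they stay in the in-progress or pending state. The sketch below is a minimal variant with checkpointing switched on. The object name, the 10-second interval, the socket source on `localhost:7777`, and the `/tmp/flink-sink-demo` directory are all assumptions made for illustration, not values from the original article.

```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

// Hypothetical job for illustration: an unbounded stream written through a
// StreamingFileSink with checkpointing enabled.
object CheckpointedFileSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Assumed interval: take a checkpoint every 10 seconds. Part files are
    // moved to the finished state only when a checkpoint succeeds.
    env.enableCheckpointing(10000L)

    // Assumed test source: a local socket, e.g. one opened with `nc -lk 7777`.
    val lines: DataStream[String] = env.socketTextStream("localhost", 7777)

    // Assumed output location: a base directory, not a single file.
    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("/tmp/flink-sink-demo"), new SimpleStringEncoder[String]("UTF-8"))
      .build()

    lines.addSink(sink)
    env.execute("checkpointed file sink sketch")
  }
}
```

An unbounded source is used here on purpose: a short bounded job, such as one reading a small text file, may finish before the first checkpoint is ever taken.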
[Figure: code structure and final output]
Copyright notice
This article was written by [osc_u9mt0sus]. Please include a link to the original when reposting. Thank you.
https://my.oschina.net/u/4365833/blog/4708100