当前位置:网站首页>Flink: from introduction to Zhenxiang (3. Reading data from collection and file)
Flink: from introduction to Zhenxiang (3. Reading data from collection and file)
2020-11-08 12:06:00 【open_neocf7df】
You can refer to : https://blog.51cto.com/mapengfei/2546985
Reading data from a collection
New package ,com.mafei.apitest, Create a new one scala Object class ,
package com.mafei.apitest
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
// Get sensor data
case class SensorReading(id: String,timestamp: Long, temperature: Double)
object SourceTest {
def main(args: Array[String]): Unit = {
// Create an execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1、 Reading data from a collection
val dataList = List(
SensorReading("sensor1",1603766281,41),
SensorReading("sensor2",1603766282,42),
SensorReading("sensor3",1603766283,43),
SensorReading("sensor4",1603766284,44)
)
val stream1 = env.fromCollection(dataList)
stream1.print()
// perform
env.execute(" source test")
}
}
Code catalog diagram :
Running effect
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2> SensorReading(sensor3,1603766283,43.0)
4> SensorReading(sensor1,1603766281,41.0)
3> SensorReading(sensor4,1603766284,44.0)
1> SensorReading(sensor2,1603766282,42.0)
Read data from file
Just like the first step , New package ,com.mafei.apitest, Create a new one scala Object class ,
package com.mafei.apitest
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
// Get sensor data
case class SensorReading(id: String,timestamp: Long, temperature: Double)
object SourceTest {
def main(args: Array[String]): Unit = {
// Create an execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Reading data from a file
val stream2= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
stream2.print()
// perform
env.execute(" source test")
}
}
stay resources New under the directory sensor.txt, Write the following
sensor1,1603766281,41
sensor2,1603766282,42
sensor3,1603766283,43
sensor4,1603766284,44
Code structure diagram :
Code running effect :
1> sensor1,1603766281,41
1> sensor2,1603766282,42
2> sensor3,1603766283,43
3> sensor4,1603766284,44
版权声明
本文为[open_neocf7df]所创,转载请带上原文链接,感谢
边栏推荐
- Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
- 2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
- Flink的sink实战之一:初探
- Windows10关机问题----只有“睡眠”、“更新并重启”、“更新并关机”,但是又不想更新,解决办法
- How TCP protocol ensures reliable transmission
- PDMS cutting software
- Introduction to mongodb foundation of distributed document storage database
- Flink's sink: a preliminary study
- TiDB 性能竞赛 11.02-11.06
- Ubuntu20.04 access FTP server garbled problem + upload files
猜你喜欢

一文剖析2020年最火十大物联网应用|IoT Analytics 年度重磅报告出炉!

个人目前技术栈

Bohai bank million level fines continue: Li Volta said that the governance is perfect, the growth rate is declining

Written interview questions: find the smallest positive integer missing

新的目标市场在哪里?锚定的产品是什么?| 十问2021中国企业服务

C language I blog assignment 03

Windows10关机问题----只有“睡眠”、“更新并重启”、“更新并关机”,但是又不想更新,解决办法

Hematemesis! Alibaba Android Development Manual! (Internet disk link attached)

Mozi college SQL injection solution

2天,利用下班后的4小时开发一个测试工具
随机推荐
What can your cloud server do? What is the purpose of cloud server?
python小工具:编码转换
11 server monitoring tools commonly used by operation and maintenance personnel
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
漫画|讲解一下如何写简历&项目
The progress bar written in Python is so wonderful~
渤海银行百万级罚单不断:李伏安却称治理完善,增速呈下滑趋势
运维人员常用到的 11 款服务器监控工具
Flink's sink: a preliminary study
阿里撕下电商标签
Powershell 使用.Net对象发送邮件
原创 | 数据资产确权浅议
Python basic syntax variables
Why is Schnorr Signature known as the biggest technology update after bitcoin segwit
Xamarin deploys IOS from scratch Walterlv.CloudKeyboard application
Analysis of istio access control
阿里教你深入浅出玩转物联网平台!(附网盘链接)
Iqkeyboardmanager source code to see
Python basic syntax
新的目标市场在哪里?锚定的产品是什么?| 十问2021中国企业服务