当前位置:网站首页>Flink从入门到真香(6、Flink实现UDF函数-实现更细粒度的控制流)
Flink从入门到真香(6、Flink实现UDF函数-实现更细粒度的控制流)
2020-11-08 12:06:00 【osc_15vyay19】
Flink提供了各种数据的转换操作,但实际业务过程中有很多业务上需要处理的数据结构、规则等等,需要自己写自己的业务代码,这时候就用到的flink提供的函数类(Function Class)
Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类),例如MapFunction,FilterFunction,ProcessFunction等。
一个小栗子,要筛选数据中以sensor3为开头的数据
还是在com.mafei.apitest新建一个scala Object UDFTest1
其他代码跟之前一样,读取文件做些简单处理,这里增加了一个自定义的函数类MyFilterFunction,在使用时,只需要在逻辑处增加.filter方法即可,
package com.mafei.apitest
import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
//获取传感器数据
case class SensorReadingTest1(id: String,timestamp: Long, temperature: Double)
object UdfTest1 {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
case class Person(name: String, age: Int)
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
// inputStream.print()
//先转换成样例类类型
val dataStream = inputStream
.map(data => {
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest1(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
// }).filter(new MyFilterFunction)
// }).filter(_.id.startsWith("sensor1")) //如果特别简单的逻辑,也可以匿名类直接这样子写,和写一个函数是一样的效果
// }).filter(new RichFilterFunction[SensorReadingTest1] {
// override def filter(t: SensorReadingTest1): Boolean =
// t.id.startsWith("sensor3")
// }) //匿名类的实现效果,和上面2种效果都是一样的
}).filter(new KeywordFilterFunction("sensor3")) //也可以把要过滤的参数传进去
dataStream.print()
env.execute("udf test")
}
}
//自定义一个函数类,做过滤,实现接口中的filter方法即可
class MyFilterFunction extends FilterFunction[SensorReadingTest1] {
override def filter(t: SensorReadingTest1): Boolean = t.id.startsWith("sensor3")
}
//自定义的函数类,和上面一样,增加了传参,
class KeywordFilterFunction(keyword: String) extends FilterFunction[SensorReadingTest1]{
override def filter(t: SensorReadingTest1): Boolean =
t.id.startsWith(keyword)
}
代码结构及运行效果图

RichMap
主要做一些数据处理等操作,代码演示了 MapperDemo和RichMapDemo的区别及运行效果
package com.mafei.apitest
import org.apache.flink.api.common.functions.{FilterFunction, MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
//获取传感器数据
case class SensorReadingTest2(id: String,timestamp: Long, temperature: Double)
object UdfTest2 {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
case class Person(name: String, age: Int)
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
// inputStream.print()
//先转换成样例类类型
val dataStream = inputStream
.map(data => {
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest2(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
}).map(new RichMapDemo())
dataStream.print()
env.execute("udf test")
}
}
class MapperDemo extends MapFunction[SensorReadingTest2, String]{
override def map(t: SensorReadingTest2): String = t.id+"测试加一些字符串"
}
//富函数,比上面类多了open和close等方法,可以做些数据库连接等操作
class RichMapDemo extends RichMapFunction[SensorReadingTest2, String]{
//这里主要是一些初始化操作,启动调用时,整个过程只会调用一次,类似于类初始化加载的变量,像数据库连接等等
override def open(parameters: Configuration): Unit = {
println("进行了数据库连接。。。。。。。。。。")
//获取运行时上下文
getRuntimeContext()
}
//每条数据都会经过这个方法
override def map(in: SensorReadingTest2): String = in.id+"测试富函数加一些字符串"
override def close(): Unit = {
//跟open类似,当任务停止时会执行,可以做一些如释放数据库连接等等
print("关闭了数据库连接。。。。。。")
}
}
运行效果:可以看到,整个过程中,只有一次数据库连接操作
进行了数据库连接。。。。。。。。。。
sensor1测试富函数加一些字符串
sensor2测试富函数加一些字符串
sensor3测试富函数加一些字符串
sensor4测试富函数加一些字符串
sensor4测试富函数加一些字符串
sensor4测试富函数加一些字符串
关闭了数据库连接。。。。。。
版权声明
本文为[osc_15vyay19]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4368960/blog/4708103
边栏推荐
- Ali teaches you how to use the Internet of things platform! (Internet disk link attached)
- WLAN 直连(对等连接或 P2P)调研及iOS跨平台调研
- 维图PDMS切图软件
- 2 days, using 4 hours after work to develop a test tool
- 漫画|讲解一下如何写简历&项目
- next.js实现服务端缓存
- 学习小结(关于深度学习、视觉和学习体会)
- 供货紧张!苹果被曝 iPhone 12 电源芯片产能不足
- Mozi college SQL injection solution
- 阿里出品!视觉计算开发者系列手册(附网盘链接)
猜你喜欢

Ali teaches you how to use the Internet of things platform! (Internet disk link attached)

Shell uses. Net objects to send mail

The most complete! Alibaba economy cloud original practice! (Internet disk link attached)

一文剖析2020年最火十大物联网应用|IoT Analytics 年度重磅报告出炉!

C language I blog assignment 03

Rust: performance test criteria Library

你的云服务器可以用来做什么?云服务器有什么用途?

OR Talk NO.19 | Facebook田渊栋博士:基于蒙特卡洛树搜索的隐动作集黑盒优化 - 知乎

Japan PSE certification

Flink's sink: a preliminary study
随机推荐
Game optimization performance (11) - Zhihu
Adobe media encoder /Me 2021软件安装包(附安装教程)
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
分布式文档存储数据库之MongoDB基础入门
在51CTO学院Get到PMP证书
VC + + specified directory file output by time
最全!阿里巴巴经济体云原生实践!(附网盘链接)
2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
On monotonous stack
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
仅用六种字符来完成Hello World,你能做到吗?
Windows10关机问题----只有“睡眠”、“更新并重启”、“更新并关机”,但是又不想更新,解决办法
当Kubernetes遇到机密计算,看阿里巴巴如何保护容器内数据的安全!(附网盘链接)
Powershell 使用.Net对象发送邮件
蘑菇街电商交易平台服务架构及改造优化历程(含PPT)
Share the experience of passing the PMP examination
墨者学院SQL注入解题
Or talk No.19 | Facebook Dr. Tian Yuandong: black box optimization of hidden action set based on Monte Carlo tree search
Analysis of istio access control
一文读懂机器学习“数据中毒”