当前位置:网站首页>Flink从入门到真香(6、Flink实现UDF函数-实现更细粒度的控制流)
Flink从入门到真香(6、Flink实现UDF函数-实现更细粒度的控制流)
2020-11-08 12:06:00 【osc_15vyay19】
Flink提供了各种数据的转换操作,但实际业务过程中有很多业务上需要处理的数据结构、规则等等,需要自己写自己的业务代码,这时候就用到的flink提供的函数类(Function Class)
Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类),例如MapFunction,FilterFunction,ProcessFunction等。
一个小栗子,要筛选数据中以sensor3为开头的数据
还是在com.mafei.apitest新建一个scala Object UDFTest1
其他代码跟之前一样,读取文件做些简单处理,这里增加了一个自定义的函数类MyFilterFunction,在使用时,只需要在逻辑处增加.filter方法即可,
package com.mafei.apitest
import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
//获取传感器数据
case class SensorReadingTest1(id: String,timestamp: Long, temperature: Double)
object UdfTest1 {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
case class Person(name: String, age: Int)
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
// inputStream.print()
//先转换成样例类类型
val dataStream = inputStream
.map(data => {
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest1(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
// }).filter(new MyFilterFunction)
// }).filter(_.id.startsWith("sensor1")) //如果特别简单的逻辑,也可以匿名类直接这样子写,和写一个函数是一样的效果
// }).filter(new RichFilterFunction[SensorReadingTest1] {
// override def filter(t: SensorReadingTest1): Boolean =
// t.id.startsWith("sensor3")
// }) //匿名类的实现效果,和上面2种效果都是一样的
}).filter(new KeywordFilterFunction("sensor3")) //也可以把要过滤的参数传进去
dataStream.print()
env.execute("udf test")
}
}
//自定义一个函数类,做过滤,实现接口中的filter方法即可
class MyFilterFunction extends FilterFunction[SensorReadingTest1] {
override def filter(t: SensorReadingTest1): Boolean = t.id.startsWith("sensor3")
}
//自定义的函数类,和上面一样,增加了传参,
class KeywordFilterFunction(keyword: String) extends FilterFunction[SensorReadingTest1]{
override def filter(t: SensorReadingTest1): Boolean =
t.id.startsWith(keyword)
}
代码结构及运行效果图
RichMap
主要做一些数据处理等操作,代码演示了 MapperDemo和RichMapDemo的区别及运行效果
package com.mafei.apitest
import org.apache.flink.api.common.functions.{FilterFunction, MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
//获取传感器数据
case class SensorReadingTest2(id: String,timestamp: Long, temperature: Double)
object UdfTest2 {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
case class Person(name: String, age: Int)
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
// inputStream.print()
//先转换成样例类类型
val dataStream = inputStream
.map(data => {
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest2(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
}).map(new RichMapDemo())
dataStream.print()
env.execute("udf test")
}
}
class MapperDemo extends MapFunction[SensorReadingTest2, String]{
override def map(t: SensorReadingTest2): String = t.id+"测试加一些字符串"
}
//富函数,比上面类多了open和close等方法,可以做些数据库连接等操作
class RichMapDemo extends RichMapFunction[SensorReadingTest2, String]{
//这里主要是一些初始化操作,启动调用时,整个过程只会调用一次,类似于类初始化加载的变量,像数据库连接等等
override def open(parameters: Configuration): Unit = {
println("进行了数据库连接。。。。。。。。。。")
//获取运行时上下文
getRuntimeContext()
}
//每条数据都会经过这个方法
override def map(in: SensorReadingTest2): String = in.id+"测试富函数加一些字符串"
override def close(): Unit = {
//跟open类似,当任务停止时会执行,可以做一些如释放数据库连接等等
print("关闭了数据库连接。。。。。。")
}
}
运行效果:可以看到,整个过程中,只有一次数据库连接操作
进行了数据库连接。。。。。。。。。。
sensor1测试富函数加一些字符串
sensor2测试富函数加一些字符串
sensor3测试富函数加一些字符串
sensor4测试富函数加一些字符串
sensor4测试富函数加一些字符串
sensor4测试富函数加一些字符串
关闭了数据库连接。。。。。。
版权声明
本文为[osc_15vyay19]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4368960/blog/4708103
边栏推荐
- C语言I博客作业03
- Windows10关机问题----只有“睡眠”、“更新并重启”、“更新并关机”,但是又不想更新,解决办法
- Recommend an economic science video, very valuable!
- 2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
- The container with the most water
- 2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
- If you don't understand the gap with others, you will never become an architect! What's the difference between a monthly salary of 15K and a monthly salary of 65K?
- 这次,快手终于比抖音'快'了!
- 当Kubernetes遇到机密计算,看阿里巴巴如何保护容器内数据的安全!(附网盘链接)
- Hematemesis! Alibaba Android Development Manual! (Internet disk link attached)
猜你喜欢
Win10 Terminal + WSL 2 安装配置指南,精致开发体验
Adobe media encoder /Me 2021软件安装包(附安装教程)
Entry level! Teach you how to develop small programs without asking for help (with internet disk link)
笔试面试题目:判断单链表是否有环
Windows10关机问题----只有“睡眠”、“更新并重启”、“更新并关机”,但是又不想更新,解决办法
你搞不懂与别人的差距,永远成不了架构师!月薪15K和月薪65K,你差在那了?
Python基础语法
阿里教你深入浅出玩转物联网平台!(附网盘链接)
Istio traffic management -- progress gateway
Rust: command line parameter and environment variable operation
随机推荐
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
Python Gadgets: code conversion
Don't look! Full interpretation of Alibaba cloud's original data lake system! (Internet disk link attached)
Python基础语法
在51CTO学院Get到PMP证书
如何将 PyTorch Lightning 模型部署到生产中
第二次作业
新的目标市场在哪里?锚定的产品是什么?| 十问2021中国企业服务
Oops, the system is under attack again
笔试面试题目:判断单链表是否有环
C语言I博客作业03
一个方案提升Flutter内存利用率
Flink's sink: a preliminary study
Win10 Terminal + WSL 2 安装配置指南,精致开发体验
python小工具:编码转换
What can your cloud server do? What is the purpose of cloud server?
WLAN 直连(对等连接或 P2P)调研及iOS跨平台调研
Dogs can also operate drones! You're right, but it's actually an autonomous drone - you know
蘑菇街电商交易平台服务架构及改造优化历程(含PPT)
Win10 terminal + WSL 2 installation and configuration guide, exquisite development experience