当前位置:网站首页>Flink从入门到真香(6、Flink实现UDF函数-实现更细粒度的控制流)
Flink从入门到真香(6、Flink实现UDF函数-实现更细粒度的控制流)
2020-11-08 12:06:00 【osc_15vyay19】
Flink提供了各种数据的转换操作,但实际业务过程中有很多业务上需要处理的数据结构、规则等等,需要自己写自己的业务代码,这时候就用到的flink提供的函数类(Function Class)
Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类),例如MapFunction,FilterFunction,ProcessFunction等。
一个小栗子,要筛选数据中以sensor3为开头的数据
还是在com.mafei.apitest新建一个scala Object UDFTest1
其他代码跟之前一样,读取文件做些简单处理,这里增加了一个自定义的函数类MyFilterFunction,在使用时,只需要在逻辑处增加.filter方法即可,
package com.mafei.apitest
import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
//获取传感器数据
case class SensorReadingTest1(id: String,timestamp: Long, temperature: Double)
object UdfTest1 {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
case class Person(name: String, age: Int)
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
// inputStream.print()
//先转换成样例类类型
val dataStream = inputStream
.map(data => {
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest1(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
// }).filter(new MyFilterFunction)
// }).filter(_.id.startsWith("sensor1")) //如果特别简单的逻辑,也可以匿名类直接这样子写,和写一个函数是一样的效果
// }).filter(new RichFilterFunction[SensorReadingTest1] {
// override def filter(t: SensorReadingTest1): Boolean =
// t.id.startsWith("sensor3")
// }) //匿名类的实现效果,和上面2种效果都是一样的
}).filter(new KeywordFilterFunction("sensor3")) //也可以把要过滤的参数传进去
dataStream.print()
env.execute("udf test")
}
}
//自定义一个函数类,做过滤,实现接口中的filter方法即可
class MyFilterFunction extends FilterFunction[SensorReadingTest1] {
override def filter(t: SensorReadingTest1): Boolean = t.id.startsWith("sensor3")
}
//自定义的函数类,和上面一样,增加了传参,
class KeywordFilterFunction(keyword: String) extends FilterFunction[SensorReadingTest1]{
override def filter(t: SensorReadingTest1): Boolean =
t.id.startsWith(keyword)
}
代码结构及运行效果图
RichMap
主要做一些数据处理等操作,代码演示了 MapperDemo和RichMapDemo的区别及运行效果
package com.mafei.apitest
import org.apache.flink.api.common.functions.{FilterFunction, MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
//获取传感器数据
case class SensorReadingTest2(id: String,timestamp: Long, temperature: Double)
object UdfTest2 {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
case class Person(name: String, age: Int)
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
// inputStream.print()
//先转换成样例类类型
val dataStream = inputStream
.map(data => {
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest2(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
}).map(new RichMapDemo())
dataStream.print()
env.execute("udf test")
}
}
class MapperDemo extends MapFunction[SensorReadingTest2, String]{
override def map(t: SensorReadingTest2): String = t.id+"测试加一些字符串"
}
//富函数,比上面类多了open和close等方法,可以做些数据库连接等操作
class RichMapDemo extends RichMapFunction[SensorReadingTest2, String]{
//这里主要是一些初始化操作,启动调用时,整个过程只会调用一次,类似于类初始化加载的变量,像数据库连接等等
override def open(parameters: Configuration): Unit = {
println("进行了数据库连接。。。。。。。。。。")
//获取运行时上下文
getRuntimeContext()
}
//每条数据都会经过这个方法
override def map(in: SensorReadingTest2): String = in.id+"测试富函数加一些字符串"
override def close(): Unit = {
//跟open类似,当任务停止时会执行,可以做一些如释放数据库连接等等
print("关闭了数据库连接。。。。。。")
}
}
运行效果:可以看到,整个过程中,只有一次数据库连接操作
进行了数据库连接。。。。。。。。。。
sensor1测试富函数加一些字符串
sensor2测试富函数加一些字符串
sensor3测试富函数加一些字符串
sensor4测试富函数加一些字符串
sensor4测试富函数加一些字符串
sensor4测试富函数加一些字符串
关闭了数据库连接。。。。。。
版权声明
本文为[osc_15vyay19]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4368960/blog/4708103
边栏推荐
- Shell uses. Net objects to send mail
- Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
- VC + + specified directory file output by time
- On monotonous stack
- 软件测试培训班出来好找工作么
- 一个方案提升Flutter内存利用率
- 原创 | 数据资产确权浅议
- Rust: performance test criteria Library
- 【计算机网络】学习笔记,第三篇:数据链路层(谢希仁版)
- Windows10关机问题----只有“睡眠”、“更新并重启”、“更新并关机”,但是又不想更新,解决办法
猜你喜欢
Win10 terminal + WSL 2 installation and configuration guide, exquisite development experience
Written interview topic: looking for the lost pig
你的云服务器可以用来做什么?云服务器有什么用途?
2 days, using 4 hours after work to develop a test tool
当Kubernetes遇到机密计算,看阿里巴巴如何保护容器内数据的安全!(附网盘链接)
[data structure Python description] use hash table to manually implement a dictionary class based on Python interpreter
When kubernetes encounters confidential computing, see how Alibaba protects the data in the container! (Internet disk link attached)
Mozi college SQL injection solution
年轻一代 winner 的程序人生,改变世界的起点藏在身边
一文读懂机器学习“数据中毒”
随机推荐
攻防世界之web新手题
不多不少,大学里必做的五件事(从我的大一说起)
YGC问题排查,又让我涨姿势了!
一文读懂机器学习“数据中毒”
入门级!教你小程序开发不求人(附网盘链接)
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
If you don't understand the gap with others, you will never become an architect! What's the difference between a monthly salary of 15K and a monthly salary of 65K?
2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
It's worth seeing! EMR elastic low cost offline big data analysis best practice (with network disk link)
分布式文档存储数据库之MongoDB基础入门
next.js实现服务端缓存
【计算机网络】学习笔记,第三篇:数据链路层(谢希仁版)
Win10 terminal + WSL 2 installation and configuration guide, exquisite development experience
华为云重大变革:Cloud&AI 升至华为第四大 BG ,火力全开
Python基础语法
你的云服务器可以用来做什么?云服务器有什么用途?
供货紧张!苹果被曝 iPhone 12 电源芯片产能不足
Win10 Terminal + WSL 2 安装配置指南,精致开发体验
解析Istio访问控制
PMP心得分享