当前位置:网站首页>Flink: from introduction to Zhenxiang (6. Flink implements UDF function - realizes more fine-grained control flow)
Flink: from introduction to Zhenxiang (6. Flink implements UDF function - realizes more fine-grained control flow)
2020-11-08 12:06:00 【osc_15vyay19】
Flink Provides a variety of data conversion operations , But in the actual business process, there are many data structures that need to be processed in business 、 Rules and so on , You need to write your own business code , It's used at this time flink Provided function class (Function Class)
Flink Exposed everything udf Function interface ( The implementation mode is interface or abstract class ), for example MapFunction,FilterFunction,ProcessFunction etc. .
A small chestnut , To filter the data to sensor3 Start with data
Still com.mafei.apitest Create a new one scala Object UDFTest1
The rest of the code is the same as before , Read the file and do some simple processing , A custom function class is added here MyFilterFunction, When use , Just add... To the logic .filter The method can ,
package com.mafei.apitest
import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
// Get sensor data
case class SensorReadingTest1(id: String,timestamp: Long, temperature: Double)
object UdfTest1 {
def main(args: Array[String]): Unit = {
// Create an execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
case class Person(name: String, age: Int)
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
// inputStream.print()
// First convert to sample class type
val dataStream = inputStream
.map(data => {
val arr = data.split(",") // according to , Split data , To get the results
SensorReadingTest1(arr(0), arr(1).toLong, arr(2).toDouble) // Generate data for a sensor class , Parameters are passed in the middle toLong and toDouble Because the default split is string category
// }).filter(new MyFilterFunction)
// }).filter(_.id.startsWith("sensor1")) // If it's very simple logic , You can also write anonymous classes like this , It's the same effect as writing a function
// }).filter(new RichFilterFunction[SensorReadingTest1] {
// override def filter(t: SensorReadingTest1): Boolean =
// t.id.startsWith("sensor3")
// }) // Anonymous class implementation effect , And above 2 The effects are the same
}).filter(new KeywordFilterFunction("sensor3")) // You can also pass in the parameters to be filtered
dataStream.print()
env.execute("udf test")
}
}
// Customize a function class , Filter it , Implement... In the interface filter The method can
class MyFilterFunction extends FilterFunction[SensorReadingTest1] {
override def filter(t: SensorReadingTest1): Boolean = t.id.startsWith("sensor3")
}
// Custom function class , Same as above , Added the transmission reference ,
class KeywordFilterFunction(keyword: String) extends FilterFunction[SensorReadingTest1]{
override def filter(t: SensorReadingTest1): Boolean =
t.id.startsWith(keyword)
}
Code structure and running effect diagram

RichMap
Mainly do some data processing and other operations , The code demonstrates MapperDemo and RichMapDemo The difference and operation effect of
package com.mafei.apitest
import org.apache.flink.api.common.functions.{FilterFunction, MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
// Get sensor data
case class SensorReadingTest2(id: String,timestamp: Long, temperature: Double)
object UdfTest2 {
def main(args: Array[String]): Unit = {
// Create an execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
case class Person(name: String, age: Int)
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
// inputStream.print()
// First convert to sample class type
val dataStream = inputStream
.map(data => {
val arr = data.split(",") // according to , Split data , To get the results
SensorReadingTest2(arr(0), arr(1).toLong, arr(2).toDouble) // Generate data for a sensor class , Parameters are passed in the middle toLong and toDouble Because the default split is string category
}).map(new RichMapDemo())
dataStream.print()
env.execute("udf test")
}
}
class MapperDemo extends MapFunction[SensorReadingTest2, String]{
override def map(t: SensorReadingTest2): String = t.id+" Test to add some strings "
}
// Rich function , There are more classes than above open and close Other methods , Can do some database connection and other operations
class RichMapDemo extends RichMapFunction[SensorReadingTest2, String]{
// The main operations here are initialization , When starting the call , The whole process will only be called once , It is similar to the variables loaded by class initialization , Like database connection and so on
override def open(parameters: Configuration): Unit = {
println(" A database connection was made ..........")
// Get runtime context
getRuntimeContext()
}
// Every data goes through this method
override def map(in: SensorReadingTest2): String = in.id+" Test the rich function and add some strings "
override def close(): Unit = {
// Follow open similar , When the task stops , You can do something like release database connection and so on
print(" Closed database connection ......")
}
}
Running effect : You can see , The whole process , Only one database connection operation
A database connection was made ..........
sensor1 Test the rich function and add some strings
sensor2 Test the rich function and add some strings
sensor3 Test the rich function and add some strings
sensor4 Test the rich function and add some strings
sensor4 Test the rich function and add some strings
sensor4 Test the rich function and add some strings
Closed database connection ......
版权声明
本文为[osc_15vyay19]所创,转载请带上原文链接,感谢
边栏推荐
- 2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
- 一文读懂机器学习“数据中毒”
- [data structure Python description] use hash table to manually implement a dictionary class based on Python interpreter
- 分布式文档存储数据库之MongoDB基础入门
- On the confirmation of original data assets
- Python basic syntax variables
- 分布式文档存储数据库之MongoDB基础入门
- 当Kubernetes遇到机密计算,看阿里巴巴如何保护容器内数据的安全!(附网盘链接)
- Bohai bank million level fines continue: Li Volta said that the governance is perfect, the growth rate is declining
- python基础教程python opencv pytesseract 验证码识别的实现
猜你喜欢

Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom

Share the experience of passing the PMP examination

在51CTO学院Get到PMP证书

C language I blog assignment 03

Get PMP certificate at 51CTO College

Shell uses. Net objects to send mail

Or talk No.19 | Facebook Dr. Tian Yuandong: black box optimization of hidden action set based on Monte Carlo tree search

2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...

A scheme to improve the memory utilization of flutter

C语言I博客作业03
随机推荐
C语言I博客作业03
Adobe Lightroom /Lr 2021软件安装包(附安装教程)
Ali tear off the e-commerce label
VC + + specified directory file output by time
Analysis of istio access control
Xamarin deploys IOS from scratch Walterlv.CloudKeyboard application
laravel8更新之速率限制改进
一文读懂机器学习“数据中毒”
擅长To C的腾讯,如何借腾讯云在这几个行业云市场占有率第一?
This time Kwai tiktok is faster than shaking.
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
Flink从入门到真香(3、从集合和文件中读取数据)
Flink的sink实战之一:初探
[data structure Python description] use hash table to manually implement a dictionary class based on Python interpreter
个人目前技术栈
2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
Share the experience of passing the PMP examination
Ubuntu20.04下访问FTP服务器乱码问题+上传文件
11 server monitoring tools commonly used by operation and maintenance personnel
If you don't understand the gap with others, you will never become an architect! What's the difference between a monthly salary of 15K and a monthly salary of 65K?