FlinkSQL-UDF: Custom Data Source
2022-07-24 07:31:00 [zxc132465258]
Yesterday I ran into a rather fun requirement: test a Flink-Iceberg version issue without changing the Flink or Iceberg versions running in production.
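The platform already helps here: in its Iceberg-specific FlinkSQL editor you can tick a box to choose the Iceberg version.
But inserting data from a custom data source into Iceberg usually means writing a JAR job. How can you create a custom data source directly in SQL instead of through a JAR?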
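That's right: our all-purpose UDF! Write a connector as a UDF, something like datagen. Exciting just to think about, right? Then I discovered there seems to be no such interface: Flink UDFs (scalar, table, and aggregate functions) can't act as a connector.
But no matter! Functions can do just about anything; most of computing is built from functions. If you think a function can't do something, it only means you haven't worked out how to write that function yet.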
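The official documentation gave me the idea: a table function, which takes one column in and emits multiple columns out!
That's the one. Let datagen control the generation rate and emit a throwaway value, then feed that throwaway value to the function and generate my own rows from it. A neat bit of sleight of hand!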
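The trick can be modeled in plain Java, without Flink, to make the data flow explicit. This is a sketch under assumptions: `drive` stands in for the datagen table, `generate` for the UDTF, and `MockRow` is a hypothetical record (Java 16+) mirroring the four output columns. Because the generator ignores its input and returns exactly one row per call, the number of generated rows, and therefore the rate, is set entirely by the driver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class DriverPatternSketch {
    // Hypothetical record mirroring the UDTF's four output columns.
    record MockRow(long id, String data1, int data2, double data4) {}

    private static final Random RANDOM = new Random();

    // Stand-in for the UDTF: the input value is deliberately ignored,
    // and exactly one mock row is produced per call.
    static MockRow generate(String ignoredTrigger) {
        return new MockRow(
                RANDOM.nextInt(20000),                    // id in [0, 20000)
                String.valueOf(RANDOM.nextInt(2000000)),  // data1 as a numeric string
                RANDOM.nextInt(2000),                     // data2 in [0, 2000)
                1 + RANDOM.nextDouble() * 2000);          // data4 in [1, 2001)
    }

    // Stand-in for the datagen table: n throwaway 8-character strings.
    static List<String> drive(int n) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            keys.add(String.format("%08d", i));
        }
        return keys;
    }

    public static void main(String[] args) {
        // Swap every driver value for a generated row.
        List<MockRow> rows = drive(100).stream()
                .map(DriverPatternSketch::generate)
                .toList();
        System.out.println(rows.size()); // 100: one generated row per driver value
        System.out.println(rows.get(0));
    }
}
```

The driver only decides how many rows exist and when; what each row contains is entirely up to the generator.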
So let's implement the Flink UDF directly in Java: it accepts one value, ignores it, and outputs whatever mock row you want to simulate.
```java
package udf2;

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Table function (UDTF) that ignores its input and emits one random mock row per call.
// @FunctionHint declares the output schema so Flink SQL sees four typed columns.
@FunctionHint(output = @DataTypeHint("ROW<id BIGINT, data1 STRING, data2 INT, data4 DOUBLE>"))
public class generateRowUdtf extends TableFunction<Row> {

    // `a` is the throwaway value from the datagen driver table; it is not used.
    public void eval(String a) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        long id = random.nextLong(20000);                        // [0, 20000)
        String data1 = String.valueOf(random.nextInt(2000000));  // "0" .. "1999999"
        int data2 = random.nextInt(2000);                        // [0, 2000)
        double data4 = random.nextDouble(1.0, 2001.0);           // [1.0, 2001.0)
        collect(Row.of(id, data1, data2, data4));
    }
}
```
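A caveat on bounding random numbers: the common `Math.abs(random.nextInt()) % n` idiom is not guaranteed to be non-negative, because `Math.abs(Integer.MIN_VALUE)` overflows and returns `Integer.MIN_VALUE` itself (the same holds for `Long.MIN_VALUE`). It only happens for one specific input value, so it is rare but real. The bounded overloads such as `nextInt(bound)` always return a value in `[0, bound)`. This is plain Java, independent of Flink; `absMod` and `bounded` are illustrative names.

```java
import java.util.concurrent.ThreadLocalRandom;

public class BoundedRandomPitfall {
    // The abs-then-modulo idiom: looks non-negative, but is not for Integer.MIN_VALUE.
    static int absMod(int x, int n) {
        return Math.abs(x) % n;
    }

    // The bounded overload always returns a value in [0, n).
    static int bounded(int n) {
        return ThreadLocalRandom.current().nextInt(n);
    }

    public static void main(String[] args) {
        // Math.abs overflows for the most negative int and returns it unchanged.
        System.out.println(Math.abs(Integer.MIN_VALUE));      // -2147483648
        System.out.println(absMod(Integer.MIN_VALUE, 2000));  // -1648
        System.out.println(bounded(2000));                    // some value in [0, 2000)
    }
}
```

For a mock-data generator a stray negative id is usually harmless, but if a downstream table assumes non-negative keys, the bounded calls remove the edge case.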
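Then upload the UDF and write the SQL below, which joins the driver table with the function. You can test the SQL locally; just print the result to see it in action. Pretty magical! And with that, our custom generator is done (it's all just APIs, so what did you actually build?).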
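```sql
-- Driver table: datagen emits 100 throwaway rows per second.
CREATE TEMPORARY TABLE test_insert (
    id2 STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'fields.id2.kind' = 'random',
    'fields.id2.length' = '8'
);

-- Register the UDTF from the uploaded JAR.
CREATE TEMPORARY SYSTEM FUNCTION generateRowUdtf AS 'udf2.generateRowUdtf';

-- Swap each driver row for one generated row; xxx is the sink table
-- (the Iceberg table here, or a print table when testing locally).
INSERT INTO xxx
SELECT T.id, T.data1, T.data2, T.data4
FROM test_insert AS S
LEFT JOIN LATERAL TABLE(generateRowUdtf(id2)) AS T(id, data1, data2, data4) ON TRUE;
```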
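To see why `LEFT JOIN LATERAL ... ON TRUE` turns each driver row into exactly one generated row, here is a plain-Java model of the join semantics. It is a sketch, not Flink's implementation, and `leftLateral` and `row` are hypothetical helpers: every row the table function emits is appended to its left row, and with a LEFT join a left row for which the function emits nothing is kept, padded with nulls. Since `generateRowUdtf` calls `collect` exactly once per input, the output rate follows datagen's `rows-per-second`, and an inner lateral join (`FROM test_insert, LATERAL TABLE(...)`) would behave the same here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class LateralJoinSketch {
    // Builds a mutable row from arbitrary column values (hypothetical helper).
    static List<Object> row(Object... values) {
        return new ArrayList<>(Arrays.asList(values));
    }

    // Models: S LEFT JOIN LATERAL TABLE(f(col)) AS T(...) ON TRUE
    // Each row emitted by f is appended to its left row; if f emits nothing,
    // the left row is kept and padded with tWidth nulls (LEFT join semantics).
    static List<List<Object>> leftLateral(List<String> left,
                                          Function<String, List<List<Object>>> f,
                                          int tWidth) {
        List<List<Object>> out = new ArrayList<>();
        for (String s : left) {
            List<List<Object>> produced = f.apply(s);
            if (produced.isEmpty()) {
                List<Object> padded = row(s);
                for (int i = 0; i < tWidth; i++) {
                    padded.add(null);
                }
                out.add(padded);
            } else {
                for (List<Object> t : produced) {
                    List<Object> joined = row(s);
                    joined.addAll(t);
                    out.add(joined);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> driver = Arrays.asList("a", "b", "c");

        // Like generateRowUdtf: exactly one row per input, whatever the input is.
        Function<String, List<List<Object>>> oneRow = s -> List.of(row(1L, "x", 2, 3.0));

        // A function that emits nothing for "b": the LEFT join still keeps "b".
        Function<String, List<List<Object>>> skipsB =
                s -> s.equals("b") ? List.<List<Object>>of() : List.of(row(1L, "x", 2, 3.0));

        System.out.println(leftLateral(driver, oneRow, 4).size()); // 3
        System.out.println(leftLateral(driver, skipsB, 4).get(1)); // [b, null, null, null, null]
    }
}
```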