Getting started with Flink - word count
2022-06-26 09:44:00 【Look at the data at the top of the mountain】
Word count with batch processing (DataSet API)
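import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/*
 * Batch processing: count the words in input/a.txt with the DataSet API
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the input file line by line
        DataSource<String> data = env.readTextFile("input/a.txt");
        // Split each line into words and emit one (word, 1) pair per word
        FlatMapOperator<String, Tuple2<String, Long>> wordsTuple = data.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // Note: when a Java lambda uses generic types (here Collector<Tuple2<String, Long>>),
        // type erasure removes the type arguments, so Flink cannot infer the output type.
        // It must be declared explicitly with .returns(Types.TUPLE(Types.STRING, Types.LONG)).
        // Group the pairs by the word (field 0)
        UnsortedGrouping<Tuple2<String, Long>> groupWord = wordsTuple.groupBy(0);
        // Sum the counts (field 1) within each group
        AggregateOperator<Tuple2<String, Long>> result = groupWord.sum(1);
        // In the DataSet API, print() also triggers execution, so env.execute() is not called
        result.print();
    }
}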
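The note about .returns(...) comes from Java's type erasure. As a minimal plain-Java illustration, independent of Flink (the class ErasureDemo is a made-up name for this example), two lists with different type arguments share a single runtime class. In the same way, the Tuple2<String, Long> inside the lambda's Collector parameter is not available to Flink at runtime and has to be supplied explicitly.

```java
import java.util.ArrayList;

public class ErasureDemo {
    // Returns true when ArrayList<String> and ArrayList<Long> share one runtime class,
    // i.e. the type arguments String and Long are not kept at runtime.
    static boolean typeArgumentsErased() {
        Class<?> stringList = new ArrayList<String>().getClass();
        Class<?> longList = new ArrayList<Long>().getClass();
        return stringList == longList;
    }

    public static void main(String[] args) {
        System.out.println(typeArgumentsErased()); // prints true
        // The runtime class name carries no type argument
        System.out.println(new ArrayList<String>().getClass().getName()); // prints java.util.ArrayList
    }
}
```

Using an anonymous class instead of a lambda (as the stream examples do) avoids the problem, because the generic supertype such as FlatMapFunction<String, String> is recorded in the class file.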
Word count with bounded stream processing (DataStream API)
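import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/*
 * Bounded stream processing: the file is read as a finite stream
 */
public class Word {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1: every record goes through a single task, so output is printed in input order
        environment.setParallelism(1);
        DataStream<String> source = environment.readTextFile("input/a.txt");
        // Split each line into words
        SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        // Lambda alternative; because of type erasure it needs an explicit .returns(...)
        // and an import of org.apache.flink.api.common.typeinfo.Types:
        // SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(word -> Tuple2.of(word, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
        // Map each word to a (word, 1) pair
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });
        // Partition by the word; keyBy(0) (by field position) is deprecated, so a KeySelector is used
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = map.keyBy(tuple -> tuple.f0);
        // Running sum of field 1 per word: a new total is emitted for every incoming record
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyedStream.sum(1);
        sum.print();
        // Unlike print() in the DataSet API, a streaming job only runs when execute() is called
        environment.execute();
    }
}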
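Because keyBy + sum keeps per-key state and emits an updated total for every incoming record, the streaming job prints intermediate counts, not only final ones. The plain-Java sketch below mimics that behavior with a HashMap standing in for the per-key state; it has no Flink dependency, and the class RunningWordCount and its sample input are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningWordCount {
    // Simulates keyBy(word) + sum(1): per-key state, one updated "(word,total)" emitted per input word.
    static List<String> runningCounts(List<String> lines) {
        Map<String, Integer> state = new HashMap<>(); // stands in for Flink's keyed state
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                // merge() stores and returns the new running total for this word
                int total = state.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + total + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runningCounts(List.of("hello flink", "hello world")));
    }
}
```

For the input lines "hello flink" and "hello world" it prints [(hello,1), (flink,1), (hello,2), (world,1)]: hello shows up twice, once with each running total.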
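The same bounded stream job can be written more compactly by emitting (word, 1) pairs directly from flatMap and chaining keyBy and sum:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/*
 * Bounded stream processing, with the operators chained
 */
public class Word {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        DataStream<String> source = environment.readTextFile("input/a.txt");
        // Emit (word, 1) pairs directly, then key by the word and keep a running sum of field 1
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).keyBy(tuple -> tuple.f0).sum(1);
        result.print();
        environment.execute();
    }
}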
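For bounded input, the last total emitted for each word by the streaming job equals the single total the batch job prints. A plain-Java sketch of that batch-style aggregation, using java.util.stream rather than Flink (the class FinalWordCount is hypothetical): split every line, group the whole input by word, and count each group once.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FinalWordCount {
    // Groups every word of the whole input and counts each group once,
    // yielding one final total per word (kept sorted by word in a TreeMap).
    static Map<String, Long> finalCounts(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(finalCounts(List.of("hello flink", "hello world")));
    }
}
```

With the same two sample lines it prints {flink=1, hello=2, world=1}, which matches the last value emitted per word in the running-count sketch.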