[Flink] The reduce aggregation operator
2022-07-25 03:27:00 【No bug is the biggest bug】
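1. Entity class
import lombok.Data;

// Simple event POJO; Lombok's @Data generates getters, setters, equals, hashCode and toString
@Data
public class Event {
    public String user;
    public String url;
    public Long timestamp;

    // No-argument constructor, which Flink requires in order to treat the class as a POJO type
    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }
}
2. Custom data source that simulates streaming data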
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class ClickSource implements SourceFunction<Event> {
    // Flag that keeps the loop running; volatile because cancel() is called from another thread
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        // Random generator used to pick field values
        Random random = new Random();
        // Candidate values for each field
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=100", "./prod:id=10"};
        // Emit one random event every two seconds until the source is cancelled
        while (running) {
            String user = users[random.nextInt(users.length)];
            // Index by the size of the urls array, not by the length of the user string
            String url = urls[random.nextInt(urls.length)];
            Long time = System.currentTimeMillis();
            ctx.collect(new Event(user, url, time));
            Thread.sleep(2000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
3. Job code
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// The class name ReduceTest is illustrative; the original post shows only the main method
public class ReduceTest {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1 so the output order is easy to follow
        env.setParallelism(1);
        // Read from the custom data source
        DataStreamSource<Event> dataStream = env.addSource(new ClickSource());
        // Map each event to (user, 1) so that visits can be counted per user
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = dataStream.map(new MapFunction<Event, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(Event event) throws Exception {
                return Tuple2.of(event.user, 1);
            }
        });
        // Key the stream by user name
        KeyedStream<Tuple2<String, Integer>, String> keyBy = map.keyBy(data -> data.f0);
        // Rolling reduce: add up the counts for each user
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = keyBy.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
            }
        });
        // Find the currently most active user: send every record to one constant key,
        // then keep whichever record has the larger count
        SingleOutputStreamOperator<Tuple2<String, Integer>> operator = reduce.keyBy(data -> "key")
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                        return t1.f1 > t2.f1 ? t1 : t2;
                    }
                });
        // Print the results
        operator.print();
        // Run the job
        env.execute();
    }
}
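To see what the two chained reduce steps emit, it may help to replay them without a cluster. The following is a minimal plain-Java sketch, not Flink code: the class RollingReduceSketch, the Count type (a stand-in for Tuple2<String, Integer>) and both method names are hypothetical, and it assumes a single ordered input, as with parallelism 1. It models the rolling behaviour of reduce on a keyed stream, where each incoming record updates the stored value for its key and the updated value is emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingReduceSketch {

    /** Hypothetical stand-in for Tuple2<String, Integer>. */
    public static final class Count {
        final String user;
        final int count;

        Count(String user, int count) {
            this.user = user;
            this.count = count;
        }

        @Override
        public String toString() {
            return "(" + user + "," + count + ")";
        }
    }

    /** Models keyBy(user).reduce(sum): one updated count is emitted per input event. */
    public static List<Count> runningCounts(List<String> users) {
        Map<String, Integer> state = new HashMap<>(); // per-key reduce state
        List<Count> out = new ArrayList<>();
        for (String user : users) {
            int updated = state.merge(user, 1, Integer::sum);
            out.add(new Count(user, updated));
        }
        return out;
    }

    /** Models keyBy(constant).reduce(max): one current maximum is emitted per input record. */
    public static List<Count> runningMax(List<Count> counts) {
        List<Count> out = new ArrayList<>();
        Count best = null; // single reduce state, since every record shares one key
        for (Count c : counts) {
            // Same comparison as the job: keep the stored record only if it is strictly larger
            best = (best != null && best.count > c.count) ? best : c;
            out.add(best);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> clicks = Arrays.asList("Mary", "Mary", "Bob", "Mary", "Bob");
        System.out.println(runningCounts(clicks));
        System.out.println(runningMax(runningCounts(clicks)));
    }
}
```

For the click sequence Mary, Mary, Bob, Mary, Bob, the first stage yields (Mary,1), (Mary,2), (Bob,1), (Mary,3), (Bob,2) and the second yields (Mary,1), (Mary,2), (Mary,2), (Mary,3), (Mary,3). The second stage emits a record even when the leader has not changed, which is why the job above prints one line per click.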