当前位置:网站首页>[Flink] aggregation operator
[Flink] aggregation operator
2022-07-25 03:27:00 【No bug is the biggest bug】
One 、 Entity class
@Data
public class Event {
public String user;
public String url;
public Long timestamp;
public Event(String user, String url, Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
}Two 、 Custom data sources simulate streaming data
public class ClickSource implements SourceFunction<Event> {
// Declare a flag class
private Boolean running = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
// Randomly generated data
Random random = new Random();
// Define the data set selected by the field
String[] users = {"Mary","Alice","Bob","Cary"};
String[] urls = {"./home","./cart","./fav","./prod?id=100","./prod:id=10"};
// Loop data
while (running){
String user = users[random.nextInt(users.length)];
String url = urls[random.nextInt(user.length())];
Long time = System.currentTimeMillis();
ctx.collect(new Event(user,url,time));
Thread.sleep(2000);
}
}
@Override
public void cancel() {
running = false;
}
}3、 ... and 、 Task code
public static void main(String[] args) throws Exception {
// Create an execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set parallelism
env.setParallelism(1);
// Read data source
DataStreamSource<Event> dataStream = env.addSource(new ClickSource());
// Press key groups to perform aggregate query Extract the latest access data of the current user max Only take the maximum value of the current specified field maxBy Round the maximum value of a piece of data
SingleOutputStreamOperator<Event> max = dataStream.keyBy(new KeySelector<Event, String>() {
// Group here by user name
@Override
public String getKey(Event event) throws Exception {
return event.user;
}
}).max("timestamp");
SingleOutputStreamOperator<Event> maxBy = dataStream.keyBy(new KeySelector<Event, String>() {
// Group here by user name
@Override
public String getKey(Event event) throws Exception {
return event.user;
}
}).maxBy("timestamp");
// Print
max.print("max");
maxBy.print("maxBy");
// Start
env.execute();
}边栏推荐
- Message queue (MQ)
- Query the information of students whose grades are above 80
- Vscode copy synchronization plug-in expansion
- Network security - comprehensive penetration test -cve-2018-10933-libssh maintain access
- Database transactions (often asked)
- PHP record
- Vscode configuration, eslint+prettier combined with detailed configuration steps, standardized development
- Force deduction brush question 7. Integer inversion
- Many local and municipal supervision departments carried out cold drink sampling inspection, and Zhong Xue's high-quality products were all qualified
- JS password combination rule - 8-16 digit combination of numbers and characters, not pure numbers and pure English
猜你喜欢

Brief understanding of operational amplifier

mysql_ Master slave synchronization_ Show slave status details

Solution: owner's smart site supervision cloud platform

Electronic bidding procurement mall system: optimize traditional procurement business and speed up enterprise digital upgrading
![Matplotlib tutorial (I) [getting to know Matplotlib first]](/img/dc/e7dfc435900c14e3e9be07f6ad75fe.png)
Matplotlib tutorial (I) [getting to know Matplotlib first]

Use of stm32cubemonitor part I - data plotting and instrument display

292. Nim game

05 - MVVM model

B. Almost Ternary Matrix

Message queue (MQ)
随机推荐
Function of each layer of data warehouse
mysql_ Backup restore_ Specify table_ Backup table_ Restore table_ innobackup
144. Preorder traversal of binary tree
Openlayers draw deletes the last point when drawing
[template engine] microservice Learning Notes 6: freemaker
Dc-1-practice
[brother hero July training] day 19: binary tree
Image processing based on hog feature
04 -- two ways of writing el and data
How to use two queues to simulate the implementation of a stack
Moveit2 - 10.urdf and SRDF
What is technical support| Daily anecdotes
C language_ Structure introduction
What should testers do if they encounter a bug that is difficult to reproduce?
Dc-2-range practice
C. Mark and His Unfinished Essay
Canvas record
mysql_ Account authorization permission recycling, account locking and unlocking, account creation and deletion
05 - MVVM model
Network security - comprehensive penetration test -cve-2018-10933-libssh maintain access