Flink Practical Tutorial (Advanced 4): Window Top N
2022-06-23 21:43:00 【Wuyuntao】
Introduction to Stream Compute Oceanus
Stream Compute Oceanus is a powerful real-time analytics tool in the big data product ecosystem. It is a one-stop, enterprise-grade real-time big data analytics platform built on Apache Flink, featuring one-stop development, seamless connectivity, sub-second latency, low cost, security, and stability. Oceanus aims to maximize the value of enterprise data and accelerate enterprises' real-time digital transformation.
This article shows how to implement a common TopN statistic with Flink. A Python script first generates simulated commodity purchase data (one message per second) and sends it to CKafka. A Flink SQL job is then created on the Oceanus platform to read the commodity data from CKafka in real time, count the top three purchased commodity categories per minute in a tumbling window (based on event time), and finally write the results to PostgreSQL.
Prerequisites
Create a Stream Compute Oceanus cluster
On the Stream Compute Oceanus product activity page, you can purchase an Oceanus cluster for 1 CNY.
Go to the Oceanus console [1], click 【Cluster Management】 on the left, then click 【Create Cluster】 in the upper-left corner. For details, see the Oceanus documentation: Create an exclusive cluster [2].
Create a CKafka message queue
Go to the CKafka console [3], click 【Create】 in the upper-left corner, and create a CKafka instance; see Create a CKafka instance [4] for details. Then click into the instance and choose 【Topic Management】 > 【Create】 to create the Topic; see Create a CKafka Topic [5] for details.
Data preparation
This example uses a Python script to send simulated data to the Topic, which requires network connectivity. Here the script runs on a CVM instance in the same VPC as CKafka, with a Python environment installed. If the network is not reachable, you can open it up in the CKafka instance under 【Basic Info】 > 【Access Mode】 > 【Add Routing Policy】 > 【Routing Type】, choosing VPC network or public domain name access; see the CKafka getting started guide [6] for details.
#!/usr/bin/python3
# The first time you use this script, install the kafka module with "pip3 install kafka-python"
import json
import random
import time
from kafka import KafkaProducer

broker_lists = ['10.0.0.29:9092']
kafka_topic_oceanus = 'oceanus_advanced4_input'
producer = KafkaProducer(bootstrap_servers=broker_lists,
                         value_serializer=lambda m: json.dumps(m).encode('ascii'))

def send_data(topic):
    # Simulate one purchase-behavior record
    user_id = random.randint(1, 50)
    item_id = random.randint(1, 1000)
    category_id = random.randint(1, 20)
    user_behaviors = ['pv', 'buy', 'cart', 'fav']
    current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    msg = {
        'user_id': user_id,
        'item_id': item_id,
        'category_id': category_id,
        'user_behavior': user_behaviors[random.randint(0, len(user_behaviors) - 1)],
        'time_stamp': current_time
    }
    producer.send(topic, msg)
    print(msg)
    producer.flush()

if __name__ == '__main__':
    while True:
        # Send one record per second
        time.sleep(1)
        send_data(kafka_topic_oceanus)

For more access methods, see CKafka Send and receive messages [7].
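To confirm that messages are actually reaching the Topic before wiring up the Flink job, you can run a small consumer on the same CVM. The following is a minimal sketch (not part of the original steps; it assumes the same kafka-python module, broker address and topic as the producer above):

# Minimal verification consumer sketch (assumption: kafka-python, same broker/topic as the producer above)
from kafka import KafkaConsumer

consumer = KafkaConsumer('oceanus_advanced4_input',
                         bootstrap_servers=['10.0.0.29:9092'],
                         auto_offset_reset='latest',
                         value_deserializer=lambda m: m.decode('ascii'))
for message in consumer:
    # Should print the JSON records emitted by the producer script above
    print(message.value)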
Create a PostgreSQL instance
Go to the PostgreSQL console [8] and click 【Create】 in the upper-left corner to create an instance; see Create a PostgreSQL instance [9] for details. Then connect to the instance database and create the table oceanus_advanced4_output to receive the results.
-- Create table statement
create table public.oceanus_advanced4_output (
  win_start   TIMESTAMP,
  category_id INT,
  buy_count   INT,
  PRIMARY KEY (win_start, category_id)
);
The author connects with DBeaver; for more connection methods, see the documentation Connect to a PostgreSQL instance [10].
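Once the Flink job created below has been running for a few minutes, the results can be inspected with any PostgreSQL client. Below is a minimal sketch using psycopg2 (an assumption; DBeaver as used above works just as well), with the connection values being the placeholders from this article:

# Minimal result-check sketch (assumes "pip3 install psycopg2-binary"; connection values are placeholders)
import psycopg2

conn = psycopg2.connect(host='10.0.0.236', port=5432,
                        dbname='postgres', user='root', password='yourpassword')
with conn, conn.cursor() as cur:
    # Show the latest windows' Top 3 categories, newest window first
    cur.execute("""
        SELECT win_start, category_id, buy_count
        FROM public.oceanus_advanced4_output
        ORDER BY win_start DESC, buy_count DESC
        LIMIT 9
    """)
    for row in cur.fetchall():
        print(row)
conn.close()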
Stream Compute Oceanus job
1. Create the Source
CREATE TABLE `kafka_json_source_table` (
  user_id       INT,
  item_id       INT,
  category_id   INT,
  user_behavior VARCHAR,
  time_stamp    TIMESTAMP(3),
  WATERMARK FOR time_stamp AS time_stamp - INTERVAL '3' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'oceanus_advanced4_input',                -- Replace with the Topic you want to consume
  'scan.startup.mode' = 'latest-offset',              -- One of latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp
  'properties.bootstrap.servers' = '10.0.0.29:9092',  -- Replace with your Kafka connection address
  'properties.group.id' = 'testGroup',                -- Required; a Group ID must be specified
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',             -- false: do not fail when a field is missing
  'json.ignore-parse-errors' = 'true'                 -- true: ignore any parse errors
);
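The WATERMARK clause above makes time_stamp the event-time column and lets the watermark trail the largest observed time_stamp by 3 seconds, so a 1-minute window is only emitted once an event at least 3 seconds past the window end has arrived. A minimal local sketch of that firing rule (plain Python for illustration, not Flink code; the sample timestamps are made up):

# Illustration of the 3-second watermark (assumption: hand-picked sample timestamps)
from datetime import datetime, timedelta

fmt = "%Y-%m-%d %H:%M:%S"
events = ["2022-06-23 12:00:05", "2022-06-23 12:00:59",
          "2022-06-23 12:01:02", "2022-06-23 12:01:04"]
window_end = datetime.strptime("2022-06-23 12:01:00", fmt)  # end of window [12:00, 12:01)

max_ts = None
for e in events:
    ts = datetime.strptime(e, fmt)
    max_ts = ts if max_ts is None or ts > max_ts else max_ts
    watermark = max_ts - timedelta(seconds=3)        # watermark = max event time - 3s
    fired = watermark >= window_end                  # window emits once watermark passes its end
    print(f"event={e}  watermark={watermark}  window [12:00,12:01) fired={fired}")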
2. Create the Sink
CREATE TABLE `jdbc_upsert_sink_table` (
  win_start   TIMESTAMP(3),
  category_id INT,
  buy_count   INT,
  PRIMARY KEY (win_start, category_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://10.0.0.236:5432/postgres?currentSchema=public&reWriteBatchedInserts=true',  -- Replace with your actual PostgreSQL connection parameters
  'table-name' = 'oceanus_advanced4_output',  -- Table to write to
  'username' = 'root',                        -- Database user name (needs INSERT permission)
  'password' = 'yourpassword',                -- Database password
  'sink.buffer-flush.max-rows' = '200',       -- Maximum number of rows per batch flush
  'sink.buffer-flush.interval' = '2s'         -- Flush interval for batched output
);

3. Write the business SQL
-- Create a temporary view for filtering the raw data and window aggregation
CREATE VIEW `kafka_json_source_view` AS
SELECT
  TUMBLE_START(time_stamp, INTERVAL '1' MINUTE) AS win_start,
  category_id,
  COUNT(1) AS buy_count
FROM `kafka_json_source_table`
WHERE user_behavior = 'buy'
GROUP BY TUMBLE(time_stamp, INTERVAL '1' MINUTE), category_id;
-- Compute the Top 3 purchased categories per minute
INSERT INTO `jdbc_upsert_sink_table`
SELECT
b.win_start,
b.category_id,
CAST(b.buy_count AS INT) AS buy_count
FROM (SELECT *
,ROW_NUMBER() OVER (PARTITION BY win_start ORDER BY buy_count DESC) AS rn
FROM `kafka_json_source_view`
) b
WHERE b.rn <= 3;
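To make the two statements above concrete, here is a minimal local sketch (plain Python, not part of the Flink job; the sample records are made up) that buckets 'buy' records into 1-minute tumbling windows and keeps the three categories with the highest counts per window, mirroring the TUMBLE + ROW_NUMBER pattern:

# Local sketch of the TUMBLE + ROW_NUMBER logic (assumption: hand-picked sample records)
from collections import Counter, defaultdict
from datetime import datetime

fmt = "%Y-%m-%d %H:%M:%S"
records = [  # (time_stamp, category_id, user_behavior)
    ("2022-06-23 12:00:01", 3, "buy"), ("2022-06-23 12:00:10", 3, "buy"),
    ("2022-06-23 12:00:20", 7, "buy"), ("2022-06-23 12:00:30", 7, "pv"),
    ("2022-06-23 12:00:40", 9, "buy"), ("2022-06-23 12:00:50", 1, "buy"),
]

counts = defaultdict(Counter)   # win_start -> Counter{category_id: buy_count}
for time_stamp, category_id, user_behavior in records:
    if user_behavior != "buy":                         # WHERE user_behavior = 'buy'
        continue
    ts = datetime.strptime(time_stamp, fmt)
    win_start = ts.replace(second=0, microsecond=0)    # TUMBLE(..., INTERVAL '1' MINUTE)
    counts[win_start][category_id] += 1

for win_start, counter in counts.items():
    # ROW_NUMBER() OVER (PARTITION BY win_start ORDER BY buy_count DESC) <= 3
    for category_id, buy_count in counter.most_common(3):
        print(win_start, category_id, buy_count)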
Summary
This article uses TUMBLE WINDOW together with the ROW_NUMBER function to analyze the top three purchased commodity categories per minute. Users can pick the window function that matches their actual needs to compute the corresponding TopN. For more on window functions, see Time window functions [11].
The author writes the results after trimming the rn and win_end fields (i.e., the no-ranking-output optimization). In scenarios where rn is not written out, pay close attention to the primary key of the result table: a wrong primary key definition will directly lead to inaccurate TopN results.
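As a rough illustration of that caveat (plain Python, not the connector's actual code): with the upsert key (win_start, category_id), each flush of an updated count simply overwrites the previous value for that category; if the key covered only win_start, every flush would keep a single row per window and the Top 3 would be lost:

# Illustration (assumption): emulating upsert writes keyed two different ways
updates = [  # successive results emitted for the same window as counts grow
    ("2022-06-23 12:00:00", 3, 2), ("2022-06-23 12:00:00", 7, 1),
    ("2022-06-23 12:00:00", 3, 5), ("2022-06-23 12:00:00", 9, 4),
]

good = {}   # key = (win_start, category_id): one row per category survives
bad = {}    # key = win_start only: each write overwrites the whole window
for win_start, category_id, buy_count in updates:
    good[(win_start, category_id)] = buy_count
    bad[win_start] = (category_id, buy_count)

print(good)  # three categories remain, each with its latest count
print(bad)   # only the last written category remains -> wrong TopN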
Reference links
[1] Oceanus console: https://console.cloud.tencent.com/oceanus/overview
[2] Create an exclusive cluster: https://cloud.tencent.com/document/product/849/48298
[3] CKafka console: https://console.cloud.tencent.com/ckafka/index?rid=1
[4] Create a CKafka instance: https://cloud.tencent.com/document/product/597/54839
[5] Create a CKafka Topic: https://cloud.tencent.com/document/product/597/54854
[6] CKafka getting started guide: https://cloud.tencent.com/document/product/597/54837
[7] CKafka send and receive messages: https://cloud.tencent.com/document/product/597/54834
[8] PostgreSQL console: https://console.cloud.tencent.com/postgres/index
[9] Create a PostgreSQL instance: https://cloud.tencent.com/document/product/409/56961
[10] Connect to a PostgreSQL instance: https://cloud.tencent.com/document/product/409/40429
[11] Time window functions: https://cloud.tencent.com/document/product/849/18077