Flink Practical Tutorial: Advanced 4 - Window Top N

2022-06-23 21:43:00 Wuyuntao

Introduction to Stream Computing Oceanus

Stream Computing Oceanus is a real-time analytics tool in the big data product ecosystem. It is an enterprise-grade real-time big data analytics platform built on Apache Flink, featuring one-stop development, seamless connectivity, sub-second latency, low cost, security, and stability. Oceanus aims to maximize the value of enterprise data and accelerate the real-time digital transformation of enterprises.

This article shows how to use Flink to implement a common TopN statistics requirement. First, a Python script generates simulated product purchase data (one message per second) and sends it to CKafka. Then a Flink SQL job created on the Oceanus platform reads the product data from CKafka in real time, uses a tumbling window (based on event time) to count the top three product categories purchased each minute (Top3), and finally stores the results in PostgreSQL.

Prerequisites

Create a Stream Computing Oceanus cluster

On the Stream Computing Oceanus promotional page, you can purchase an Oceanus cluster for 1 yuan.

Go to the Oceanus console [1], click 【Cluster management】 on the left, and then click 【Create cluster】 at the upper left. For details, see Create an exclusive cluster [2] in the official Oceanus documentation.

Create a CKafka message queue

Go to the CKafka console [3] and click 【New】 in the upper-left corner to create a CKafka instance. For details, see CKafka Create an instance [4]. Then click into the instance and click 【Topic management】>【New】 to create a Topic. For details, see CKafka Create a Topic [5].

Data preparation

This example uses a Python script to send simulated data to the Topic, which requires network connectivity between the machine running the script and CKafka. Here we log in to a CVM in the same VPC as CKafka and install a Python environment on it. If the network is not connected, go to 【Basic information】>【Access mode】>【Add routing policy】>【Routing type】 in the CKafka instance and choose VPC network or public domain name access to establish connectivity. For details, see the CKafka getting started guide [6] on the official website.

#!/usr/bin/python3
# Before the first run, install the Kafka client module with "pip3 install kafka-python"
import json
import random
import time
from kafka import KafkaProducer

# Replace with the address of your own CKafka instance
broker_lists = ['10.0.0.29:9092']
kafka_topic_oceanus = 'oceanus_advanced4_input'

# Each message is serialized as JSON before it is sent
producer = KafkaProducer(bootstrap_servers=broker_lists,
                         value_serializer=lambda m: json.dumps(m).encode('ascii'))

def send_data(topic):
    """Build one random user-behavior record and send it to the topic."""
    user_id = random.randint(1, 50)
    item_id = random.randint(1, 1000)
    category_id = random.randint(1, 20)
    user_behaviors = ['pv', 'buy', 'cart', 'fav']
    current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    msg = {
        'user_id': user_id,
        'item_id': item_id,
        'category_id': category_id,
        'user_behavior': random.choice(user_behaviors),
        'time_stamp': current_time
    }
    producer.send(topic, msg)
    print(msg)
    producer.flush()

if __name__ == '__main__':
    while True:
        # Send one record per second
        time.sleep(1)
        send_data(kafka_topic_oceanus)

For more access methods, see CKafka Send and receive messages [7].
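Before pointing the script at a real CKafka instance, it can help to check the payload shape locally. The following is a minimal sketch that uses only the standard library and does not connect to Kafka. The helper names `build_message` and `serialize` are hypothetical; they mirror what the script above does. The optional `now` argument and the seeded random generator are assumptions added only to make the output reproducible.

```python
import json
import random
import time

USER_BEHAVIORS = ['pv', 'buy', 'cart', 'fav']

def build_message(rng, now=None):
    """Build one simulated record with the same fields as the producer script."""
    ts = time.localtime(now) if now is not None else time.localtime()
    return {
        'user_id': rng.randint(1, 50),
        'item_id': rng.randint(1, 1000),
        'category_id': rng.randint(1, 20),
        'user_behavior': rng.choice(USER_BEHAVIORS),
        'time_stamp': time.strftime("%Y-%m-%d %H:%M:%S", ts),
    }

def serialize(msg):
    """Same encoding as the value_serializer passed to KafkaProducer."""
    return json.dumps(msg).encode('ascii')

if __name__ == '__main__':
    rng = random.Random(42)  # fixed seed: an assumption, for reproducibility only
    payload = serialize(build_message(rng))
    print(payload.decode('ascii'))
```

Each printed line is one JSON object with the five fields that the Flink source table later declares, and the time_stamp string uses the "yyyy-MM-dd HH:mm:ss" layout.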

Create a PostgreSQL instance

Go to the PostgreSQL console [8] and click 【New】 in the upper-left corner to create an instance. For details, see Create a PostgreSQL instance [9]. Then enter the instance's database and create the oceanus_advanced4_output table to receive the data.

-- Table creation statement
create table public.oceanus_advanced4_output (
    win_start     TIMESTAMP,
    category_id   INT,
    buy_count     INT,
    PRIMARY KEY (win_start, category_id)
);

Here I connect with DBeaver over the public network. For more connection methods, see Connect to a PostgreSQL instance [10] in the official documentation.

Stream Computing Oceanus job

1. Create the Source

CREATE TABLE `kafka_json_source_table` (
  user_id        INT,
  item_id        INT,
  category_id    INT,
  user_behavior  VARCHAR,
  time_stamp     TIMESTAMP(3),
  WATERMARK FOR time_stamp AS time_stamp - INTERVAL '3' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'oceanus_advanced4_input',    -- Replace with the Topic you want to consume
  'scan.startup.mode' = 'latest-offset', -- One of latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp
  'properties.bootstrap.servers' = '10.0.0.29:9092',  -- Replace with your Kafka connection address
  'properties.group.id' = 'testGroup',     -- Required: be sure to specify a Group ID
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',  -- If false, a missing field does not raise an error
  'json.ignore-parse-errors' = 'true'      -- If true, parse errors are ignored
);
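The WATERMARK clause in the source table above tolerates events that arrive up to 3 seconds out of order. As a rough illustration of what that means for the 1-minute tumbling window used later, here is a simplified sketch. The function names are hypothetical, and the firing rule is an approximation: Flink's exact boundary handling and its treatment of late data are not modeled.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=1)
MAX_DELAY = timedelta(seconds=3)  # matches INTERVAL '3' SECOND in the DDL

def window_start(event_time):
    """Start of the 1-minute tumbling window that contains event_time."""
    return event_time.replace(second=0, microsecond=0)

def watermark(max_event_time_seen):
    """Watermark as declared in the DDL: largest event time seen minus 3 seconds."""
    return max_event_time_seen - MAX_DELAY

def window_can_fire(start, current_watermark):
    """Simplified rule: a window may emit once the watermark reaches its end."""
    return current_watermark >= start + WINDOW

if __name__ == '__main__':
    fmt = "%Y-%m-%d %H:%M:%S"
    start = window_start(datetime.strptime("2022-06-23 21:43:10", fmt))
    for seen in ("2022-06-23 21:43:59", "2022-06-23 21:44:02", "2022-06-23 21:44:03"):
        wm = watermark(datetime.strptime(seen, fmt))
        print(seen, "-> watermark", wm.strftime(fmt), "fire:", window_can_fire(start, wm))
```

With second-granularity timestamps like those the script produces, the window starting at 21:43:00 only becomes eligible once an event stamped 21:44:03 or later has been read, so results for a given minute appear a few seconds after that minute ends.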

2. Create the Sink

CREATE TABLE `jdbc_upsert_sink_table` (
    win_start     TIMESTAMP(3),
    category_id   INT,
    buy_count     INT,
    PRIMARY KEY (win_start,category_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://10.0.0.236:5432/postgres?currentSchema=public&reWriteBatchedInserts=true',              -- Replace with your actual PostgreSQL connection parameters
    'table-name' = 'oceanus_advanced4_output', -- Table to write to
    'username' = 'root',                      -- User name for database access (INSERT permission is required)
    'password' = 'yourpassword',               -- Password for database access
    'sink.buffer-flush.max-rows' = '200',     -- Maximum number of rows per batch
    'sink.buffer-flush.interval' = '2s'       -- Interval between batch flushes
);

3. Write the business SQL

-- Create a temporary view that filters the raw data and performs the window aggregation
CREATE VIEW `kafka_json_source_view` AS
SELECT
  TUMBLE_START(time_stamp,INTERVAL '1' MINUTE) AS win_start,
  category_id,
  COUNT(1) AS buy_count
FROM `kafka_json_source_table`
WHERE user_behavior = 'buy'
GROUP BY TUMBLE(time_stamp,INTERVAL '1' MINUTE),category_id;
-- Compute the Top3 purchased categories for each minute
INSERT INTO `jdbc_upsert_sink_table`
SELECT
b.win_start,
b.category_id,
CAST(b.buy_count AS INT) AS buy_count
FROM (SELECT *
          ,ROW_NUMBER() OVER (PARTITION BY win_start ORDER BY buy_count DESC) AS rn
      FROM `kafka_json_source_view`
      ) b
WHERE b.rn <= 3;
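To sanity-check the rows that arrive in PostgreSQL, the same logic can be reproduced offline on a handful of records. The following is a batch-style sketch, not how Flink evaluates the query: it ignores watermarks and late data. The function name `top_n_per_minute` is hypothetical, and ties are broken by category_id, which is an assumption because ROW_NUMBER() does not guarantee an order among equal counts.

```python
from collections import Counter, defaultdict
from datetime import datetime

def top_n_per_minute(events, n=3):
    """Return {win_start: [(category_id, buy_count), ...]} with at most n rows per window."""
    counts = defaultdict(Counter)
    for e in events:
        if e['user_behavior'] != 'buy':  # WHERE user_behavior = 'buy'
            continue
        ts = datetime.strptime(e['time_stamp'], "%Y-%m-%d %H:%M:%S")
        win_start = ts.replace(second=0)  # TUMBLE(time_stamp, INTERVAL '1' MINUTE)
        counts[win_start][e['category_id']] += 1  # COUNT(1) per window and category
    result = {}
    for win_start, per_category in counts.items():
        # ORDER BY buy_count DESC; equal counts ordered by category_id (assumption)
        ranked = sorted(per_category.items(), key=lambda kv: (-kv[1], kv[0]))
        result[win_start] = ranked[:n]  # WHERE rn <= 3
    return result

def make_event(category_id, behavior, time_stamp):
    """Build a record with only the fields the query uses."""
    return {'category_id': category_id, 'user_behavior': behavior, 'time_stamp': time_stamp}

if __name__ == '__main__':
    sample = [
        make_event(1, 'buy', "2022-06-23 21:43:01"),
        make_event(1, 'buy', "2022-06-23 21:43:20"),
        make_event(2, 'buy', "2022-06-23 21:43:30"),
        make_event(5, 'pv', "2022-06-23 21:43:31"),
        make_event(7, 'buy', "2022-06-23 21:44:05"),
    ]
    for win, rows in sorted(top_n_per_minute(sample).items()):
        print(win, rows)
```

Feeding it the messages printed by the producer script for a given minute should give the same categories and counts as the rows stored for that win_start, apart from ties and any records Flink dropped as late.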

Summary

This article combines a TUMBLE window with the ROW_NUMBER function to compute the top three product categories purchased each minute. You can choose the window function that fits your actual requirements to compute the corresponding TopN. For more on window functions, see Time window function [11].

Here the rn and win_end fields are trimmed before the results are written (that is, the results are written without the ranking column). When rn is not written, be very careful with the primary key of the result table: if it is defined incorrectly, the TopN results will be inaccurate.
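The primary key warning can be illustrated with a small simulation. This is a sketch under stated assumptions: `apply_changelog` and `SAMPLE_CHANGES` are hypothetical names, and the change sequence is hand-written for illustration rather than captured from a real Flink job. It models the result table as a dictionary whose key plays the role of the primary key, and applies the same inserts and deletes once with the key (win_start, category_id) and once with win_start alone.

```python
def apply_changelog(changes, key_fields):
    """Apply (op, row) changes to a dict that stands in for the result table.

    op is '+' (insert or update by key) or '-' (delete by key).
    """
    table = {}
    for op, row in changes:
        key = tuple(row[f] for f in key_fields)
        if op == '+':
            table[key] = row
        else:
            table.pop(key, None)
    return sorted(table.values(), key=lambda r: (-r['buy_count'], r['category_id']))

W = "2022-06-23 21:43:00"
# Hypothetical changes for one window: category 4 enters the Top3 first
# and is later pushed out by category 2.
SAMPLE_CHANGES = [
    ('+', {'win_start': W, 'category_id': 1, 'buy_count': 3}),
    ('+', {'win_start': W, 'category_id': 4, 'buy_count': 1}),
    ('+', {'win_start': W, 'category_id': 3, 'buy_count': 1}),
    ('-', {'win_start': W, 'category_id': 4, 'buy_count': 1}),
    ('+', {'win_start': W, 'category_id': 2, 'buy_count': 2}),
]

if __name__ == '__main__':
    good = apply_changelog(SAMPLE_CHANGES, ('win_start', 'category_id'))
    bad = apply_changelog(SAMPLE_CHANGES, ('win_start',))
    print("key (win_start, category_id):", [r['category_id'] for r in good])
    print("key (win_start):", [r['category_id'] for r in bad])
```

With the composite key, the table ends up with categories 1, 2, and 3. With win_start alone, every write overwrites the previous row for that window and the delete removes whatever is there, so only one row survives. This is why the sink table in this article declares (win_start, category_id) as its primary key.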

Reference links

[1] Oceanus console: https://console.cloud.tencent.com/oceanus/overview

[2] Create an exclusive cluster: https://cloud.tencent.com/document/product/849/48298

[3] CKafka console: https://console.cloud.tencent.com/ckafka/index?rid=1

[4] CKafka Create an instance: https://cloud.tencent.com/document/product/597/54839

[5] CKafka Create a Topic: https://cloud.tencent.com/document/product/597/54854

[6] CKafka getting started guide: https://cloud.tencent.com/document/product/597/54837

[7] CKafka Send and receive messages: https://cloud.tencent.com/document/product/597/54834

[8] PostgreSQL console: https://console.cloud.tencent.com/postgres/index

[9] Create a PostgreSQL instance: https://cloud.tencent.com/document/product/409/56961

[10] Connect to a PostgreSQL instance: https://cloud.tencent.com/document/product/409/40429

[11] Time window function: https://cloud.tencent.com/document/product/849/18077
