Application Practice | Massive Data, Second-Level Analysis! Building a Real-Time Data Warehouse with Flink + Doris
2022-06-24 21:52:00 【InfoQ】
Business background
Basic principles of Doris

- Receive user connection requests (MySQL protocol layer)
- Metadata storage and management
- Query statement parsing and execution plan distribution
- Cluster management and control
- Data storage and management
- Query plan execution
Technology Architecture

- Use Flink CDC to collect the MySQL binlog into Kafka topic Topic1
- Develop a Flink job that consumes the binlog above, joins the related topics into a wide table, and writes the result to Topic2
- Configure a Doris Routine Load job to import the data in Topic2 into Doris
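The first two steps above can be sketched in Flink SQL. All table names, connector options, hostnames, and topic names below are illustrative assumptions, not the original project's configuration:

```sql
-- Hypothetical CDC source reading the MySQL binlog (flink-connector-mysql-cdc)
CREATE TABLE mysql_source (
    key1 STRING,
    key2 STRING,
    key3 STRING,
    value1 INT,
    PRIMARY KEY (key1, key2, key3) NOT ENFORCED
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = 'mysql_host',
    'port'          = '3306',
    'username'      = 'user',
    'password'      = 'passwd',
    'database-name' = 'db',
    'table-name'    = 'table_1'
);

-- Hypothetical changelog sink corresponding to Topic1
-- (upsert-kafka, since the CDC source emits an update stream)
CREATE TABLE kafka_topic1 (
    key1 STRING,
    key2 STRING,
    key3 STRING,
    value1 INT,
    PRIMARY KEY (key1, key2, key3) NOT ENFORCED
) WITH (
    'connector'                    = 'upsert-kafka',
    'topic'                        = 'Topic1',
    'properties.bootstrap.servers' = 'broker1_ip:port1',
    'key.format'                   = 'json',
    'value.format'                 = 'json'
);

INSERT INTO kafka_topic1 SELECT key1, key2, key3, value1 FROM mysql_source;
```

The wide-table job between Topic1 and Topic2 would follow the same pattern, joining several such sources before writing out.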
Application practice
Build table
CREATE TABLE IF NOT EXISTS table_1
(
    key1 varchar(32),
    key2 varchar(32),
    key3 varchar(32),
    value1 int,
    value2 varchar(128),
    value3 Decimal(20, 6),
    data_deal_datetime DateTime COMMENT 'Data processing time',
    data_status INT COMMENT 'Whether the data is deleted: 1 means normal, -1 means deleted'
)
ENGINE=OLAP
UNIQUE KEY(`key1`,`key2`,`key3`)
COMMENT "xxx"
DISTRIBUTED BY HASH(`key2`) BUCKETS 32
PROPERTIES (
    "storage_type"="column",
    "replication_num" = "3",
    "function_column.sequence_type" = 'DateTime'
);
- data_deal_datetime serves as the sequence column: when several rows share the same key, it decides which row overwrites the others.
- data_status makes the table compatible with deletes in the business database: instead of physically removing a row, a record with data_status = -1 is loaded.
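The replacement rule described above can be simulated in a few lines of Python (pure illustration of the semantics; Doris performs this internally): for rows sharing the same key, the row with the larger data_deal_datetime wins, and data_status = -1 marks a business-side delete.

```python
# Simulate Doris Unique-model replacement driven by a sequence column.
def upsert(table, row, seq_col="data_deal_datetime"):
    key = (row["key1"], row["key2"], row["key3"])
    old = table.get(key)
    # Keep the row with the larger sequence value (later processing time).
    if old is None or row[seq_col] >= old[seq_col]:
        table[key] = row

table = {}
upsert(table, {"key1": "a", "key2": "b", "key3": "c",
               "data_deal_datetime": "2022-01-01 00:00:00", "data_status": 1})
# An out-of-order, older record must NOT overwrite the newer one.
upsert(table, {"key1": "a", "key2": "b", "key3": "c",
               "data_deal_datetime": "2021-12-31 00:00:00", "data_status": 1})
# A newer record marking a business-side delete wins.
upsert(table, {"key1": "a", "key2": "b", "key3": "c",
               "data_deal_datetime": "2022-01-02 00:00:00", "data_status": -1})

live_rows = [r for r in table.values() if r["data_status"] == 1]
print(len(table), len(live_rows))  # 1 0
```

Queries against the table then simply filter on data_status = 1 to hide deleted rows.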
Data import task
CREATE ROUTINE LOAD database.table1 ON table1
COLUMNS(key1,key2,key3,value1,value2,value3,data_deal_datetime,data_status),
ORDER BY data_deal_datetime
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "10",
    "max_batch_rows" = "500000",
    "max_batch_size" = "209715200",
    "format" = "json",
    "json_root" = "$.data",
    "jsonpaths"="[\"$.key1\",\"$.key2\",\"$.key3\",\"$.value1\",\"$.value2\",
    \"$.value3\",\"$.data_deal_datetime\",\"$.data_status\"]"
)
FROM KAFKA
(
    "kafka_broker_list"="broker1_ip:port1,broker2_ip:port2,broker3_ip:port3",
    "kafka_topic"="topic_name",
    "property.group.id"="group_id",
    "property.kafka_default_offsets"="OFFSET_BEGINNING"
);
- ORDER BY data_deal_datetime means that rows with the same key are overwritten according to the data_deal_datetime field.
- desired_concurrent_number is the desired concurrency of the load task.
- max_batch_interval is the maximum execution time of each subtask, in seconds.
- max_batch_rows is the maximum number of rows read by each subtask.
- max_batch_size is the maximum number of bytes read by each subtask.
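Given the json_root and jsonpaths above, each Kafka message is expected to carry the row under a `data` root. A sketch of what one message might look like and how the fields are extracted (the field values are invented for illustration):

```python
import json

# Hypothetical Kafka message body matching json_root = "$.data"
# and the jsonpaths declared in the Routine Load job.
message = {
    "data": {
        "key1": "k1", "key2": "k2", "key3": "k3",
        "value1": 1, "value2": "v2", "value3": "3.140000",
        "data_deal_datetime": "2022-06-24 21:52:00",
        "data_status": 1,
    }
}
payload = json.dumps(message)

# Routine Load first strips the json_root, then applies each jsonpath
# in order to produce the column values.
row = json.loads(payload)["data"]
extracted = [row[f] for f in ("key1", "key2", "key3", "value1", "value2",
                              "value3", "data_deal_datetime", "data_status")]
print(extracted)
```

The order of the jsonpaths must match the COLUMNS list, which is why both enumerate the eight fields in the same sequence.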
Task monitoring and alarm
import pymysql
import requests, json

# Open a database connection to the Doris FE (MySQL protocol)
db = pymysql.connect(host="host", user="user",
                     password="passwd", db="database", port=port)
# Get an operation cursor via cursor()
cur = db.cursor()
# Query the state of all Routine Load jobs
sql = "show routine load"
cur.execute(sql)          # execute the SQL statement
results = cur.fetchall()  # fetch all records of the query
for row in results:
    name = row[1]
    state = row[7]
    if state != 'RUNNING':
        err_log_urls = row[16]
        reason_state_changed = row[15]
        msg = ("Doris data import task is abnormal:\n name=%s \n state=%s \n "
               "reason_state_changed=%s \n err_log_urls=%s \n "
               "About to resume automatically, please check the error message"
               % (name, state, reason_state_changed, err_log_urls))
        payload_message = {
            "msg_type": "text",
            "content": {
                "text": msg
            }
        }
        url = 'lark alert webhook url'
        s = json.dumps(payload_message)
        r = requests.post(url, data=s)
        cur.execute("resume routine load for " + name)
cur.close()
db.close()
Data model
Aggregate Model
CREATE TABLE tmp_table_1
(
    user_id varchar(64) COMMENT "user id",
    channel varchar(64) COMMENT "user source channel",
    city_code varchar(64) COMMENT "user's city code",
    last_visit_date DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "last time the user visited",
    total_cost BIGINT SUM DEFAULT "0" COMMENT "total user consumption"
)
ENGINE=OLAP
AGGREGATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES("storage_type"="column","replication_num" = "1");
insert into tmp_table_1 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_1 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_1 values('suh_001','JD','001','2022-03-01 00:00:01','107');
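After these three inserts, the Aggregate model collapses rows that share the same key (user_id, channel, city_code): last_visit_date keeps the latest REPLACE value and total_cost is summed. A quick Python simulation of that semantics (illustration only, not Doris code):

```python
# Simulate Doris Aggregate-model semantics: REPLACE for last_visit_date,
# SUM for total_cost, grouped by the aggregate key.
rows = [
    ("suh_001", "JD", "001", "2022-01-01 00:00:01", 57),
    ("suh_001", "JD", "001", "2022-02-01 00:00:01", 76),
    ("suh_001", "JD", "001", "2022-03-01 00:00:01", 107),
]

agg = {}
for user_id, channel, city_code, visit_date, cost in rows:
    key = (user_id, channel, city_code)
    _, total = agg.get(key, (None, 0))
    # REPLACE keeps the most recently loaded value; SUM accumulates.
    agg[key] = (visit_date, total + cost)

print(agg)  # {('suh_001', 'JD', '001'): ('2022-03-01 00:00:01', 240)}
```

So a SELECT against tmp_table_1 returns a single row for this user, with last_visit_date = '2022-03-01 00:00:01' and total_cost = 240.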

Unique Model
The Unique model is the one used by table_1 above: rows sharing the same UNIQUE KEY are deduplicated, keeping only the latest row.
Duplicate Model
CREATE TABLE tmp_table_2
(
    user_id varchar(64) COMMENT "user id",
    channel varchar(64) COMMENT "user source channel",
    city_code varchar(64) COMMENT "user's city code",
    visit_date DATETIME COMMENT "user login time",
    cost BIGINT COMMENT "user consumption amount"
)
ENGINE=OLAP
DUPLICATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES("storage_type"="column","replication_num" = "1");
insert into tmp_table_2 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_2 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_2 values('suh_001','JD','001','2022-03-01 00:00:01','107');
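Under the Duplicate model nothing is merged: querying tmp_table_2 after the three inserts returns all three rows as-is. A short simulation for contrast with the Aggregate case above (illustration only, not Doris code):

```python
# Duplicate model: rows with identical key columns are all retained.
rows = [
    ("suh_001", "JD", "001", "2022-01-01 00:00:01", 57),
    ("suh_001", "JD", "001", "2022-02-01 00:00:01", 76),
    ("suh_001", "JD", "001", "2022-03-01 00:00:01", 107),
]
# DUPLICATE KEY only controls the sort order inside the data files;
# it performs no deduplication or aggregation.
stored = sorted(rows, key=lambda r: (r[0], r[1], r[2]))
print(len(stored))  # 3
```

This makes the Duplicate model suitable for detail-level data such as logs, where every record must be preserved.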

Suggestions on data model selection
Summary

