Application Practice | Massive Data, Second-Level Analysis! Building a Real-Time Data Warehouse with Flink + Doris
2022-06-24 19:25:00 【InfoQ】
Business Background
Doris Fundamentals

A Doris cluster consists of two kinds of processes, FE (Frontend) and BE (Backend), with the following responsibilities:

- FE (Frontend)
  - Accepts client connection requests (MySQL protocol layer)
  - Stores and manages metadata
  - Parses query statements and distributes execution plans
  - Manages the cluster
- BE (Backend)
  - Stores and manages data
  - Executes query plans
Technical Architecture

- Use Flink CDC to capture the MySQL binlog into Topic1 in Kafka
- Develop a Flink job that consumes the binlog above, builds wide tables for the relevant subjects, and writes the results to Topic2
- Configure a Doris Routine Load job to import the data in Topic2 into Doris
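The widening step in the pipeline above can be sketched in plain Python (this is not an actual Flink job; the message layout, field names, and dimension lookup are hypothetical): one binlog change message from Topic1 is enriched with dimension data and flattened into a wide record for Topic2.

```python
import json

# Hypothetical dimension data; in the real Flink job this would be a
# dimension-table join or stateful enrichment.
city_dim = {'001': 'Beijing'}

def widen(binlog_msg: str) -> str:
    """Turn one binlog change message from Topic1 into a wide record for Topic2."""
    change = json.loads(binlog_msg)
    row = change['data']
    wide = {
        **row,
        # enrich with a dimension attribute
        'city_name': city_dim.get(row.get('city_code'), 'unknown'),
        # carry the processing time along for conflict resolution in Doris
        'data_deal_datetime': change['ts'],
        # map delete events to the soft-delete flag used downstream
        'data_status': -1 if change['op'] == 'delete' else 1,
    }
    return json.dumps(wide)

msg = '{"op": "insert", "ts": "2022-01-01 00:00:01", "data": {"key1": "a", "city_code": "001"}}'
print(widen(msg))
```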
Application Practice
Table Creation
CREATE TABLE IF NOT EXISTS table_1
(
    key1 varchar(32),
    key2 varchar(32),
    key3 varchar(32),
    value1 int,
    value2 varchar(128),
    value3 Decimal(20, 6),
    data_deal_datetime DateTime COMMENT 'data processing time',
    data_status INT COMMENT 'whether the row is deleted: 1 = normal, -1 = deleted'
)
ENGINE=OLAP
UNIQUE KEY(`key1`,`key2`,`key3`)
COMMENT "xxx"
DISTRIBUTED BY HASH(`key2`) BUCKETS 32
PROPERTIES (
    "storage_type" = "column",
    "replication_num" = "3",
    "function_column.sequence_type" = 'DateTime'
);
- data_deal_datetime is the basis for deciding which row wins when multiple rows share the same key
- data_status accommodates delete operations in the upstream business database (a soft-delete flag)
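A minimal sketch (plain Python, not Doris internals) of how the unique-key model with a sequence column resolves conflicting writes: for each key, the row with the larger data_deal_datetime survives, even when rows arrive out of order.

```python
table = {}

def upsert(key, value, data_deal_datetime):
    """Keep a row only if its sequence value is >= the one already stored."""
    current = table.get(key)
    if current is None or data_deal_datetime >= current[1]:
        table[key] = (value, data_deal_datetime)

upsert(('k1',), 'v-new', '2022-06-01 12:00:00')
upsert(('k1',), 'v-old', '2022-06-01 11:00:00')  # late-arriving older row: ignored
print(table[('k1',)][0])  # → v-new
```

A delete from the business database is handled the same way: the widened record carries data_status = -1 with a newer timestamp, so it overwrites the live row instead of being lost.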
Data Import Task
CREATE ROUTINE LOAD database.table1 ON table1
COLUMNS(key1,key2,key3,value1,value2,value3,data_deal_datetime,data_status),
ORDER BY data_deal_datetime
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "10",
    "max_batch_rows" = "500000",
    "max_batch_size" = "209715200",
    "format" = "json",
    "json_root" = "$.data",
    "jsonpaths" = "[\"$.key1\",\"$.key2\",\"$.key3\",\"$.value1\",\"$.value2\",\"$.value3\",\"$.data_deal_datetime\",\"$.data_status\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1_ip:port1,broker2_ip:port2,broker3_ip:port3",
    "kafka_topic" = "topic_name",
    "property.group.id" = "group_id",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
- ORDER BY data_deal_datetime designates data_deal_datetime as the sequence column, so rows with the same key are overwritten according to that field.
- desired_concurrent_number: the desired concurrency of the load job.
- max_batch_interval: the maximum execution time of each subtask, in seconds.
- max_batch_rows: the maximum number of rows each subtask reads.
- max_batch_size: the maximum number of bytes each subtask reads.
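The effect of json_root and jsonpaths in the routine load job above can be sketched in plain Python (this mimics the extraction, it is not Doris code): json_root first selects the $.data object of each Kafka message, then each jsonpath picks out one column value.

```python
import json

message = ('{"data": {"key1": "a", "key2": "b", "key3": "c", '
           '"value1": 1, "value2": "x", "value3": "3.14", '
           '"data_deal_datetime": "2022-06-24 19:25:00", "data_status": 1}}')

columns = ["key1", "key2", "key3", "value1", "value2",
           "value3", "data_deal_datetime", "data_status"]

root = json.loads(message)["data"]   # json_root = "$.data"
row = [root[c] for c in columns]     # jsonpaths = "$.key1", "$.key2", ...
print(row)
```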
Task Monitoring and Alerting
import json

import pymysql
import requests

# Open the database connection (the Doris FE speaks the MySQL protocol);
# host/user/passwd/database/port are placeholders
db = pymysql.connect(host="host", user="user",
                     password="passwd", db="database", port=port)
# Get a cursor
cur = db.cursor()

# Query the state of all routine load jobs
cur.execute("show routine load")
results = cur.fetchall()  # fetch all returned rows
for row in results:
    name = row[1]
    state = row[7]
    if state != 'RUNNING':
        reason_state_changed = row[15]
        err_log_urls = row[16]
        msg = ("doris data import job abnormal:\n name=%s \n state=%s \n "
               "reason_state_changed=%s \n err_log_urls=%s \n"
               "about to resume automatically, please check the error details"
               % (name, state, reason_state_changed, err_log_urls))
        payload_message = {
            "msg_type": "text",
            "content": {
                "text": msg
            }
        }
        url = 'lark alert url'
        requests.post(url, data=json.dumps(payload_message))
        # Automatically resume the abnormal job
        cur.execute("resume routine load for " + name)
cur.close()
db.close()
Data Models
Aggregate Model
CREATE TABLE tmp_table_1
(
    user_id varchar(64) COMMENT "user id",
    channel varchar(64) COMMENT "user acquisition channel",
    city_code varchar(64) COMMENT "code of the user's city",
    last_visit_date DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "user's last visit time",
    total_cost BIGINT SUM DEFAULT "0" COMMENT "user's total spend"
)
ENGINE=OLAP
AGGREGATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES("storage_type"="column","replication_num" = "1");
insert into tmp_table_1 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_1 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_1 values('suh_001','JD','001','2022-03-01 00:00:01','107');
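A minimal sketch (plain Python, not Doris itself) of how the AGGREGATE KEY model merges the three inserted rows for the same key (user_id, channel, city_code): last_visit_date uses REPLACE, so the newest write wins; total_cost uses SUM, so values accumulate.

```python
rows = [
    ('suh_001', 'JD', '001', '2022-01-01 00:00:01', 57),
    ('suh_001', 'JD', '001', '2022-02-01 00:00:01', 76),
    ('suh_001', 'JD', '001', '2022-03-01 00:00:01', 107),
]

merged = {}
for user_id, channel, city_code, last_visit_date, cost in rows:
    key = (user_id, channel, city_code)
    if key not in merged:
        merged[key] = [last_visit_date, 0]
    merged[key][0] = last_visit_date  # REPLACE: the later write overwrites
    merged[key][1] += cost            # SUM: values accumulate

print(merged)
# → {('suh_001', 'JD', '001'): ['2022-03-01 00:00:01', 240]}
```

Querying tmp_table_1 after the three inserts therefore returns a single row for this user, with the latest visit time and a total spend of 240.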

Unique Model
The table_1 DDL created earlier is an example of the unique model: rows with the same UNIQUE KEY are deduplicated, and the sequence column decides which row survives.
Duplicate Model
CREATE TABLE tmp_table_2
(
    user_id varchar(64) COMMENT "user id",
    channel varchar(64) COMMENT "user acquisition channel",
    city_code varchar(64) COMMENT "code of the user's city",
    visit_date DATETIME COMMENT "user login time",
    cost BIGINT COMMENT "amount the user spent"
)
ENGINE=OLAP
DUPLICATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES("storage_type"="column","replication_num" = "1");
insert into tmp_table_2 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_2 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_2 values('suh_001','JD','001','2022-03-01 00:00:01','107');
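By contrast, the duplicate model performs no merging at all: DUPLICATE KEY only determines the sort order of the stored data. A minimal sketch (plain Python, not Doris internals) of what the three inserts above leave behind in tmp_table_2:

```python
table = []
for r in [
    ('suh_001', 'JD', '001', '2022-01-01 00:00:01', 57),
    ('suh_001', 'JD', '001', '2022-02-01 00:00:01', 76),
    ('suh_001', 'JD', '001', '2022-03-01 00:00:01', 107),
]:
    table.append(r)  # every row is kept, even with identical key columns

# DUPLICATE KEY(user_id, channel, city_code) only sorts the data
table.sort(key=lambda r: (r[0], r[1], r[2]))
print(len(table))  # → 3
```

Querying tmp_table_2 therefore returns all three rows, which suits detail-level data such as logs or events.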

Data Model Selection Recommendations
Summary

