Application Practice | Massive Data, Second-Level Analysis! Building a Real-Time Data Warehouse with Flink + Doris
2022-06-24 19:25:00 [InfoQ]
Business Background
Doris Basic Principles

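Doris consists of two kinds of processes: FE (Frontend) and BE (Backend). The FE is responsible for:
- Accepting user connection requests (MySQL protocol layer)
- Storing and managing metadata
- Parsing query statements and dispatching execution plans
- Managing the cluster
The BE is responsible for:
- Storing and managing data
- Executing query plans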
Technical Architecture

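- Use Flink CDC to capture the MySQL binlog into Topic1 in Kafka
- Develop a Flink job that consumes this binlog, builds subject-specific wide tables, and writes them to Topic2
- Configure a Doris Routine Load job to import the data in Topic2 into Doris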
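The article does not show the Flink job itself. As a minimal sketch, assuming Topic1 carries Debezium-style JSON change events (fields `op`, `before`, `after`, the shape Flink CDC's JSON deserializer typically emits; verify against your setup), the per-record mapping into the Topic2 message expected by this article's Routine Load job (`"json_root" = "$.data"`) could look like this. The function name `to_doris_message` is hypothetical, and the joins that actually build the wide table are omitted.

```python
import json
from datetime import datetime
from typing import Optional

# Business columns of the Doris table table_1 (from the CREATE TABLE statement).
BUSINESS_COLUMNS = ["key1", "key2", "key3", "value1", "value2", "value3"]


def to_doris_message(cdc_event: dict, now: Optional[datetime] = None) -> str:
    """Map one change event to a Topic2 message (hypothetical helper).

    Assumption: cdc_event has the Debezium JSON shape
    {"op": "c" | "r" | "u" | "d", "before": {...}, "after": {...}}.
    """
    now = now or datetime.now()
    op = cdc_event["op"]
    # For deletes only the "before" image carries the row; otherwise use "after".
    row = cdc_event["before"] if op == "d" else cdc_event["after"]
    record = {col: row.get(col) for col in BUSINESS_COLUMNS}
    # Processing time doubles as the sequence value used by ORDER BY in Routine Load.
    record["data_deal_datetime"] = now.strftime("%Y-%m-%d %H:%M:%S")
    # Soft delete: -1 marks a row deleted in the source database, 1 means normal.
    record["data_status"] = -1 if op == "d" else 1
    # Wrap under "data" so that "json_root" = "$.data" in the load job finds it.
    return json.dumps({"data": record}, ensure_ascii=False)
```

One design caveat of using processing time at second precision: two changes to the same key within one second get equal sequence values, so their relative order is no longer decided by the timestamp.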
Application Practice
Creating the Table
CREATE TABLE IF NOT EXISTS table_1
(
key1 varchar(32),
key2 varchar(32),
key3 varchar(32),
value1 int,
value2 varchar(128),
value3 Decimal(20, 6),
data_deal_datetime DateTime COMMENT 'data processing time',
data_status INT COMMENT 'whether the row is deleted: 1 = normal, -1 = deleted'
)
ENGINE=OLAP
UNIQUE KEY(`key1`,`key2`,`key3`)
COMMENT "xxx"
DISTRIBUTED BY HASH(`key2`) BUCKETS 32
PROPERTIES (
"storage_type"="column",
"replication_num" = "3",
"function_column.sequence_type" = 'DateTime'
);
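- data_deal_datetime is the sequence column: when rows share the same key, it decides which row overwrites the other
- data_status handles delete operations in the business database (soft delete)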
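To make the overwrite rule concrete, here is a small Python simulation (not Doris code; the function names are hypothetical). It assumes the behavior we understand Doris to have for a Unique-key table with a sequence column: an incoming row replaces the stored row for the same key only if its sequence value is greater than or equal to the stored one, so a late-arriving older change is discarded. Deleted rows are kept with data_status = -1 and filtered at query time.

```python
from typing import Dict, Iterable, List, Tuple

KEY_COLUMNS = ("key1", "key2", "key3")
Key = Tuple[str, str, str]


def merge_unique(rows: Iterable[dict], seq_col: str = "data_deal_datetime") -> Dict[Key, dict]:
    """Simulate a Unique-key table with a sequence column.

    Rows are applied in arrival order; a row replaces the stored one only
    when its sequence value is >= the stored value.
    """
    table: Dict[Key, dict] = {}
    for row in rows:
        key = tuple(row[c] for c in KEY_COLUMNS)
        current = table.get(key)
        # "YYYY-MM-DD HH:MM:SS" strings compare correctly as plain strings.
        if current is None or row[seq_col] >= current[seq_col]:
            table[key] = row
    return table


def visible_rows(table: Dict[Key, dict]) -> List[dict]:
    """Rows a query should return: drop soft-deleted rows (data_status = -1)."""
    return [r for r in table.values() if r["data_status"] != -1]


rows = [
    {"key1": "a", "key2": "b", "key3": "c", "value1": 2,
     "data_deal_datetime": "2022-06-24 10:00:02", "data_status": 1},
    # Arrives later but carries an older sequence value, so it is discarded.
    {"key1": "a", "key2": "b", "key3": "c", "value1": 1,
     "data_deal_datetime": "2022-06-24 10:00:01", "data_status": 1},
]
print(merge_unique(rows)[("a", "b", "c")]["value1"])  # 2
```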
Data Import Job
CREATE ROUTINE LOAD database.table1 ON table_1
COLUMNS(key1,key2,key3,value1,value2,value3,data_deal_datetime,data_status),
ORDER BY data_deal_datetime
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "10",
"max_batch_rows" = "500000",
"max_batch_size" = "209715200",
"format" = "json",
"json_root" = "$.data",
"jsonpaths"="[\"$.key1\",\"$.key2\",\"$.key3\",\"$.value1\",\"$.value2\",\"$.value3\",\"$.data_deal_datetime\",\"$.data_status\"]"
) FROM KAFKA
(
"kafka_broker_list"="broker1_ip:port1,broker2_ip:port2,broker3_ip:port3",
"kafka_topic"="topic_name",
"property.group.id"="group_id",
"property.kafka_default_offsets"="OFFSET_BEGINNING"
);
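- ORDER BY data_deal_datetime: rows with the same key are overwritten according to the data_deal_datetime field
- desired_concurrent_number: the desired degree of concurrency, i.e. how many subtasks of the job may run at the same time
- max_batch_interval: the maximum execution time of each subtask, in seconds (10 s here)
- max_batch_rows: the maximum number of rows each subtask reads (500,000 here)
- max_batch_size: the maximum number of bytes each subtask reads (209715200 bytes = 200 MB here)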
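Per the Doris documentation, a subtask finishes as soon as any one of these three limits is reached. The following Python simulation illustrates that rule with the values configured in this job. The function `plan_subtask` is hypothetical and simplified: it ignores network and commit time, and treats each Kafka message as one row.

```python
from typing import List, Tuple

MAX_BATCH_INTERVAL_S = 10        # "max_batch_interval" = "10" (seconds)
MAX_BATCH_ROWS = 500_000         # "max_batch_rows"
MAX_BATCH_SIZE = 209_715_200     # "max_batch_size" in bytes (200 MB)


def plan_subtask(messages: List[Tuple[float, int]],
                 max_interval: float = MAX_BATCH_INTERVAL_S,
                 max_rows: int = MAX_BATCH_ROWS,
                 max_bytes: int = MAX_BATCH_SIZE) -> Tuple[int, str]:
    """Return (rows consumed, limit that ended the subtask).

    messages: (seconds since the subtask started, message size in bytes),
    in arrival order. The subtask stops at the first limit reached.
    """
    rows = 0
    size = 0
    for arrival, nbytes in messages:
        if arrival >= max_interval:
            return rows, "max_batch_interval"
        rows += 1
        size += nbytes
        if rows >= max_rows:
            return rows, "max_batch_rows"
        if size >= max_bytes:
            return rows, "max_batch_size"
    # Topic drained before any row/byte limit: the subtask runs out its interval.
    return rows, "max_batch_interval"
```

Raising these limits yields fewer, larger import transactions at the cost of higher end-to-end latency; lowering them does the opposite.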
Job Monitoring and Alerting
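import pymysql  # MySQL-protocol client; the Doris FE speaks the MySQL protocol
import requests

# Connection settings: placeholders, replace with your own values
DORIS_HOST = "host"
DORIS_PORT = 9030  # default FE query port (query_port); adjust as needed
LARK_WEBHOOK_URL = "lark alert url"  # Lark bot webhook URL (placeholder)

# Open the database connection. DictCursor lets us read columns by name,
# because column positions in SHOW ROUTINE LOAD can shift between Doris versions.
db = pymysql.connect(host=DORIS_HOST, user="user", password="passwd",
                     db="database", port=DORIS_PORT,
                     cursorclass=pymysql.cursors.DictCursor)
try:
    with db.cursor() as cur:
        # List the Routine Load jobs of the current database
        cur.execute("SHOW ROUTINE LOAD")
        for row in cur.fetchall():
            name = row["Name"]
            state = row["State"]
            if state == "RUNNING":
                continue
            msg = ("Doris import job is abnormal:\n name=%s \n state=%s \n"
                   " reason_state_changed=%s \n err_log_urls=%s \n"
                   "Attempting automatic recovery, please check the error details"
                   % (name, state, row["ReasonOfStateChanged"], row["ErrorLogUrls"]))
            payload_message = {"msg_type": "text", "content": {"text": msg}}
            # json= serializes the payload and sets Content-Type: application/json
            requests.post(LARK_WEBHOOK_URL, json=payload_message, timeout=10)
            # Only PAUSED jobs can be resumed; STOPPED and CANCELLED jobs are final
            if state == "PAUSED":
                cur.execute("RESUME ROUTINE LOAD FOR `%s`" % name)
finally:
    db.close()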
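The alerting policy of the monitoring script can also be kept as a small pure function, which makes it easy to unit-test without a Doris cluster or a webhook. This is a hypothetical sketch: the state names are the Routine Load job states (NEED_SCHEDULE, RUNNING, PAUSED, STOPPED, CANCELLED), and not alerting on NEED_SCHEDULE, which a job passes through right after creation or resumption, is a refinement we assume here rather than the article's behavior.

```python
from typing import Dict, List

# Hypothetical policy table: what the monitor does for each Routine Load state.
ACTIONS: Dict[str, List[str]] = {
    "RUNNING": [],
    "NEED_SCHEDULE": [],              # transient: waiting to be scheduled
    "PAUSED": ["alert", "resume"],    # the only state that RESUME accepts
    "STOPPED": ["alert"],             # final, cannot be resumed
    "CANCELLED": ["alert"],           # final, cannot be resumed
}


def decide(state: str) -> List[str]:
    """Actions for a job state; unknown states are alerted but never resumed."""
    return ACTIONS.get(state, ["alert"])


def build_payload(name: str, state: str, reason: str, err_urls: str) -> dict:
    """Text message body in the {"msg_type": "text"} shape the script sends."""
    text = ("Doris import job is abnormal:\n name=%s \n state=%s \n"
            " reason_state_changed=%s \n err_log_urls=%s"
            % (name, state, reason, err_urls))
    return {"msg_type": "text", "content": {"text": text}}
```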
Data Models
Aggregate Model
CREATE TABLE tmp_table_1
(
user_id varchar(64) COMMENT "user id",
channel varchar(64) COMMENT "user acquisition channel",
city_code varchar(64) COMMENT "city code of the user",
last_visit_date DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "time of the last visit of the user",
total_cost BIGINT SUM DEFAULT "0" COMMENT "total spending of the user"
)
ENGINE=OLAP
AGGREGATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES("storage_type"="column","replication_num" = "1");
insert into tmp_table_1 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_1 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_1 values('suh_001','JD','001','2022-03-01 00:00:01','107');
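To show what the Aggregate model does with these three inserts, here is a small Python simulation (not Doris code; `aggregate_load` is a hypothetical name). Rows with the same AGGREGATE KEY are merged: total_cost uses SUM and last_visit_date uses REPLACE, which keeps the value from the most recent load. Each INSERT above is a separate load, so the load order is well defined here, and the table is expected to hold a single row with total_cost = 57 + 76 + 107 = 240.

```python
from typing import Dict, List, Tuple

# user_id, channel, city_code, last_visit_date, total_cost
Row = Tuple[str, str, str, str, int]
Key = Tuple[str, str, str]


def aggregate_load(rows: List[Row]) -> Dict[Key, Tuple[str, int]]:
    """Apply rows in load order, merging like tmp_table_1 (REPLACE + SUM)."""
    table: Dict[Key, Tuple[str, int]] = {}
    for user_id, channel, city_code, visit, cost in rows:
        key = (user_id, channel, city_code)
        if key in table:
            _, total = table[key]
            table[key] = (visit, total + cost)  # REPLACE the date, SUM the cost
        else:
            table[key] = (visit, cost)
    return table


rows = [
    ("suh_001", "JD", "001", "2022-01-01 00:00:01", 57),
    ("suh_001", "JD", "001", "2022-02-01 00:00:01", 76),
    ("suh_001", "JD", "001", "2022-03-01 00:00:01", 107),
]
print(aggregate_load(rows))
# {('suh_001', 'JD', '001'): ('2022-03-01 00:00:01', 240)}
```

Note that REPLACE follows load order, not time order: loading the same rows in reverse would leave last_visit_date at 2022-01-01 00:00:01.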

Unique Model
Duplicate Model
CREATE TABLE tmp_table_2
(
user_id varchar(64) COMMENT "user id",
channel varchar(64) COMMENT "user acquisition channel",
city_code varchar(64) COMMENT "city code of the user",
visit_date DATETIME COMMENT "user login time",
cost BIGINT COMMENT "amount spent by the user"
)
ENGINE=OLAP
DUPLICATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES("storage_type"="column","replication_num" = "1");
insert into tmp_table_2 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_2 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_2 values('suh_001','JD','001','2022-03-01 00:00:01','107');
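For contrast, a Python simulation of the Duplicate model (hypothetical function names, not Doris code): every inserted row is kept, and the DUPLICATE KEY columns only determine the sort order of the stored data. An aggregate view then has to be computed at query time, for example with a query along the lines of `SELECT user_id, channel, city_code, MAX(visit_date), SUM(cost) FROM tmp_table_2 GROUP BY user_id, channel, city_code`, which the second function mimics.

```python
from typing import Dict, List, Tuple

# user_id, channel, city_code, visit_date, cost
Row = Tuple[str, str, str, str, int]
Key = Tuple[str, str, str]


def duplicate_load(rows: List[Row]) -> List[Row]:
    """Keep every row; sort by the DUPLICATE KEY columns (stable for ties)."""
    return sorted(rows, key=lambda r: (r[0], r[1], r[2]))


def query_like_aggregate(rows: List[Row]) -> Dict[Key, Tuple[str, int]]:
    """Mimic GROUP BY key with MAX(visit_date) and SUM(cost) at query time."""
    result: Dict[Key, Tuple[str, int]] = {}
    for user_id, channel, city_code, visit, cost in rows:
        key = (user_id, channel, city_code)
        last, total = result.get(key, ("", 0))
        result[key] = (max(last, visit), total + cost)
    return result


rows = [
    ("suh_001", "JD", "001", "2022-01-01 00:00:01", 57),
    ("suh_001", "JD", "001", "2022-02-01 00:00:01", 76),
    ("suh_001", "JD", "001", "2022-03-01 00:00:01", 107),
]
print(len(duplicate_load(rows)))  # 3
```

Using MAX at query time, rather than REPLACE at load time, also makes the result independent of the order in which the rows were loaded.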

Recommendations for Choosing a Data Model
Summary

