Application practice | Massive data, second-level analysis! A Flink + Doris real-time data warehouse solution
2022-06-24 19:26:00 【SelectDB】
Editor's recommendation: With the rapid growth of Advance Intelligence Group, and to support real-time report statistics and decision analysis over billions of rows of data, the group chose a real-time data warehouse solution based on Flink + Doris. This article describes the practice of this solution in detail.
The following article is from the Advance Intelligence Group tech team, author: suhao
Original link: https://mp.weixin.qq.com/s/qg_4nsfo5gxwe8_1OiWVSA
Business background
Advance Intelligence Group, founded in 2016, is an AI-driven technology group committed to transforming and reshaping the finance and retail industries through localized application of technological innovation, building an ecosystem with a diversified business portfolio that serves consumers, enterprises, and merchants. The group consists of an enterprise business and a consumer business. The enterprise business includes ADVANCE.AI and Ginee, which serve customers in banking, finance, fintech, retail, and e-commerce with AI-based digital identity verification technology, risk management products, and omni-channel e-commerce solutions. The consumer business, Atome Financial, includes Atome, Asia's leading buy-now-pay-later platform, as well as digital financial services.
In September 2021, the group announced the completion of a Series D financing round of over USD 400 million, after which its valuation exceeded USD 2 billion, making it one of the largest independent technology startups in Singapore. Its business covers 17 countries and regions including Singapore, Indonesia, mainland China, India, and Vietnam, serving more than 150,000 merchants and 20 million consumers.
With the rapid development of the group's business, and to support real-time report statistics and decision analysis over billions of rows of data, we chose to build a real-time data warehouse based on Apache Flink + Apache Doris.
Doris fundamentals
The basic architecture of Apache Doris is very simple: there are only two roles, FE (Frontend) and BE (Backend), with no dependency on any external components, which makes deployment and operations very friendly. The architecture is as follows:

FE (Frontend) is written primarily in Java.
Main responsibilities:
- Receiving user connection requests (MySQL protocol layer)
- Metadata storage and management
- Query statement parsing and execution plan distribution
- Cluster management and control
FE has two main roles, Follower and Observer; the Leader is a Follower chosen by election. Followers mainly provide high availability for metadata, ensuring that if a single node goes down, the metadata can be recovered online in real time without affecting the overall service.
BE (Backend) is written primarily in C++.
Main responsibilities:
- Data storage and management
- Query plan execution
Technology Architecture
The overall data pipeline is shown in the figure below:

- Use Flink CDC to collect the MySQL binlog into Topic1 in Kafka
- Develop Flink jobs that consume the binlog above and generate wide tables for the relevant topics, writing the results into Topic2
- Configure a Doris Routine Load job to import the data in Topic2 into Doris
Application practice
The practice of steps 1 and 2 has already been covered in the article "Data synchronization solution based on Flink-CDC", so this article expands on step 3 in detail; a minimal sketch of the first two steps is shown below for context.
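The sketch below outlines steps 1 and 2 in Flink SQL, assuming the mysql-cdc and upsert-kafka connectors are on the classpath; all table names, columns, and connection options are illustrative assumptions, not the original jobs.

```sql
-- Step 1 (sketch): expose the MySQL binlog as a changelog table via Flink CDC.
CREATE TABLE mysql_source (
    key1   STRING,
    key2   STRING,
    key3   STRING,
    value1 INT,
    PRIMARY KEY (key1, key2, key3) NOT ENFORCED
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = 'mysql_host',
    'port'          = '3306',
    'username'      = 'user',
    'password'      = 'passwd',
    'database-name' = 'db',
    'table-name'    = 'table_1'
);

-- Step 2 (sketch): write the (widened) changelog into Kafka Topic2.
CREATE TABLE kafka_sink (
    key1   STRING,
    key2   STRING,
    key3   STRING,
    value1 INT,
    PRIMARY KEY (key1, key2, key3) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic'     = 'Topic2',
    'properties.bootstrap.servers' = 'broker1_ip:port1',
    'key.format'   = 'json',
    'value.format' = 'json'
);

-- In the real pipeline this SELECT would join several CDC streams into a
-- wide table; a simple pass-through is shown for brevity.
INSERT INTO kafka_sink
SELECT key1, key2, key3, value1 FROM mysql_source;
```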
Table creation
Because business data is often accompanied by UPDATE and DELETE operations, and in order to keep the data granularity of the real-time warehouse consistent with the business database, we chose the Doris Unique model (the data models are described in detail below). The table creation statement is as follows:
```sql
CREATE TABLE IF NOT EXISTS table_1 (
    key1 varchar(32),
    key2 varchar(32),
    key3 varchar(32),
    value1 int,
    value2 varchar(128),
    value3 Decimal(20, 6),
    data_deal_datetime DateTime COMMENT 'Data processing time',
    data_status INT COMMENT 'Deletion flag: 1 = normal, -1 = deleted'
) ENGINE=OLAP
UNIQUE KEY(`key1`, `key2`, `key3`)
COMMENT "xxx"
DISTRIBUTED BY HASH(`key2`) BUCKETS 32
PROPERTIES (
    "storage_type" = "column",
    "replication_num" = "3",
    "function_column.sequence_type" = 'DateTime'
);
```

As you can see, the table structure contains two extra fields: data_deal_datetime and data_status.
- data_deal_datetime is mainly the basis for deciding which record wins when rows share the same key
- data_status keeps the warehouse compatible with deletions in the business database: deleted rows are marked rather than physically removed, and queries filter on the flag, as shown below
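A minimal sketch of such a filter, reusing the table above (the column list is illustrative):

```sql
-- Read only rows that are still live in the business database;
-- rows deleted upstream carry data_status = -1 and are skipped.
SELECT key1, key2, key3, value1, value2, value3
FROM table_1
WHERE data_status = 1;
```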
Data import task
Doris provides the ability to actively pull data from Kafka (Routine Load). The configuration is as follows:
```sql
CREATE ROUTINE LOAD database.table1 ON table1
COLUMNS(key1, key2, key3, value1, value2, value3, data_deal_datetime, data_status),
ORDER BY data_deal_datetime
PROPERTIES (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "10",
    "max_batch_rows" = "500000",
    "max_batch_size" = "209715200",
    "format" = "json",
    "json_root" = "$.data",
    "jsonpaths" = "[\"$.key1\",\"$.key2\",\"$.key3\",\"$.value1\",\"$.value2\",\"$.value3\",\"$.data_deal_datetime\",\"$.data_status\"]"
)
FROM KAFKA (
    "kafka_broker_list" = "broker1_ip:port1,broker2_ip:port2,broker3_ip:port3",
    "kafka_topic" = "topic_name",
    "property.group.id" = "group_id",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```

In the import statement:
- ORDER BY data_deal_datetime means that records sharing the same key are overwritten according to the data_deal_datetime field
- desired_concurrent_number indicates the desired concurrency of the import job.
The three parameters max_batch_interval / max_batch_rows / max_batch_size respectively represent:
- the maximum execution time of each subtask (in seconds)
- the maximum number of rows read by each subtask
- the maximum number of bytes read by each subtask
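Routine Load jobs are managed with a handful of statements; the monitoring script below relies on SHOW and RESUME. A quick reference, using the job created above:

```sql
-- Inspect the state, progress, and error log URLs of a job.
SHOW ROUTINE LOAD FOR database.table1;

-- Manually pause or resume a job.
PAUSE ROUTINE LOAD FOR database.table1;
RESUME ROUTINE LOAD FOR database.table1;
```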
Task monitoring and alerting
If a Doris Routine Load job encounters dirty data, the job is suspended. We therefore need to monitor the state of the data import jobs periodically, automatically resume failed jobs, and send the error message to a designated Lark group. The script is as follows:
```python
import json

import pymysql   # the Doris FE speaks the MySQL protocol
import requests

# Open a connection to the Doris FE ("host"/"user"/"passwd"/"database" are
# placeholders; 9030 is the default FE MySQL protocol port)
db = pymysql.connect(host="host", user="user", password="passwd",
                     db="database", port=9030)
cur = db.cursor()

# 1. Query the state of all Routine Load jobs
cur.execute("show routine load")
results = cur.fetchall()

for row in results:
    name = row[1]
    state = row[7]
    if state != 'RUNNING':
        reason_state_changed = row[15]
        err_log_urls = row[16]
        msg = ("Doris data import job abnormal:\n name=%s \n state=%s \n"
               " reason_state_changed=%s \n err_log_urls=%s \n"
               " The job will be resumed automatically, please check the error message"
               % (name, state, reason_state_changed, err_log_urls))
        payload_message = {"msg_type": "text", "content": {"text": msg}}
        url = 'lark alert url'  # webhook of the Lark alert group (placeholder)
        requests.post(url, data=json.dumps(payload_message))
        # Automatically resume the suspended job
        cur.execute("resume routine load for " + name)

cur.close()
db.close()
```

The monitor is currently configured to run once per minute. If a job is suspended, the import job is resumed automatically, but the dirty data that caused the failure is skipped; the failure reason then needs to be checked manually, and the affected data re-imported once the issue is fixed.
Data model
Doris internal tables mainly have 3 data models: Aggregate, Unique, and Duplicate. Before introducing the data models, a word about columns: in Doris, columns fall into two categories, Key and Value. From a business perspective, Key and Value correspond to dimension columns and metric columns respectively.
Aggregate Model
Simply speaking, the Aggregate model is a pre-aggregation model, similar to MOLAP: the Key columns and the aggregation method of each Value column are defined in advance, and when data is imported, rows with identical Key columns are aggregated together according to the Value columns' rules. In other words, the final table retains only one row per distinct Key, with the Value columns computed according to the corresponding rules. Here's an example.
The table structure is as follows :
```sql
CREATE TABLE tmp_table_1 (
    user_id varchar(64) COMMENT "user id",
    channel varchar(64) COMMENT "user acquisition channel",
    city_code varchar(64) COMMENT "user city code",
    last_visit_date DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "last visit time of the user",
    total_cost BIGINT SUM DEFAULT "0" COMMENT "total user spending"
) ENGINE=OLAP
AGGREGATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES (
    "storage_type" = "column",
    "replication_num" = "1"
);
```

In this table structure, the Key columns are user_id, channel, and city_code; the Value columns are last_visit_date and total_cost, with aggregation methods REPLACE and SUM respectively.
Now, insert a batch of data into the table:
```sql
insert into tmp_table_1 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_1 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_1 values('suh_001','JD','001','2022-03-01 00:00:01','107');
```

By our understanding, although we inserted 3 rows into tmp_table_1, their Keys are all identical, so the final table should contain only one row, with last_visit_date equal to "2022-03-01 00:00:01" and total_cost equal to 240. Let's verify:
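The verification is a plain SELECT; the expected result follows from the aggregation rules above:

```sql
-- Expected: exactly one row, because all three inserts share the same Key.
--   last_visit_date = '2022-03-01 00:00:01'  (REPLACE keeps the latest value)
--   total_cost      = 240                    (SUM of 57 + 76 + 107)
SELECT * FROM tmp_table_1;
```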

As you can see, the result is consistent with our expectation.
Unique Model
For the real-time warehouse built in this project, what we care about most is how to guarantee the uniqueness of the primary key, that is, how to obtain a Primary Key uniqueness constraint. You can refer to the table-creation example earlier in this article, so the full DDL is not repeated here; a minimal sketch of the replace behavior follows.
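The sketch below illustrates the Unique model's replace-on-duplicate-key behavior; the table and values are illustrative assumptions, not from the original article:

```sql
CREATE TABLE tmp_table_u (
    user_id varchar(64) COMMENT "user id",
    city_code varchar(64) COMMENT "user city code",
    cost BIGINT COMMENT "user spending"
) ENGINE=OLAP
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES (
    "storage_type" = "column",
    "replication_num" = "1"
);

insert into tmp_table_u values('suh_001','001','57');
insert into tmp_table_u values('suh_001','002','76');

-- Expected: one row ('suh_001', '002', 76): the later write replaces
-- the earlier one that shares the same key.
SELECT * FROM tmp_table_u;
```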
Duplicate Model
In some multidimensional analysis scenarios, the data has neither a primary key nor aggregation requirements, so the Duplicate data model was introduced to meet this kind of need. An example:
The table structure is as follows :
```sql
CREATE TABLE tmp_table_2 (
    user_id varchar(64) COMMENT "user id",
    channel varchar(64) COMMENT "user acquisition channel",
    city_code varchar(64) COMMENT "user city code",
    visit_date DATETIME COMMENT "user login time",
    cost BIGINT COMMENT "user spending"
) ENGINE=OLAP
DUPLICATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES (
    "storage_type" = "column",
    "replication_num" = "1"
);
```

Insert data:
```sql
insert into tmp_table_2 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_2 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_2 values('suh_001','JD','001','2022-03-01 00:00:01','107');
```

Because the table uses the Duplicate model, the data is not processed in any way, so the query should return all 3 rows:
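Again the check is a plain SELECT:

```sql
-- Expected: all 3 inserted rows, kept verbatim. DUPLICATE KEY only
-- specifies the sort columns and does not deduplicate or aggregate.
SELECT * FROM tmp_table_2;
```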

Suggestions on data model selection
Because the data model is determined when the table is created and cannot be modified afterwards, choosing an appropriate data model is very important.
The Aggregate model can pre-aggregate data, greatly reducing the amount of data scanned and the amount of computation for aggregate queries, which makes it very suitable for report query scenarios with fixed patterns. However, the model is unfriendly to count(*) queries. Also, because the aggregation method on the Value columns is fixed, semantic correctness needs to be considered when performing other types of aggregate queries, as the sketch below illustrates.
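Using the earlier tmp_table_1 as an example, a sketch of this pitfall:

```sql
-- Returns 1, not 3: the three inserted rows were collapsed into one at
-- import time, so the raw row count is no longer observable.
SELECT COUNT(*) FROM tmp_table_1;

-- MIN here runs over the already-SUMmed value (240), not over the
-- original per-row values (57, 76, 107): a semantic trap.
SELECT MIN(total_cost) FROM tmp_table_1;
```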
The Unique model targets scenarios that require a unique primary key constraint. It can guarantee the uniqueness of the primary key, but it cannot take advantage of the query benefits that ROLLUP pre-aggregation brings.
The Duplicate model is suitable for ad-hoc queries over arbitrary dimensions. Although it likewise cannot exploit pre-aggregation, it is not constrained by an aggregation model and can take full advantage of the column-store model.
Summary
After the real-time data warehouse built with Flink + Doris went live, the response speed of the report interfaces improved significantly: for aggregate queries over a single billion-row-scale table, the response time is 0.79 seconds at TP95 and 5.03 seconds at TP99. So far, the whole warehouse system has been running stably for more than 8 months.
More open source technology enthusiasts are welcome to join the Apache Doris community, to grow together and build the community ecosystem.



SelectDB is an open source technology company committed to providing the Apache Doris community with a full-time team of engineers, product managers, and support engineers, helping the open source community ecosystem prosper and creating an international industry standard in the field of real-time analytical databases. SelectDB, a new-generation cloud-native real-time data warehouse developed on Apache Doris, runs on multiple clouds and provides users and customers with out-of-the-box capabilities.
Related links:
SelectDB official website:
https://selectdb.com (We Are Coming Soon)
Apache Doris official website:
https://doris.apache.org
Apache Doris GitHub:
https://github.com/apache/doris
Apache Doris developer mailing list:
dev@doris.apache.org