Application practice | Massive data, second-level analysis! A Flink + Doris real-time data warehouse solution

2022-06-24 21:52:00 InfoQ

Editor's recommendation: With the rapid growth of Advance Intelligence Group, and to support real-time report statistics and decision analysis over billions of rows of data, the group chose a Flink + Doris real-time data warehouse solution. This article describes the practice in detail.

The following article is from Advance Intelligence Group (Lingchuang Group); author: suhao
Link to the original text :
https://mp.weixin.qq.com/s/qg_4nsfo5gxwe8_1OiWVSA

Business background

Advance Intelligence Group (Lingchuang Group), founded in 2016, is an AI-driven technology group committed to transforming and reshaping the finance and retail industries through localized application of technological innovation, building an ecosystem that serves consumers, enterprises, and merchants with a diversified business portfolio. The group comprises an enterprise business and a consumer business. The enterprise business includes ADVANCE.AI and Ginee, which provide AI-based digital identity verification, risk management products, and omnichannel e-commerce solutions to customers in banking, finance, fintech, retail, and e-commerce. The consumer business, Atome Financial, includes Atome, Asia's leading buy-now-pay-later platform, and digital financial services.

In September 2021, the group announced the completion of a Series D round of over USD 400 million; after the round, its valuation exceeded USD 2 billion, making it one of the largest independent technology startups in Singapore. Its business covers 17 countries and regions, including Singapore, Indonesia, mainland China, India, and Vietnam, serving more than 150,000 merchants and 20 million consumers.

With the rapid development of the group's business, and to meet the needs of real-time report statistics and decision analysis over billions of rows of data, we chose to build a real-time data warehouse based on Apache Flink + Apache Doris.

Basic principles of Doris

The basic architecture of Apache Doris is very simple: there are only two roles, FE (Frontend) and BE (Backend), with no dependency on any external components, which makes deployment and operations very friendly. The architecture is as follows:

(Figure: Doris architecture)
FE (Frontend) is implemented primarily in Java.
Its main responsibilities:
  • Receiving user connection requests (MySQL protocol layer)
  • Metadata storage and management
  • Parsing query statements and distributing execution plans
  • Cluster management and control

FE has two roles, Follower and Observer; the Leader is a Follower elected from the Follower group. Followers mainly provide high availability for metadata: if a single node goes down, metadata can be recovered online in real time without affecting the service as a whole.
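Since FE speaks the MySQL protocol, any ordinary MySQL client can connect to it (the default query port is 9030) and inspect these roles with plain SQL, for example:

-- List FE nodes together with their role (Leader / Follower / Observer)
SHOW FRONTENDS;
-- List BE nodes and their health status
SHOW BACKENDS;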
BE (Backend) is implemented primarily in C++.
Its main responsibilities:
  • Data storage and management
  • Query plan execution

Technology Architecture

The overall data link is shown in the figure below:

(Figure: overall data pipeline)
  • Use Flink CDC to collect the MySQL binlog into topic Topic1 in Kafka
  • Develop Flink jobs that consume the binlog above, build wide tables for the relevant topics, and write the results to Topic2 (a sketch follows this list)
  • Configure a Doris Routine Load job to import the data in Topic2 into Doris
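As an illustration only (topic names, fields, and broker addresses are made up, and the production job may join several source topics to build the wide table), a minimal Flink SQL sketch of step 2 might look like this:

CREATE TABLE binlog_source (
    key1 STRING,
    key2 STRING,
    key3 STRING,
    value1 INT,
    value2 STRING,
    value3 DECIMAL(20, 6),
    data_deal_datetime STRING,
    data_status INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'Topic1',
    'properties.bootstrap.servers' = 'broker1_ip:port1',
    'properties.group.id' = 'wide_table_builder',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- Nest the payload under a top-level "data" key so that the Routine Load
-- configuration shown later (json_root = "$.data") can pick it up
CREATE TABLE wide_table_sink (
    `data` ROW<key1 STRING, key2 STRING, key3 STRING, value1 INT,
               value2 STRING, value3 DECIMAL(20, 6),
               data_deal_datetime STRING, data_status INT>
) WITH (
    'connector' = 'kafka',
    'topic' = 'Topic2',
    'properties.bootstrap.servers' = 'broker1_ip:port1',
    'format' = 'json'
);

INSERT INTO wide_table_sink
SELECT ROW(key1, key2, key3, value1, value2, value3, data_deal_datetime, data_status)
FROM binlog_source;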

Application practice

The practice of steps 1 and 2 has already been covered in the article "Data Synchronization Based on Flink-CDC"; this article elaborates on step 3.

Table creation

Because business data frequently involves UPDATE and DELETE operations, and the real-time warehouse should keep the same data granularity as the business database, we chose the Doris Unique model (the data models are discussed in detail below). The table creation statement is as follows:
CREATE TABLE IF NOT EXISTS table_1
(
key1 varchar(32),
key2 varchar(32),
key3 varchar(32),
value1 int,
value2 varchar(128),
value3 Decimal(20, 6),
data_deal_datetime DateTime COMMENT 'data processing time',
data_status INT COMMENT 'whether the row is deleted: 1 = normal, -1 = deleted'
)
ENGINE=OLAP
UNIQUE KEY(`key1`,`key2`,`key3`)
COMMENT "xxx"
DISTRIBUTED BY HASH(`key2`) BUCKETS 32
PROPERTIES (
"storage_type" = "column",
"replication_num" = "3",
-- sequence column type, used to resolve conflicts between rows with the same key
"function_column.sequence_type" = "DateTime"
);

Note that the table structure contains two extra fields: data_deal_datetime and data_status.
  • data_deal_datetime is the basis for deciding which row wins when several rows share the same key
  • data_status keeps the table compatible with deletions in the business database (see the query sketch after this list)
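For instance, downstream reports can exclude logically deleted rows with an ordinary filter; a hypothetical query against the table above:

-- Aggregate only rows that still exist in the business database
SELECT key1, SUM(value1) AS total_value
FROM table_1
WHERE data_status = 1
GROUP BY key1;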

Data import task

Doris can actively pull data from Kafka via Routine Load. The job is configured as follows:
CREATE ROUTINE LOAD database.table1 ON table1
COLUMNS(key1,key2,key3,value1,value2,value3,data_deal_datetime,data_status),
ORDER BY data_deal_datetime
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "10",
"max_batch_rows" = "500000",
"max_batch_size" = "209715200",
"format" = "json",
"json_root" = "$.data",
"jsonpaths"="[\"$.key1\",\"$.key2\",\"$.key3\",\"$.value1\",\"$.value2\",
 \"$.value3\",\"$.data_deal_datetime\",\"$.data_status\"]"
)FROM KAFKA
(
"kafka_broker_list"="broker1_ip:port1,broker2_ip:port2,broker3_ip:port3",
"kafka_topic"="topic_name",
"property.group.id"="group_id",
"property.kafka_default_offsets"="OFFSET_BEGINNING"
);

In the import statement:
  • ORDER BY data_deal_datetime designates data_deal_datetime as the sequence column, used to decide which row overwrites the others when keys collide
  • desired_concurrent_number is the desired concurrency
  • max_batch_interval, max_batch_rows, and max_batch_size cap, respectively, each subtask's maximum execution time, maximum number of rows read, and maximum number of bytes read
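For reference, a Kafka message shaped like the following (all field values made up) would match the json_root and jsonpaths settings above:

{
    "data": {
        "key1": "k1",
        "key2": "k2",
        "key3": "k3",
        "value1": 1,
        "value2": "v2",
        "value3": 3.14,
        "data_deal_datetime": "2022-01-01 00:00:01",
        "data_status": 1
    }
}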

Task monitoring and alerting

Doris Routine Load suspends a job when it encounters dirty data, so the status of import jobs must be monitored periodically, failed jobs resumed automatically, and the error details sent to a designated Lark group. The script is as follows:
import json

import pymysql
import requests

# Open a connection to the Doris FE over the MySQL protocol
# (fill in the real connection details; 9030 is the default FE query port)
db = pymysql.connect(host="host", user="user",
                     password="passwd", db="database", port=9030)

# Get a cursor for executing statements
cur = db.cursor()

# Query the status of all routine load jobs
cur.execute("show routine load")
results = cur.fetchall()
for row in results:
    name = row[1]
    state = row[7]
    if state != 'RUNNING':
        err_log_urls = row[16]
        reason_state_changed = row[15]
        msg = ("doris data import task is abnormal:\n name=%s \n state=%s \n "
               "reason_state_changed=%s \n err_log_urls=%s \n "
               "It will be resumed automatically; please check the error details"
               % (name, state, reason_state_changed, err_log_urls))
        # Send the alert text to the designated Lark group via its webhook
        payload_message = {
            "msg_type": "text",
            "content": {
                "text": msg
            }
        }
        url = 'lark alert webhook url'
        requests.post(url, data=json.dumps(payload_message))
        # Automatically resume the suspended job
        cur.execute("resume routine load for " + name)

cur.close()
db.close()

The monitoring currently runs once per minute in production. If a job is suspended, it is resumed automatically, but the dirty data that caused the failure is skipped; in that case the failure cause must be investigated manually, and the affected data re-imported after the fix.
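When manual intervention is needed, the standard Routine Load control statements can be used (job name as in the example above):

-- Inspect the job, including ReasonOfStateChanged and ErrorLogUrls
SHOW ROUTINE LOAD FOR database.table1;
-- Pause it explicitly while investigating, if desired
PAUSE ROUTINE LOAD FOR database.table1;
-- Resume it once the offending data has been handled
RESUME ROUTINE LOAD FOR database.table1;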

Data model

Doris internal tables support three data models: Aggregate, Unique, and Duplicate. Before introducing the models, a note on columns: in Doris, columns fall into two categories, Key and Value. From a business perspective, Key and Value correspond to dimension columns and metric columns respectively.

Aggregate

Simply put, the Aggregate model is a pre-aggregation model, similar to MOLAP. The Key columns and the aggregation method of each Value column are defined in advance; on import, rows with the same Key columns are aggregated on the Value columns, so the final table keeps only one row per distinct Key, with the Values computed according to the declared rules. Here is an example.

The table structure is as follows :
CREATE TABLE tmp_table_1
(
user_id varchar(64) COMMENT "user id",
channel varchar(64) COMMENT "user acquisition channel",
city_code varchar(64) COMMENT "user's city code",
last_visit_date DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "user's last visit time",
total_cost BIGINT SUM DEFAULT "0" COMMENT "user's total spend"
)
ENGINE=OLAP
AGGREGATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES("storage_type"="column","replication_num" = "1");

In this table, the Key columns are user_id, channel, and city_code; the Value columns are last_visit_date and total_cost, whose aggregation methods are REPLACE and SUM respectively.

Now, insert a batch of data into the table:
insert into tmp_table_1 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_1 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_1 values('suh_001','JD','001','2022-03-01 00:00:01','107');

By this reasoning, although we inserted 3 rows into tmp_table_1, their Keys are all identical, so the final table should contain only one row, with last_visit_date equal to "2022-03-01 00:00:01" and total_cost equal to 240 (57 + 76 + 107). Let's verify:

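The original post shows a screenshot of the query result; a query along the following lines (column layout illustrative) reproduces the check:

SELECT * FROM tmp_table_1;
-- Expected: exactly one row
-- suh_001 | JD | 001 | 2022-03-01 00:00:01 | 240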
The result matches our expectation.

Unique model

For the real-time warehouse built here, what we care about most is how to guarantee the uniqueness of the primary key, i.e. how to obtain a Primary Key uniqueness constraint. The table creation example earlier in this article illustrates this, so no further example is given here.

Duplicate model

In some multidimensional analysis scenarios, the data has no primary key and no aggregation requirements, so the Duplicate data model was introduced to meet this kind of demand. An example follows.

The table structure is as follows :
CREATE TABLE tmp_table_2
(
user_id varchar(64) COMMENT "user id",
channel varchar(64) COMMENT "user acquisition channel",
city_code varchar(64) COMMENT "user's city code",
visit_date DATETIME COMMENT "user login time",
cost BIGINT COMMENT "user spend amount"
)
ENGINE=OLAP
DUPLICATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
PROPERTIES("storage_type"="column","replication_num" = "1");

Insert data:
insert into tmp_table_2 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_2 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_2 values('suh_001','JD','001','2022-03-01 00:00:01','107');

Because the table uses the Duplicate model, the data is stored without any processing, so the query should return all 3 rows:
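Again, the original screenshot is replaced here with an equivalent query sketch:

SELECT * FROM tmp_table_2;
-- Expected: all three inserted rows are kept as-is
-- suh_001 | JD | 001 | 2022-01-01 00:00:01 |  57
-- suh_001 | JD | 001 | 2022-02-01 00:00:01 |  76
-- suh_001 | JD | 001 | 2022-03-01 00:00:01 | 107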

Suggestions on data model selection

The data model is fixed when the table is created and cannot be modified afterwards, so choosing an appropriate data model is very important.

The Aggregate model pre-aggregates data, which greatly reduces the data scanned and the computation performed by aggregate queries; it suits report scenarios with fixed query patterns very well. The model is, however, very unfriendly to count(*) queries (a common mitigation is sketched below). Also, because the aggregation method on each Value column is fixed, semantic correctness has to be considered when running other kinds of aggregate queries.
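One common mitigation, sketched here rather than taken from the original article, is to maintain a dedicated counter column with SUM aggregation so that row counts survive pre-aggregation (ideally the column is declared at table creation time; adding it afterwards only counts rows imported from then on):

-- Hypothetical counter column on the Aggregate table from the earlier example;
-- every source row contributes 1, so SUM(row_cnt) equals the number of source rows
ALTER TABLE tmp_table_1 ADD COLUMN row_cnt BIGINT SUM DEFAULT "1";
SELECT SUM(row_cnt) FROM tmp_table_1;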

The Unique model targets scenarios that require a unique primary key constraint. It guarantees primary key uniqueness, but cannot benefit from the query speed-ups that ROLLUP pre-aggregation brings.

The Duplicate model suits ad-hoc queries across arbitrary dimensions. It cannot exploit pre-aggregation either, but it is not constrained by an aggregation model and can take full advantage of the column-store format.

Summary

Since the Flink + Doris real-time data warehouse went live, the response speed of the report interfaces has improved significantly: for aggregate queries over a single table with billions of rows, the TP95 response time is 0.79 seconds and the TP99 is 5.03 seconds. As of this writing, the whole warehouse system has been running stably for more than 8 months.

More open source enthusiasts are welcome to join the Apache Doris community, to grow with it hand in hand and build the community ecosystem together.

SelectDB is an open source technology company committed to providing the Apache Doris community with a full-time team of engineers, product managers, and support engineers, helping the open source community ecosystem flourish and creating an international industry standard in the field of real-time analytical databases. SelectDB, a new generation cloud-native real-time data warehouse built on Apache Doris, runs on multiple clouds and offers users and customers out-of-the-box capabilities.

Related links:

SelectDB official website:
https://selectdb.com
(We Are Coming Soon)

Apache Doris official website:
http://doris.apache.org

Apache Doris GitHub:
https://github.com/apache/doris

Apache Doris developer mailing list:
[email protected]