Nebula Graph Learning, Chapter 3: Migrating 60M+ Relational Records with Multithreading
2022-06-26 03:42:00 【scl、】
Background
Nebula supports data migration from Excel files, so the xxx system lets users upload an Excel file exported from MySQL or other tools and then performs the node and relationship import according to the mapping. To decouple the process and improve the user experience, the import is completed asynchronously through Kafka.
This works fine for small data volumes, but it breaks down once the volume grows: a single Excel sheet holds only about one million rows, so 60M+ rows would have to be split across 60+ Excel files, which is clearly cumbersome and unrealistic. We therefore need a quick and convenient way to handle this small requirement ourselves.
Scenario analysis
The whole migration process breaks down into roughly three stages:
1. How to query the data as fast as possible? How to merge the results across multiple threads?
2. After the data is queried, how to convert the format? How to handle special characters that Nebula does not support?
3. How to save the formatted data into Nebula? How many records per insert batch? How to handle insert failures?
1. Multi-threaded segmented data query
For querying 60M+ rows from a single table, which is more efficient, a single thread or multiple threads? Intuition says multithreading, but a database query is IO-intensive, and multithreading mainly keeps the CPU busy, so its benefit is most obvious for CPU-intensive tasks.
Real knowledge comes from practice, so let the code decide. To avoid OOM, first test the query plus nGQL conversion on a single table of 6,538,587 rows.
The single-threaded version is straightforward and not worth detailing; the focus here is the multi-threaded one. The code itself is not posted, but the idea is:
- Open a thread pool. The number of core threads depends on the business scenario and the number of server CPUs; here it is simply set to 20.
- Submit the tasks with supplyAsync() from the [CompletableFuture asynchronous orchestration API](https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650).
- Then chain exception handling, nGQL format conversion, and result-set merging.
- The main thread puts the tasks into a list and traverses it calling get(), blocking until all thread tasks have finished.
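A minimal sketch of what this might look like (ProductBrandMapper, countAll(), selectProductBrandByInterval(), and the row-to-nGQL conversion are hypothetical stand-ins, not the project's actual code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SegmentedQueryDemo {

    private static final int THREADS = 20;

    // Hypothetical mapper interface; in the project this would be the MyBatis mapper.
    public interface ProductBrandMapper {
        int countAll();
        List<Object> selectProductBrandByInterval(int offset, int limit);
    }

    public List<String> queryAllAsNGql(ProductBrandMapper mapper) throws Exception {
        int total = mapper.countAll();
        // group size: count % 20 == 0 ? count / 20 : count / 20 + 1
        int pageSize = total % THREADS == 0 ? total / THREADS : total / THREADS + 1;

        ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        List<CompletableFuture<List<String>>> futures = new ArrayList<>();
        for (int i = 0; i < THREADS; i++) {
            int offset = i * pageSize;
            futures.add(CompletableFuture
                    // each task queries its own offset/limit slice
                    .supplyAsync(() -> mapper.selectProductBrandByInterval(offset, pageSize), pool)
                    // then converts that slice's rows to nGQL strings
                    .thenApply(SegmentedQueryDemo::toNGql)
                    // a failed slice simply yields an empty list in this sketch
                    .exceptionally(ex -> Collections.emptyList()));
        }

        // The main thread blocks on get() until every task is done, then merges the results.
        List<String> merged = new ArrayList<>();
        for (CompletableFuture<List<String>> f : futures) {
            merged.addAll(f.get());
        }
        pool.shutdown();
        return merged;
    }

    // Placeholder conversion; the real code escapes special characters and builds INSERT statements.
    private static List<String> toNGql(List<Object> rows) {
        List<String> out = new ArrayList<>();
        for (Object row : rows) {
            out.add(String.valueOf(row));
        }
        return out;
    }
}
```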
1.1 Segmented query with limit
Since the number of threads is fixed at 20, first query the total row count of the table (count).
Then use count % 20 to decide the size of each group: count % 20 == 0 ? count / 20 : count / 20 + 1;
Splitting by id would be enough: if the table's id is a continuous primary key, a simple where id >= minPartId and id <= maxPartId does the job. To also cope with tables whose key values are not continuous, limit + offset is used instead. The SQL is below:
<select id="selectProductBrandByInterval" resultType="xxx" parameterType="java.lang.Integer">
select
id as id,
sku_id as sku_id,
brand_id as brand_id
from xxx limit #{offset},#{limit}
</select>
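For reference, a sketch of the mapper interface this XML statement might bind to (the @Param names must line up with #{offset} and #{limit}; the Map result type is used here only for the sketch, the real resultType is the project's entity):

```java
import java.util.List;
import java.util.Map;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

@Mapper
public interface ProductBrandMapper {

    // Each thread calls this with its own offset; limit is the per-group size computed above.
    List<Map<String, Object>> selectProductBrandByInterval(@Param("offset") Integer offset,
                                                           @Param("limit") Integer limit);
}
```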
Testing the multi-threaded query + nGQL conversion on 6,538,587 rows, the elapsed times were:
- Single-threaded query: 130 s
- 10 threads: 110 s
- 20 threads: 110 s
There is some improvement in efficiency, and the gain should be more obvious on a server with more CPUs. Looking back at the SQL, it can still be optimized.
limit with offset works by fetching all rows up to offset + limit first and then throwing away the part before offset, so the larger the offset, the more rows are discarded and the lower the theoretical efficiency.
Why does a limit query get slow when the offset is large? It helps to understand how limit actually executes. Take the following query as an example:
select * from table_name limit 10000,10
The execution logic of this SQL is:
1. Read the Nth row from the table and add it to the result set
2. Repeat step 1 until N = 10000 + 10
3. Discard the first 10000 rows according to the offset
4. Return the remaining 10 rows
Obviously, what makes this SQL slow is step 2: the first 10000 rows are completely useless for this query, yet they take up most of the query time. How to solve it? First we have to understand why the database queries this way.
Author: jaren
Link: https://www.jianshu.com/p/efecd0b66c55
Source: Jianshu (简书)
Copyright belongs to the author. For commercial reprints, please contact the author for authorization; for non-commercial reprints, please credit the source.
1.2 Optimizing the limit segmented query
When the offset is large, limit is not very efficient, but it can be optimized.
(1) If the id key is unordered, the outer query can use an inner join against a subquery instead of querying directly.
(2) If the id key is ordered, use id >= (subquery) together with limit.
Either way, first look up only the index column of the needed rows (assuming that column is id: because the subquery selects only the id field, it can be resolved entirely from the index, whereas a query without the subquery also has to go back to the table for the other columns), then fetch the full rows by that index column. The SQL is below:
# Outer query uses an inner join: 110 s
<select id="selectProductBrandByIntervalImprove" resultType="xxx" parameterType="java.lang.Integer">
select
id as id,
sku_id as sku_id,
brand_id as brand_id
from xxx
inner join
( select id from xxx limit #{offset},#{limit}) b using (id)
</select>
# Outer query uses id >= subquery
<select id="selectProductBrandByIntervalImprove" resultType="xxx" parameterType="java.lang.Integer">
select
id as id,
sku_id as sku_id,
brand_id as brand_id
from xxx
where id >=
( select id from xxx limit #{offset},1) limit #{limit}
</select>
As for efficiency, with the query cache disabled in Navicat the improvement is noticeable, but when run from Java the difference is small:
- 20 threads, optimized limit (outer query with inner join): 110 s
- 20 threads, optimized limit (id ordered, direct >= subquery): 113 s
2. Merge the results and complete the migration
Each thread task queries its data and then converts it into a List<String>. Note that Nebula cannot handle certain special characters without substitution (single quotes, commas, escape backslashes, and both Chinese and English brackets should all be replaced, and it is recommended that nGQL insert statements wrap strings in single quotes rather than double quotes, so that escaping does not break the insert).
Each item in the list is one nGQL insert batch of 1000 records. Merging across threads requires a thread-safe CopyOnWriteArrayList (a minimal sketch of this conversion and batching step follows right after this paragraph). With the 6M+ table, the whole query, conversion, and insert runs without problems; it is with the 60M+ table below that things go wrong.
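For illustration, a minimal sketch of the escaping and 1000-record batching just described; the edge type belong_to, the src/dst layout, and the class itself are hypothetical assumptions, not the project's code:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class NGqlBatchBuilder {

    private static final int BATCH_SIZE = 1000;

    // Thread-safe list that every query thread appends its finished batches to.
    private final List<String> batches = new CopyOnWriteArrayList<>();

    // Replace the characters listed above as problematic for nGQL:
    // single quotes, commas, escape backslashes, and Chinese/English brackets.
    static String sanitize(String value) {
        if (value == null) {
            return "";
        }
        return value.replace("\\", "")
                    .replace("'", "")
                    .replace(",", " ")
                    .replace("(", "").replace(")", "")
                    .replace("（", "").replace("）", "");
    }

    // Build one INSERT EDGE statement per 1000 rows; adjust edge type and schema as needed.
    public void addRows(List<String[]> rows) {
        StringBuilder sb = null;
        int n = 0;
        for (String[] row : rows) {
            if (n % BATCH_SIZE == 0) {
                if (sb != null) {
                    batches.add(sb.toString());
                }
                sb = new StringBuilder("INSERT EDGE belong_to() VALUES ");
            } else {
                sb.append(", ");
            }
            // single quotes around string values, as recommended above
            sb.append("'").append(sanitize(row[0])).append("' -> '")
              .append(sanitize(row[1])).append("':()");
            n++;
        }
        if (sb != null) {
            batches.add(sb.toString());
        }
    }

    public List<String> getBatches() {
        return batches;
    }
}
```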
Only some of the threads managed to query successfully, roughly 10M+ rows, and the nGQL conversion then threw a heap OOM exception outright.
Analyzing memory with jps, jmap, and jconsole, and raising the maximum heap from 4G to -Xmx6044m, the process still OOMed at around 20M rows, so with this machine's limited memory another way had to be found.
Combined with the business logic, the inference is: the young generation is GC'd frequently and the old generation fills up, which leads to the OOM. The constant fluctuation of old-generation memory should, by business analysis, be caused by the CopyOnWriteArrayList, since every thread's query results are collected into it.
A rough estimate from the data: one Java object header takes about 16 bytes, plus 5 String reference fields at 4 bytes each, giving 4 bytes * 5 + 16 bytes = 36 bytes per object. 60M+ objects then need 60,000,000 * 36 bytes / 1024 / 1024 ≈ 2059 MB, roughly over 2 GB, on top of which the List<String> collections and the copy-on-write backups of the CopyOnWriteArrayList need a lot more memory.
Yet the heap specified by the default -Xmx1024 parameter is only 1 GB, which is clearly not enough; over 400 young GCs occurred and objects kept piling up into the old generation.

In the end the problem was solved by grouping on id, processing roughly 10M rows per pass, and running 6 serial passes.
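In rough outline, that final approach might look like the sketch below; the chunk size, hook names, and wiring are hypothetical assumptions, not the project's code:

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongFunction;

public class ChunkedMigration {

    /**
     * Runs the migration one id-range chunk at a time so that only one chunk's
     * objects are alive on the heap at once. queryAndConvertChunk and insert are
     * hypothetical hooks standing in for the section-1 query/convert logic and
     * the nGQL execution against Nebula.
     */
    public static void migrateInChunks(long minId, long maxId,
                                       LongFunction<List<String>> queryAndConvertChunk,
                                       Consumer<String> insert) {
        long chunkSize = 10_000_000L;   // roughly 1000w rows per serial pass
        for (long start = minId; start <= maxId; start += chunkSize) {
            // query + convert this id range (reusing the 20-thread pool from section 1),
            // insert its batches, then let the chunk's objects become garbage before the next pass
            List<String> ngqlBatches = queryAndConvertChunk.apply(start);
            for (String stmt : ngqlBatches) {
                insert.accept(stmt);
            }
        }
    }
}
```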
Below are the command steps for the JVM memory analysis tools that were used.
(Figure: cumulative import count)
3. JVM memory analysis
I. Tools
1. jps: view the Java process id (pid)
2. jconsole: visual interface for inspecting memory, thread counts, etc.
3. jmap: generate a dump file
# A dump file can also be generated manually and analyzed with MAT or an online site.
# After getting the dump file, the next step is analysis; without a JDK environment locally, the downloaded MAT tool also reported an error,
# so an online dump analysis website can be used instead: https://heaphero.io/index.jsp or https://gceasy.io/index.jsp
jmap -dump:format=b,file=heap.dump 8544
Dumping heap to D:\myidea_projects\data-conversion\data-conversion\heap.bin ...
Heap dump file created [1141143851 bytes in 10.996 secs]
II. Parameters
# 1. Generate a dump file when OOM occurs
# Let the JVM generate a dump file on OOM (OutOfMemoryError)
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/path/heap/dump
# 2. Print GC logs
-XX:+PrintGCDetails