Nebula Graph learning, Chapter 3: Multithreaded migration of 60 million+ rows of relational data

2022-06-26 03:42:00 scl、

Background

Nebula supports data migration from Excel files, so the xxx system lets users upload Excel files exported from MySQL or other tools and then runs the import of the mapped nodes and relationships. To decouple the steps and improve the user experience, the process is completed asynchronously through Kafka.

This works fine for small amounts of data. But a single Excel sheet holds only about 1 million rows, so once the data exceeds that, say 60 million+ rows or more, it would have to be split into 60-odd Excel files, which is clearly cumbersome and unrealistic. We therefore need a quick and convenient way to handle this small requirement.

Scenario analysis

The whole migration process is roughly divided into three stages:

1. How can the data be queried as quickly as possible? How are the results from multiple threads merged?
2. Once the data has been queried, how is its format converted? How are special characters that Nebula does not support handled?
3. How is the formatted data saved to Nebula? How many records should go into each insert group? How are insert failures handled?

1. Multithreaded segmented query

For a single table with 60 million+ rows, is it more efficient to fetch the data with one thread or with several? Intuition says several, but querying a database is IO-intensive, whereas multithreading mainly squeezes more work out of the CPU and pays off most clearly on CPU-intensive tasks.

True knowledge comes from practice, so I ran the code on my local machine. To avoid an OOM, I first tested the query plus conversion to nGQL format on a single table of 6,538,587 rows.

The single-threaded approach and its code are not covered here; the focus is the multithreaded approach. The code itself is not posted; the approach is as follows.

// Create a thread pool. The number of core threads depends on the business scenario and the number of CPUs on the server; here I simply set it to 20.

// The processing logic then uses the [CompletableFuture asynchronous orchestration API](https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650) to submit the multithreaded tasks via supplyAsync().

// Exception handling, nGQL formatting, and result-set merging can then be chained.

// The main thread puts the tasks in a list, then iterates over it calling get() to block until all thread tasks have finished.
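
The steps in the comments above could look roughly like the following sketch. It is not the author's code: `queryAndConvert` is a hypothetical stand-in for the real database query and nGQL conversion, and `join()` is used in place of `get()` only to avoid checked exceptions in a short example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: queryAndConvert() is a hypothetical placeholder for the real
// segment query plus nGQL conversion, which the article does not show.
class SegmentedQuerySketch {

    // Hypothetical task: pretend to query one segment and convert each row.
    static List<String> queryAndConvert(int segment) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            rows.add("segment-" + segment + "-row-" + i);
        }
        return rows;
    }

    static List<String> runAll(int segments, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Submit one asynchronous task per segment.
            List<CompletableFuture<List<String>>> tasks = new ArrayList<>();
            for (int s = 0; s < segments; s++) {
                final int segment = s;
                tasks.add(CompletableFuture
                        .supplyAsync(() -> queryAndConvert(segment), pool)
                        // Chained exception handling: a failed segment yields an empty list.
                        .exceptionally(ex -> new ArrayList<>()));
            }
            // The calling thread blocks on each task in turn and merges the results.
            List<String> merged = new ArrayList<>();
            for (CompletableFuture<List<String>> task : tasks) {
                merged.addAll(task.join());
            }
            return merged;
        } finally {
            pool.shutdown();
        }
    }
}
```

Merging on the calling thread, as here, avoids the need for a concurrent collection; whether that suits the real workload depends on how much data each segment returns.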
1.1 Segmented query with limit

Because the number of threads is fixed at 20, first query the total row count of the table, count.

Then check whether count % 20 leaves a remainder to determine the size of each group: count % 20 == 0 ? count / 20 : count / 20 + 1;
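
As a small illustration of the group-size rule above, the following sketch computes the group size and the offset for each of the 20 segments. The class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: names are hypothetical, the arithmetic follows the rule in the text.
class SegmentPlanner {

    // Rows per group: count / groups, plus one when there is a remainder.
    static long groupSize(long count, int groups) {
        return count % groups == 0 ? count / groups : count / groups + 1;
    }

    // Offsets to pass as #{offset}; #{limit} is groupSize(count, groups).
    static List<Long> offsets(long count, int groups) {
        long size = groupSize(count, groups);
        List<Long> result = new ArrayList<>();
        for (int g = 0; g < groups; g++) {
            long offset = g * size;
            if (offset >= count) {
                break; // fewer rows than groups: no more segments needed
            }
            result.add(offset);
        }
        return result;
    }
}
```

For the test table of 6,538,587 rows, `groupSize(6538587L, 20)` gives 326930, and the last of the 20 segments starts at offset 6211670.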

Segmenting by id is enough. If the table's id is a primary key and continuous, simply use where id <= maxPartId and id >= minPartId. To cope with tables whose key values are not continuous, use limit + offset instead. The SQL is:

<select id="selectProductBrandByInterval" resultType="xxx" parameterType="java.lang.Integer">
        select
        id as id,
        sku_id as sku_id,
        brand_id as brand_id
        from xxx limit  #{offset},#{limit}
    </select>

Time taken to query 6,538,587 rows with multiple threads and convert them to nGQL:

  • Single-threaded query: 130 s
  • 10-thread query: 110 s
  • 20-thread query: 110 s

There is still some improvement in efficiency, and the effect should be more obvious on a server with more CPUs. Looking back at the SQL, it can still be optimized.

The principle behind limit and offset is to fetch all the rows first and then discard the part before offset. So the larger offset gets, the more rows are thrown away, and in theory the lower the efficiency.

 Why does a limit query slow down when offset is too large? To answer this, you need to understand how limit works. Take the following query as an example:

select * from table_name limit 10000,10
 The execution logic of this SQL statement is:
1. Read the Nth row from the table and add it to the result set.
2. Repeat step 1 until N = 10000 + 10.
3. Discard the first 10000 rows according to offset.
4. Return the remaining 10 rows.

 Clearly, what makes this SQL statement slow is step 2! The first 10000 rows are completely meaningless to this query, yet they take up most of the query time. How can this be solved? First we have to understand why the database executes the query this way.

 Author: jaren
 Link: https://www.jianshu.com/p/efecd0b66c55
 Source: Jianshu
 Copyright belongs to the author. For commercial reprints, contact the author for authorization; for non-commercial reprints, cite the source.
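
To see what this means for the 20-segment query, the sketch below applies the simplified model from the quoted passage (rows walked = offset + limit) to every segment. It is a back-of-the-envelope cost model with made-up names, not a measurement of what MySQL actually does.

```java
// Sketch only: a simplified cost model based on the quoted explanation.
class OffsetCostSketch {

    // Rows walked for "limit offset,limit": offset rows discarded plus limit rows returned.
    static long rowsRead(long offset, long limit) {
        return offset + limit;
    }

    // Total rows walked when a table of `count` rows is split into `groups` segments.
    static long totalRowsRead(long count, int groups) {
        long size = count % groups == 0 ? count / groups : count / groups + 1;
        long total = 0;
        for (int g = 0; g < groups; g++) {
            long offset = g * size;
            if (offset >= count) {
                break;
            }
            long limit = Math.min(size, count - offset);
            total += rowsRead(offset, limit);
        }
        return total;
    }
}
```

Under this model, splitting 6,538,587 rows into 20 segments walks 68,655,287 rows in total, roughly 10.5 times the table size, which is why reducing the cost of large offsets is worth trying.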
1.2 Optimizing the limit segmented query

When offset is too large, limit is not very efficient, and the query can be optimized.

(1) If the id keys are unordered, the parent query can use a join (inner join) in place of in.
(2) If the id keys are ordered, use id >= together with limit.

First look up the index column of the rows needed (assume it is id). Because the subquery selects only the id field, it can use a covering index; without the subquery, the index lookup would still have to go back to the table for the other columns. Then fetch the required rows through the index column.

The SQL is:

# Parent query uses inner join: 110 s
<select id="selectProductBrandByIntervalImprove" resultType="xxx" parameterType="java.lang.Integer">
        select
            id as id,
            sku_id as sku_id,
            brand_id as brand_id
        from xxx
        inner join
            ( select id from xxx  limit #{offset},#{limit}) b using (id)
    </select>

# Parent query uses id >= subquery
<select id="selectProductBrandByIntervalImprove" resultType="xxx" parameterType="java.lang.Integer">
        select
            id as id,
            sku_id as sku_id,
            brand_id as brand_id
        from xxx
       where id >= 
            ( select id from xxx  limit #{offset},1) limit #{limit}
    </select>

As for efficiency: in Navicat with the cache disabled, the improvement was considerable, but when run from Java the difference was small.

  • 20 threads, optimized limit (parent query uses inner join): 110 s
  • 20 threads, optimized limit (id ordered, parent query uses id >= subquery): 113 s

2. Merging results to complete the migration

Each thread task queries its data and converts it to a List<String>. Note that Nebula does not support certain special characters, so they must be replaced: single quotes, commas, escape slashes (backslashes), and both Chinese and English brackets. It is also advisable to wrap strings in nGQL statements in single quotes rather than double quotes, to avoid insert failures caused by escaping.
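
A minimal sketch of such a cleanup step is shown below. Two things are assumptions rather than the author's method: the offending characters are simply dropped (the article does not say what they are replaced with), and "brackets" is taken to mean round parentheses in their ASCII and full-width forms.

```java
// Sketch only: the character set and the "drop" strategy are assumptions.
class NgqlSanitizer {

    // Single quote, comma, backslash, ASCII parentheses, full-width parentheses.
    private static final String UNSUPPORTED = "',\\()\uFF08\uFF09";

    // Remove every character listed in UNSUPPORTED from the value.
    static String sanitize(String value) {
        StringBuilder sb = new StringBuilder(value.length());
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (UNSUPPORTED.indexOf(c) < 0) {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Wrap the cleaned value in single quotes, as the text recommends.
    static String quote(String value) {
        return "'" + sanitize(value) + "'";
    }
}
```

Dropping characters changes the stored value, so if the original text matters, mapping each character to a harmless substitute may be the better choice.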

Each item is one nGQL insert group of 1,000 records. Merging across threads requires a thread-safe collection such as CopyOnWriteArrayList. For 6 million+ rows, the whole query, conversion, and insert went through without problems; the trouble came later with a single table of 60 million+ rows.
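
The grouping into 1,000-record statements might be sketched as follows. The edge type `sku_brand` and the shape of each value (`'src'->'dst':()`) are hypothetical, chosen only to make the example concrete; the real schema is not given in the article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch only: the edge type and value format are assumptions for illustration.
class InsertBatcher {

    static final int BATCH_SIZE = 1000;

    // Build one nGQL statement from up to BATCH_SIZE pre-formatted values.
    static String toStatement(List<String> values) {
        return "INSERT EDGE sku_brand() VALUES " + String.join(", ", values) + ";";
    }

    // Split the values into consecutive groups of BATCH_SIZE and build a statement per group.
    static List<String> toBatches(List<String> values) {
        List<String> statements = new ArrayList<>();
        for (int from = 0; from < values.size(); from += BATCH_SIZE) {
            int to = Math.min(from + BATCH_SIZE, values.size());
            statements.add(toStatement(values.subList(from, to)));
        }
        return statements;
    }

    // Merge per-thread results into one thread-safe list. In real use each worker
    // thread would call addAll on the shared list itself.
    static List<String> merge(List<List<String>> perThreadStatements) {
        List<String> merged = new CopyOnWriteArrayList<>();
        for (List<String> part : perThreadStatements) {
            merged.addAll(part); // one array copy per addAll call
        }
        return merged;
    }
}
```

Because CopyOnWriteArrayList copies its backing array on every write, adding a whole batch with one `addAll` call is much cheaper than adding statements one at a time.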

Only some of the threads completed their queries, covering roughly 10 million+ rows, and the conversion to nGQL then threw a heap OOM exception.

Using jps, jmap, and jconsole to analyze memory, I raised the maximum heap from the default 4G to -Xmx6044m and found that it got to around 20 million rows before hitting OOM. With the limited memory on this machine, another approach was needed.

Combined with the business logic, it can be inferred that the young generation was GC'ing frequently and old-generation usage was too high, which led to the OOM. The constant fluctuation of old-generation memory most likely comes from CopyOnWriteArrayList, since the query results of every thread are aggregated into it.

As a rough estimate based on the data: a Java Object should occupy 16 bytes, and each object has 5 String member fields, so in total 4 bytes * 5 + 16 bytes = 36 bytes. 60 million+ objects therefore need 60,000,000 * 36 bytes / 1024 / 1024 ≈ 2059 MB, roughly 2 GB or more. Note that the 4 bytes per field cover only the reference, not the String contents themselves. On top of that, the List<String> collections and the backup and copying done by CopyOnWriteArrayList need a large amount of memory.
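
The estimate can be reproduced with a few lines of arithmetic. The constants below are the ones used in the text (16-byte object header, 4-byte references, 5 String fields); they are approximations, and actual sizes depend on the JVM and its settings.

```java
// Sketch only: reproduces the article's rough estimate, not a real measurement.
class MemoryEstimate {

    static final long OBJECT_HEADER_BYTES = 16;
    static final long REFERENCE_BYTES = 4;
    static final int STRING_FIELDS = 5;

    // Header plus one reference per String field (String contents not included).
    static long bytesPerObject() {
        return OBJECT_HEADER_BYTES + REFERENCE_BYTES * STRING_FIELDS;
    }

    // Total size in whole megabytes for the given number of objects.
    static long totalMegabytes(long objects) {
        return objects * bytesPerObject() / 1024 / 1024;
    }
}
```

`MemoryEstimate.totalMegabytes(60_000_000L)` returns 2059, matching the figure above.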

And the heap specified by the default -Xmx1024m is only 1 GB, clearly not enough: more than 400 young GCs occurred, and objects piled up into the old generation.

In the end, the only solution was to group by id, process 10 million+ rows at a time, and run 6 times in total, serially.
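
One way to plan such serial runs is to split the id range into fixed-size chunks, as in the sketch below. The id bounds in `main` are hypothetical; the point is that each range is queried, converted, and inserted before the next one starts, so only one chunk is held in memory at a time.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: range planning for serial runs; the id bounds are made up.
class SerialRangePlanner {

    // Split [minId, maxId] into consecutive closed ranges of at most chunkSize ids.
    static List<long[]> ranges(long minId, long maxId, long chunkSize) {
        List<long[]> result = new ArrayList<>();
        for (long start = minId; start <= maxId; start += chunkSize) {
            long end = Math.min(start + chunkSize - 1, maxId);
            result.add(new long[] {start, end});
        }
        return result;
    }

    public static void main(String[] args) {
        // Each printed range stands for one full query + convert + insert run.
        for (long[] r : ranges(1L, 60_000_000L, 10_000_000L)) {
            System.out.println("process ids " + r[0] + " to " + r[1]);
        }
    }
}
```

With ids from 1 to 60,000,000 and chunks of 10,000,000, this yields 6 ranges, in line with the 6 serial runs described above.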

The steps and commands for the JVM memory analysis tools are given below.

[Figure: cumulative import count]

3. JVM memory analysis

3.1 Tools

1. jps: shows the Java process id (pid).

2. jconsole: a visual interface for viewing memory, thread counts, and so on.

3. jmap: generates a dump file.
# A dump file can also be generated manually and analyzed with MAT or an online website.
# Once you have the dump file, the next step is to analyze it. Because there was no JDK environment, the downloaded MAT tool also reported an error.
# So an online dump analysis website can be used instead: https://heaphero.io/index.jsp or https://gceasy.io/index.jsp

jmap -dump:format=b,file=heap.dump 8544

Dumping heap to D:\myidea_projects\data-conversion\data-conversion\heap.bin ...
Heap dump file created [1141143851 bytes in 10.996 secs]

3.2 Parameters

# 1. Have the JVM generate a dump file when an OOM (OutOfMemoryError) occurs
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/path/heap/dump

# 2. Print GC logs
-XX:+PrintGCDetails
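
To confirm from inside the application that these settings took effect, something like the following sketch can be used. It relies only on the standard `Runtime` and `RuntimeMXBean` APIs; the class and method names are made up for this example.

```java
import java.lang.management.ManagementFactory;
import java.util.List;

// Sketch only: reports the heap limit and checks for a flag on the JVM command line.
class JvmSettingsCheck {

    // Maximum heap the JVM will attempt to use, in megabytes.
    static long maxHeapMegabytes() {
        return Runtime.getRuntime().maxMemory() / 1024 / 1024;
    }

    // True if the exact flag string was passed to the JVM at startup.
    static boolean hasJvmFlag(String flag) {
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        return jvmArgs.contains(flag);
    }

    public static void main(String[] args) {
        System.out.println("max heap (MB): " + maxHeapMegabytes());
        System.out.println("heap dump on OOM: " + hasJvmFlag("-XX:+HeapDumpOnOutOfMemoryError"));
        System.out.println("GC details: " + hasJvmFlag("-XX:+PrintGCDetails"));
    }
}
```

Logging these values at startup makes it easier to tell whether an OOM occurred under the intended heap size or under a default.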
Copyright notice
This article was written by [scl、]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/177/202206260314002245.html