DataX tutorial (09) - How does DataX implement its speed limit?
2022-06-25 06:28:00 【Yanglinwei】
01 Introduction
The previous posts in this series gave us a fairly deep understanding of DataX:
- 《DataX tutorial (01) - Introduction》
- 《DataX tutorial (02) - Running DataX in IDEA: the complete process (all pitfalls covered)》
- 《DataX tutorial (03) - Source code walkthrough (very detailed edition)》
- 《DataX tutorial (04) - Complete configuration guide》
- 《DataX tutorial (05) - DataX Web in practice》
- 《DataX tutorial (06) - DataX tuning》
- 《DataX tutorial (07) - Task assignment and execution flow, illustrated》
- 《DataX tutorial (08) - Monitoring and reporting》
While digging deeper into DataX, one question came up: how exactly does DataX limit its speed? This article explains it.
02 Tracing back to the code
We know that DataX's speed limit is configured in the speed section of core.json, and that the limit can be set both by record (number of records) and by byte (number of bytes).
The keys for these settings are defined as constants in the CoreConstant class.
Select one of the constants, right-click, and use IDEA's Find Usages… to see that the value is referenced in two places.
Next, let's look at how the Channel class uses these two settings to implement the speed limit.
03 The Channel class
When a Channel is initialized, it reads the two speed limits: the record limit (recordSpeed) and the byte limit (byteSpeed). Next, press Control+F and search for recordSpeed to see where it is used.
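As a rough illustration of that initialization step, here is a minimal sketch of how a channel might read the two limits from a flat configuration. The class SpeedLimitConfig, the Map-based lookup, and the default of -1 (meaning "no limit") are assumptions made for this example, and the key strings are assumed from the speed section of core.json. DataX itself goes through its own Configuration and CoreConstant classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the part of Channel that reads the speed limits.
class SpeedLimitConfig {
    // Assumed key paths, mirroring the speed section of core.json.
    static final String SPEED_BYTE = "core.transport.channel.speed.byte";
    static final String SPEED_RECORD = "core.transport.channel.speed.record";

    final long byteSpeed;   // bytes per second; <= 0 means "not limited"
    final long recordSpeed; // records per second; <= 0 means "not limited"

    SpeedLimitConfig(Map<String, Long> conf) {
        // -1 is this sketch's own "not configured" default.
        this.byteSpeed = conf.getOrDefault(SPEED_BYTE, -1L);
        this.recordSpeed = conf.getOrDefault(SPEED_RECORD, -1L);
    }

    // Same test statPush performs: at least one limit must be positive.
    boolean isLimited() {
        return byteSpeed > 0 || recordSpeed > 0;
    }

    public static void main(String[] args) {
        Map<String, Long> conf = new HashMap<>();
        conf.put(SPEED_BYTE, 1048576L); // limit bytes only
        SpeedLimitConfig c = new SpeedLimitConfig(conf);
        System.out.println(c.byteSpeed + " " + c.recordSpeed + " " + c.isLimited());
        // prints "1048576 -1 true"
    }
}
```

With only the byte limit configured, the record limit stays disabled but the channel still counts as speed-limited.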
It is used in the statPush method. The whole statPush flow is:
- Check whether byteSpeed (bps) or recordSpeed (tps) is greater than 0. If neither is, return immediately;
- Continue only if at least flowControlInterval milliseconds have passed since the last check;
- Compare the current byte rate with the configured byteSpeed and compute a sleep time (formula: currentByteSpeed * interval / this.byteSpeed - interval);
- Compare the current record rate with the configured recordSpeed and compute a sleep time (formula: currentRecordSpeed * interval / this.recordSpeed - interval);
- Take the larger of the two sleep times;
- Call Thread.sleep(sleepTime) to sleep.
The complete code of statPush is pasted below:
private void statPush(long recordSize, long byteSize) {
    currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
            recordSize);
    currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
            byteSize);
    // Recording the wait counters on the read (push) side is enough: the writer (pull)
    // may be blocked at this moment, but the reader can already see the blocked counter value.
    currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
    currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);

    // A limit applies only if it is configured with a positive value.
    boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
    boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
    if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
        return;
    }

    long lastTimestamp = lastCommunication.getTimestamp();
    long nowTimestamp = System.currentTimeMillis();
    long interval = nowTimestamp - lastTimestamp;
    // Run the flow-control check at most once per flowControlInterval (milliseconds).
    if (interval - this.flowControlInterval >= 0) {
        long byteLimitSleepTime = 0;
        long recordLimitSleepTime = 0;
        if (isChannelByteSpeedLimit) {
            // Bytes per second over the last interval.
            long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
                    CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
            if (currentByteSpeed > this.byteSpeed) {
                // Sleep time required by the byte limit
                byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
                        - interval;
            }
        }

        if (isChannelRecordSpeedLimit) {
            // Records per second over the last interval.
            long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
                    CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
            if (currentRecordSpeed > this.recordSpeed) {
                // Sleep time required by the record limit
                recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
                        - interval;
            }
        }

        // Take the larger of the two sleep times
        long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
                recordLimitSleepTime : byteLimitSleepTime;
        if (sleepTime > 0) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // Snapshot the current counters as the baseline for the next interval.
        lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
                currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
        lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
                currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
        lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
                currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
        lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
                currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
        lastCommunication.setTimestamp(nowTimestamp);
    }
}
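To see why the sleep-time formula works, it helps to run it on concrete numbers. The sketch below isolates the formula in a standalone helper. The class SleepTimeSketch, the method sleepMillis, and the sample rates are made up for this illustration and are not part of DataX. The idea: data that arrived at currentSpeed during interval milliseconds would have taken currentSpeed * interval / limit milliseconds at the permitted rate, and interval of that has already elapsed, so the thread sleeps for the difference.

```java
// Hypothetical helper that reproduces only the sleep-time arithmetic of statPush.
class SleepTimeSketch {
    /**
     * Extra milliseconds to sleep so that the data moved during intervalMs
     * averages out to at most `limit` units per second.
     */
    static long sleepMillis(long currentSpeed, long limit, long intervalMs) {
        if (limit <= 0 || currentSpeed <= limit) {
            return 0; // limit disabled, or the rate is already within it
        }
        // Time the same data would need at the permitted rate, minus time already spent.
        return currentSpeed * intervalMs / limit - intervalMs;
    }

    public static void main(String[] args) {
        long intervalMs = 1000;
        // Sample values: 3,000,000 B/s against a 1,000,000 B/s limit (3x over).
        long byteSleep = sleepMillis(3_000_000, 1_000_000, intervalMs);
        // Sample values: 15,000 records/s against a 10,000 records/s limit (1.5x over).
        long recordSleep = sleepMillis(15_000, 10_000, intervalMs);
        // As in statPush, the stricter (larger) sleep time wins.
        long sleepTime = Math.max(byteSleep, recordSleep);
        System.out.println(byteSleep + " " + recordSleep + " " + sleepTime);
        // prints "2000 500 2000"
    }
}
```

After sleeping 2000 ms, the 3,000,000 bytes have been spread over 3000 ms in total, which is exactly 1,000,000 bytes per second. Taking the maximum of the two sleep times satisfies both limits at once.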
04 Conclusion
After reading this article, you should understand how DataX's speed limit works. If you have any questions, feel free to leave a comment. That's all for this post!