当前位置:网站首页>DataX tutorial (09) - how does dataX achieve speed limit?
DataX tutorial (09) - how does dataX achieve speed limit?
2022-06-25 06:28:00 【Yanglinwei】
List of articles
01 introduction
Through the previous blog , We are right. DataX Have a certain in-depth understanding :
- 《DataX course (01)- introduction 》
- 《DataX course (02)- IDEA function DataX Complete process ( Fill all the holes )》
- 《DataX course (03)- Source code interpretation ( Super detailed Edition )
- 《DataX course (04)- The configuration is complete 》
- 《DataX course (05)- DataX Web Project practice 》
- 《DataX course (06)- DataX tuning 》
- 《DataX course (07)- The illustration DataX Task assignment and execution process 》
- 《DataX course (08)- Monitoring and reporting 》
As for DataX Learn more , I raised a question , Exactly? DataX How to limit the speed ? This article will explain .
02 Reverse positioning code
We know it's in core.json In the document speed Speed limit in the method DataX Of , Can pass record Number of records and byte Bytes to speed limit :
This configuration is CoreConstant Class defines :
Select constant , Right click to use IDEA Of Find Usages… You can see that this value is called in two places :
Next , Look at these two configurations in Channel Class to implement speed limit .
03 Channel Class
From the chart , You can see in the Channel On initialization , The number of records that have initiated the speed limit (recordSpeed) And the number of bytes (byteSpeed) , Next Control+F have a look recordSpeed Where was it called .
You can see in the statPush The method uses :
statPush Description of the whole process :
- Judge
byteSpeed(bps)andrecordSpeed(tps)Are they all greater than 0? If not , The exit ; - Based on the current
byteSpeedAnd setbyteSpeedcontrast , Find the sleep time ( The formula :currentByteSpeed * interval / this.byteSpeed- interval;) - Based on the current
recordSpeedAnd setrecordSpeedcontrast , Find the sleep time ( The formula :currentRecordSpeed * interval / this.recordSpeed - interval;) - Take the maximum sleep time ;
Thread.sleep(sleepTime)To sleep
Stick it on the bottom statPush Complete code for :
private void statPush(long recordSize, long byteSize) {
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
recordSize);
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
byteSize);
// Statistics while reading waitCounter that will do , Because of the writing (pull) It might be blocking , But when I read it, I can read the blocking counter Count
currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);
boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
return;
}
long lastTimestamp = lastCommunication.getTimestamp();
long nowTimestamp = System.currentTimeMillis();
long interval = nowTimestamp - lastTimestamp;
if (interval - this.flowControlInterval >= 0) {
long byteLimitSleepTime = 0;
long recordLimitSleepTime = 0;
if (isChannelByteSpeedLimit) {
long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
if (currentByteSpeed > this.byteSpeed) {
// Calculation basis byteLimit Sleep time obtained
byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
- interval;
}
}
if (isChannelRecordSpeedLimit) {
long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
if (currentRecordSpeed > this.recordSpeed) {
// Calculation basis recordLimit Sleep time obtained
recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
- interval;
}
}
// Sleep time greater
long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
recordLimitSleepTime : byteLimitSleepTime;
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
lastCommunication.setTimestamp(nowTimestamp);
}
}
04 At the end of the article
By reading this article, you can know DataX The principle of speed limit , If there is any doubt about children's shoes , Welcome to leave a message , The end of this paper !
边栏推荐
- Microsoft issued a document to celebrate Net 20th anniversary!
- Cat command – display the file contents on the terminal device
- At the age of 26, I was transferred to software testing with zero foundation. Now I have successfully entered the job with a monthly salary of 12K. However, no one understands my bitterness
- Netstat command – displays network status
- [kicad image] download and installation
- Analysis report on demand scale and Supply Prospect of global and Chinese thermal insulation materials market during the 14th Five Year Plan period 2022-2028
- Getting started with Silverlight development 1
- Rhcsa--- day 6 operation
- What elements are indispensable for the development of the character? What are the stages
- Guess the size of the number
猜你喜欢

Processes and threads - concepts and process scheduling

Personal blog system graduation project opening report

Microsoft issued a document to celebrate Net 20th anniversary!

Rhcsa--- day 6 operation

JS dynamic table creation

DNS domain name system

Understand what MTU is
![[short time energy] short time energy of speech signal based on MATLAB [including Matlab source code 1719]](/img/a1/0cb61368cb1d0817d74781084a4466.jpg)
[short time energy] short time energy of speech signal based on MATLAB [including Matlab source code 1719]

Location object
![[data visualization application] draw spatial map (with R language code)](/img/2d/04e5015573d10bdd6325ae497bfeb3.jpg)
[data visualization application] draw spatial map (with R language code)
随机推荐
Getting started with Silverlight development 1
[no title] dream notes 2022-02-20
You can see the classification of SQL injection. SQL injection point /sql injection type /sql injection has several /sql injection point classifications
General test point ideas are summarized and shared, which can be directly used in interview and actual software testing
TFTP command – uploading and downloading files
JD 7 head search navigation layout
Netstat command – displays network status
Advantages and disadvantages of using SNMP and WMI polling
Leetcode sword finger offer question brushing - day 27
Handling skills of SQL optimization (2)
[hand torn STL] Stack & queue
Processes and threads - concepts and process scheduling
证券如何在线开户?在线开户是安全么?
Monitoring access: how to grant minimum WMI access to the monitoring service account
We cannot activate inspection type for article master in transaction code MM41?
Distributed solar photovoltaic inverter monitoring
DF command – displays disk space usage
2022 AI trend 8 forecast!
[kicad image] download and installation
Lesson 8: FTP server setup and loading