DataX tutorial (09) - How does DataX implement its speed limit?
2022-06-25 06:28:00 【Yanglinwei】
01 Introduction
The previous posts in this series gave us a fairly deep understanding of DataX:
- 《DataX tutorial (01) - Getting started》
- 《DataX tutorial (02) - The complete process of running DataX in IDEA (all pitfalls covered)》
- 《DataX tutorial (03) - Source code walkthrough (very detailed edition)》
- 《DataX tutorial (04) - Complete configuration guide》
- 《DataX tutorial (05) - DataX Web project in practice》
- 《DataX tutorial (06) - DataX tuning》
- 《DataX tutorial (07) - DataX task assignment and execution flow, illustrated》
- 《DataX tutorial (08) - Monitoring and reporting》
As I learned more about DataX, one question came up: how exactly does DataX limit its speed? This article explains.
02 Tracing the code backwards
We know that DataX is throttled through the speed setting in core.json, which can cap throughput by record (number of records) and by byte (number of bytes).
The keys for this configuration are defined in the CoreConstant class.
Select one of the constants, right-click, and use IDEA's Find Usages… to see that the value is referenced in two places.
Next, let's look at how the Channel class uses these two settings to implement the speed limit.
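To make the lookup concrete, here is a minimal sketch of how dotted keys for the two limits could resolve against the nested structure of core.json. The key strings `core.transport.channel.speed.byte` and `core.transport.channel.speed.record` are my recollection of the CoreConstant values and should be checked against your DataX version. The `SpeedConfigSketch` class, its `getLong` helper and the sample values are hypothetical and are not DataX's own Configuration API.

```java
import java.util.Map;

public class SpeedConfigSketch {
    // Assumed key strings; verify against CoreConstant in your DataX version.
    static final String SPEED_BYTE = "core.transport.channel.speed.byte";
    static final String SPEED_RECORD = "core.transport.channel.speed.record";

    // Hypothetical helper: walk a nested map along a dotted path and return
    // the long value found there, or the default if any step is missing.
    static long getLong(Map<String, Object> root, String path, long defaultValue) {
        Object node = root;
        for (String part : path.split("\\.")) {
            if (!(node instanceof Map)) {
                return defaultValue;
            }
            node = ((Map<?, ?>) node).get(part);
        }
        return (node instanceof Number) ? ((Number) node).longValue() : defaultValue;
    }

    // Sample values for illustration only: 1 MiB/s byte limit, no record limit.
    static Map<String, Object> sampleConfig() {
        Map<String, Object> speed = Map.of("byte", 1048576, "record", -1);
        Map<String, Object> channel = Map.of("speed", speed);
        Map<String, Object> transport = Map.of("channel", channel);
        Map<String, Object> core = Map.of("transport", transport);
        return Map.of("core", core);
    }

    public static void main(String[] args) {
        Map<String, Object> config = sampleConfig();
        System.out.println(getLong(config, SPEED_BYTE, -1));   // byte limit
        System.out.println(getLong(config, SPEED_RECORD, -1)); // record limit
    }
}
```

In this sketch a value of 0 or less is treated as "no limit", which matches the `> 0` checks in the statPush code quoted later in this article.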
03 The Channel class
Looking at the Channel class, you can see that on initialization it sets up the record limit (recordSpeed) and the byte limit (byteSpeed). Next, press Control+F to see where recordSpeed is used.
It turns out to be used in the statPush method:
The overall flow of statPush:
- Check whether byteSpeed (bps) or recordSpeed (tps) is greater than 0. If neither is, no limit is configured and the method returns.
- Continue only if the time since the last check (interval) has reached flowControlInterval.
- Compare the current byte speed with the configured byteSpeed and derive a sleep time (formula: currentByteSpeed * interval / this.byteSpeed - interval).
- Compare the current record speed with the configured recordSpeed and derive a sleep time (formula: currentRecordSpeed * interval / this.recordSpeed - interval).
- Take the larger of the two sleep times.
- Call Thread.sleep(sleepTime) to sleep.
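A short derivation, not taken from the original post, may help show why the sleep formula works. Assume B bytes were read during the last interval of t milliseconds and the configured limit is v bytes per second. The symbols B, t, v, c and s are introduced here only for illustration, and integer truncation is ignored:

```latex
\begin{aligned}
c &= \frac{1000\,B}{t} && \text{(measured speed, bytes per second)} \\
s &= \frac{c\,t}{v} - t = \frac{1000\,B}{v} - t && \text{(sleep time, milliseconds)} \\
\frac{1000\,B}{t + s} &= \frac{1000\,B}{1000\,B / v} = v && \text{(average speed once the sleep is included)}
\end{aligned}
```

For example, with B = 2,097,152 bytes, t = 1000 ms and v = 1,048,576 bytes per second, the measured speed c is 2,097,152 bytes per second and s = 2000 - 1000 = 1000 ms. The 2 MiB is then spread over 2 seconds, which is exactly the limit. The record limit follows the same reasoning with records in place of bytes.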
The complete code of statPush is pasted below:

    private void statPush(long recordSize, long byteSize) {
        currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
                recordSize);
        currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
                byteSize);
        // Updating the wait counters on the read side is enough: a write (pull) may
        // still be blocked, but by the time of a read the blocked counter value is visible.
        currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
        currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);

        boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
        boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
        if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
            return;
        }

        long lastTimestamp = lastCommunication.getTimestamp();
        long nowTimestamp = System.currentTimeMillis();
        long interval = nowTimestamp - lastTimestamp;
        if (interval - this.flowControlInterval >= 0) {
            long byteLimitSleepTime = 0;
            long recordLimitSleepTime = 0;
            if (isChannelByteSpeedLimit) {
                long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
                        CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
                if (currentByteSpeed > this.byteSpeed) {
                    // Sleep time required by the byte limit
                    byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
                            - interval;
                }
            }

            if (isChannelRecordSpeedLimit) {
                long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
                        CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
                if (currentRecordSpeed > this.recordSpeed) {
                    // Sleep time required by the record limit
                    recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
                            - interval;
                }
            }

            // Take the larger of the two sleep times
            long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
                    recordLimitSleepTime : byteLimitSleepTime;
            if (sleepTime > 0) {
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            // Snapshot the current counters so the next check measures a fresh interval
            lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
                    currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
            lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
                    currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
            lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
                    currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
            lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
                    currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
            lastCommunication.setTimestamp(nowTimestamp);
        }
    }

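To experiment with the arithmetic without running DataX, the sleep-time calculation can be isolated into a pure function. The following is a minimal sketch, not DataX code: the class `FlowControlSketch` and the methods `limitSleepMs` and `sleepMs` are hypothetical names, and the guard against a non-positive interval is my own addition (in statPush the flowControlInterval check plays that role when it is configured with a positive value).

```java
public class FlowControlSketch {
    /**
     * Sleep time in milliseconds needed for one limit.
     * delta is the amount (bytes or records) read during intervalMs;
     * a limitPerSecond of 0 or less means "no limit".
     */
    static long limitSleepMs(long delta, long intervalMs, long limitPerSecond) {
        if (limitPerSecond <= 0 || intervalMs <= 0) {
            return 0;
        }
        long currentSpeed = delta * 1000 / intervalMs;
        if (currentSpeed <= limitPerSecond) {
            return 0; // within the limit, no need to sleep
        }
        // Same formula as statPush: currentSpeed * interval / limit - interval
        return currentSpeed * intervalMs / limitPerSecond - intervalMs;
    }

    /** Combines the byte and record limits by taking the longer sleep. */
    static long sleepMs(long byteDelta, long recordDelta, long intervalMs,
                        long byteSpeed, long recordSpeed) {
        return Math.max(limitSleepMs(byteDelta, intervalMs, byteSpeed),
                        limitSleepMs(recordDelta, intervalMs, recordSpeed));
    }

    public static void main(String[] args) {
        // 3,000,000 bytes and 20,000 records read in 500 ms,
        // with limits of 2,000,000 bytes/s and 10,000 records/s.
        System.out.println(sleepMs(3_000_000L, 20_000L, 500L, 2_000_000L, 10_000L));
    }
}
```

With these sample numbers the byte limit alone would ask for 1000 ms and the record limit for 1500 ms, so the program prints 1500: the stricter of the two limits decides how long the thread sleeps.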
04 Conclusion
After reading this article, you should understand how DataX implements its speed limit. If you have any questions, feel free to leave a comment. That's the end of this article!