当前位置:网站首页>DataX tutorial (09) - how does dataX achieve speed limit?
DataX tutorial (09) - how does dataX achieve speed limit?
2022-06-25 06:28:00 【Yanglinwei】
List of articles
01 introduction
Through the previous blog , We are right. DataX Have a certain in-depth understanding :
- 《DataX course (01)- introduction 》
- 《DataX course (02)- IDEA function DataX Complete process ( Fill all the holes )》
- 《DataX course (03)- Source code interpretation ( Super detailed Edition )
- 《DataX course (04)- The configuration is complete 》
- 《DataX course (05)- DataX Web Project practice 》
- 《DataX course (06)- DataX tuning 》
- 《DataX course (07)- The illustration DataX Task assignment and execution process 》
- 《DataX course (08)- Monitoring and reporting 》
As for DataX Learn more , I raised a question , Exactly? DataX How to limit the speed ? This article will explain .
02 Reverse positioning code
We know it's in core.json In the document speed Speed limit in the method DataX Of , Can pass record Number of records and byte Bytes to speed limit :
This configuration is CoreConstant Class defines :
Select constant , Right click to use IDEA Of Find Usages… You can see that this value is called in two places :
Next , Look at these two configurations in Channel Class to implement speed limit .
03 Channel Class
From the chart , You can see in the Channel On initialization , The number of records that have initiated the speed limit (recordSpeed) And the number of bytes (byteSpeed) , Next Control+F have a look recordSpeed Where was it called .
You can see in the statPush The method uses :
statPush Description of the whole process :
- Judge
byteSpeed(bps)andrecordSpeed(tps)Are they all greater than 0? If not , The exit ; - Based on the current
byteSpeedAnd setbyteSpeedcontrast , Find the sleep time ( The formula :currentByteSpeed * interval / this.byteSpeed- interval;) - Based on the current
recordSpeedAnd setrecordSpeedcontrast , Find the sleep time ( The formula :currentRecordSpeed * interval / this.recordSpeed - interval;) - Take the maximum sleep time ;
Thread.sleep(sleepTime)To sleep
Stick it on the bottom statPush Complete code for :
private void statPush(long recordSize, long byteSize) {
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
recordSize);
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
byteSize);
// Statistics while reading waitCounter that will do , Because of the writing (pull) It might be blocking , But when I read it, I can read the blocking counter Count
currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);
boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
return;
}
long lastTimestamp = lastCommunication.getTimestamp();
long nowTimestamp = System.currentTimeMillis();
long interval = nowTimestamp - lastTimestamp;
if (interval - this.flowControlInterval >= 0) {
long byteLimitSleepTime = 0;
long recordLimitSleepTime = 0;
if (isChannelByteSpeedLimit) {
long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
if (currentByteSpeed > this.byteSpeed) {
// Calculation basis byteLimit Sleep time obtained
byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
- interval;
}
}
if (isChannelRecordSpeedLimit) {
long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
if (currentRecordSpeed > this.recordSpeed) {
// Calculation basis recordLimit Sleep time obtained
recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
- interval;
}
}
// Sleep time greater
long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
recordLimitSleepTime : byteLimitSleepTime;
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
lastCommunication.setTimestamp(nowTimestamp);
}
}
04 At the end of the article
By reading this article, you can know DataX The principle of speed limit , If there is any doubt about children's shoes , Welcome to leave a message , The end of this paper !
边栏推荐
- JSON. toJSONString(object, SerializerFeature.WriteMapNullValue); Second parameter action
- MV command – move or rename files
- Es11 new methods: dynamic import(), bigint, globalthis, optional chain, and null value merging operator
- CTFSHOW
- Optimal Parking
- After five years of software testing in ByteDance, I was dismissed in December to remind my brother of paddling
- Wireless industrial Internet of things data monitoring terminal
- Notes on dashboard & kuboard installation in kubernetes cluster
- Lesson 9: workspace introduction
- The sum problem
猜你喜欢

Pre knowledge of asynchronous operation

Hands on deep learning (III)
![[hand torn STL] Stack & queue](/img/db/d05c52f8e3fb0aade51460e86cf623.jpg)
[hand torn STL] Stack & queue

The elephant turns around and starts the whole body. Ali pushes Maoxiang not only to Jingdong
![[v2.0] automatic update system based on motion step API (support disconnection reconnection and data compensation)](/img/73/2ec957d58616d692e571a70826787f.jpg)
[v2.0] automatic update system based on motion step API (support disconnection reconnection and data compensation)

Getting started with Silverlight development 1

ARM processor operating mode

Personal blog system graduation project opening report

Guess the size of the number
![[open source sharing] deeply study KVM, CEPH, fuse features, including open source projects, code cases, articles, videos, architecture brain maps, etc](/img/9d/9bcf52f521e92cf97eb1d545931c68.jpg)
[open source sharing] deeply study KVM, CEPH, fuse features, including open source projects, code cases, articles, videos, architecture brain maps, etc
随机推荐
DNS domain name system
How often should you refactor- How often should you refactor?
Leetcode sword finger offer question brushing - day 27
2022-02-19: fence installation. In a two-dimensional garden, there are some trees represented by (x, y) coordinates. As the installation cost is very expensive, your task is to enclose all the trees w
Gavin's insight on transformer live class - line by line analysis and field experiment analysis of insurance BOT microservice code of insurance industry in the actual combat of Rasa dialogue robot pro
@Detailed explanation of valid annotation usage
十大券商公司哪个佣金最低,最安全可靠?有知道的吗
[v2.0] automatic update system based on motion step API (support disconnection reconnection and data compensation)
Global and China chemical mechanical polishing abrasive materials market demand outlook and investment scale forecast report 2022 Edition
[Suanli network] technological innovation of Suanli Network -- Key Technologies of green and security
Uname command – displays system information
Analysis report on production and sales demand and sales prospect of global and Chinese phosphating solution Market 2022-2028
Which of the top ten securities companies has the lowest Commission and is the most safe and reliable? Do you know anything
[Suanli network] problems and challenges faced by the development of Suanli network
Report on the application prospect and investment potential of global and Chinese cell therapy industry 2022-2028
The sum problem
Kotlin reflection -- Notes
What elements are indispensable for the development of the character? What are the stages
Research Report on demand and Competitive Prospect of global and Chinese welding personal protective equipment industry 2022-2027
Sword finger offer II 095 Longest common subsequence