Source code analysis of the nine routing strategies in the distributed task scheduling platform XXL-JOB
2022-06-25 15:34:00 【Grey Wolf_ cxh】
Routing strategy source code:
The source code is mainly in the com.xxl.job.admin.core.route package.

First, an abstract class for routing strategies is declared:
public abstract class ExecutorRouter {
protected static Logger logger = LoggerFactory.getLogger(ExecutorRouter.class);
/**
* route address
*
* @param addressList
* @return ReturnT.content=address
*/
public abstract ReturnT<String> route(TriggerParam triggerParam, List<String> addressList);
}
1. First: returns the first address in the registered address list.
public class ExecutorRouteFirst extends ExecutorRouter {
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList){
return new ReturnT<String>(addressList.get(0));
}
}
2. Last: returns the last address in the registered address list.
public class ExecutorRouteLast extends ExecutorRouter {
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
return new ReturnT<String>(addressList.get(addressList.size()-1));
}
}
3. Round robin: a static ConcurrentMap, routeCountEachJob, stores one counter per job. The whole map acts as a cache that is valid for 24 hours; the first call after it expires clears it.
The key of the ConcurrentMap is the jobId and the value is the counter for that jobId. Each call increments the counter by one; once it exceeds 1000000, it is reset to a random number in [0, 100) and counting starts again.
The algorithm is based on the remainder: each call reads the counter for the current jobId and computes counter % (size of the registered address list) to get the index of the address for this round.
public class ExecutorRouteRound extends ExecutorRouter {
private static ConcurrentMap<Integer, AtomicInteger> routeCountEachJob = new ConcurrentHashMap<>();
private static long CACHE_VALID_TIME = 0;
private static int count(int jobId) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
routeCountEachJob.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
AtomicInteger count = routeCountEachJob.get(jobId);
if (count == null || count.get() > 1000000) {
// Start from a random value on initialization, to spread the load of the first trigger
count = new AtomicInteger(new Random().nextInt(100));
} else {
// count++
count.addAndGet(1);
}
routeCountEachJob.put(jobId, count);
return count.get();
}
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = addressList.get(count(triggerParam.getJobId())%addressList.size());
return new ReturnT<String>(address);
}
}
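To make the remainder idea concrete, here is a minimal sketch of per-job round robin. It is not the XXL-JOB implementation: the class name RoundRobinSketch and the method pick are hypothetical, the counter starts at 0 instead of a random value, and the 24-hour cache clearing is left out.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of XXL-JOB.
class RoundRobinSketch {
    // One counter per jobId, created lazily.
    private static final ConcurrentMap<Integer, AtomicInteger> COUNTERS = new ConcurrentHashMap<>();

    // Returns the address for the next trigger of the given job.
    static String pick(int jobId, List<String> addresses) {
        AtomicInteger counter = COUNTERS.computeIfAbsent(jobId, id -> new AtomicInteger(0));
        int n = counter.getAndIncrement();
        // floorMod keeps the index non-negative even if the counter overflows.
        return addresses.get(Math.floorMod(n, addresses.size()));
    }

    public static void main(String[] args) {
        List<String> addresses = List.of("10.0.0.1:9999", "10.0.0.2:9999", "10.0.0.3:9999");
        for (int i = 0; i < 5; i++) {
            System.out.println(pick(1, addresses));
        }
    }
}
```

Because the counters are keyed by jobId, each job walks the address list independently of the others. The sketch uses computeIfAbsent and getAndIncrement so that creating and advancing a counter are each a single atomic step.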
4. Random: Random.nextInt picks an index in [0, size of the registered address list), and the address at that index is returned.
public class ExecutorRouteRandom extends ExecutorRouter {
private static Random localRandom = new Random();
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = addressList.get(localRandom.nextInt(addressList.size()));
return new ReturnT<String>(address);
}
}
5. Consistent hash: virtual nodes are introduced. Each server address generates 100 virtual nodes, so even a small number of servers yields many points on the hash ring. With many points the ring is covered fairly evenly, which mitigates the problem of uneven distribution.
/**
* Within a group the machine addresses are the same; different jobs are hashed evenly across the machines, so that jobs are spread evenly within the group and each job is always scheduled on the same machine.
* a. virtual node: solves unbalanced distribution across addresses
* b. hash method replaces hashCode: String.hashCode may collide, so a hash with a wider value range is needed
*
*/
public class ExecutorRouteConsistentHash extends ExecutorRouter {
private static int VIRTUAL_NODE_NUM = 100;
/**
* get hash code on 2^32 ring (hash value computed with MD5)
* @param key
* @return
*/
private static long hash(String key) {
// md5 byte
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 not supported", e);
}
md5.reset();
byte[] keyBytes = null;
try {
keyBytes = key.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("Unknown string :" + key, e);
}
md5.update(keyBytes);
byte[] digest = md5.digest();
// hash code, Truncate to 32-bits
long hashCode = ((long) (digest[3] & 0xFF) << 24)
| ((long) (digest[2] & 0xFF) << 16)
| ((long) (digest[1] & 0xFF) << 8)
| (digest[0] & 0xFF);
long truncateHashCode = hashCode & 0xffffffffL;
return truncateHashCode;
}
public String hashJob(int jobId, List<String> addressList) {
// ------A1------A2-------A3------
// -----------J1------------------
TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
for (String address: addressList) {
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SHARD-" + address + "-NODE-" + i);
addressRing.put(addressHash, address);
}
}
long jobHash = hash(String.valueOf(jobId));
SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return addressRing.firstEntry().getValue();
}
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = hashJob(triggerParam.getJobId(), addressList);
return new ReturnT<String>(address);
}
}
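The property that makes consistent hashing attractive is that adding a machine only moves the jobs that land on the new machine's virtual nodes. The sketch below rebuilds the ring in the same way as the listing above (same key format, same MD5 truncation) and counts how many jobs change address when a fourth machine joins. The class name ConsistentHashSketch, the helper moved, and the sample addresses are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of XXL-JOB.
class ConsistentHashSketch {
    private static final int VIRTUAL_NODES = 100;

    // 32-bit value built from the first four MD5 digest bytes, as in the listing above.
    static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((long) (d[2] & 0xFF) << 16)
                    | ((long) (d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not supported", e);
        }
    }

    // Builds the ring and returns the first address at or after the job's hash,
    // wrapping around to the start of the ring when nothing follows.
    static String pick(int jobId, List<String> addresses) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String address : addresses) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                ring.put(hash("SHARD-" + address + "-NODE-" + i), address);
            }
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(String.valueOf(jobId)));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // Counts how many of the job ids 1..jobs change address after `added` joins.
    static int moved(List<String> before, String added, int jobs) {
        List<String> after = new ArrayList<>(before);
        after.add(added);
        int count = 0;
        for (int id = 1; id <= jobs; id++) {
            if (!pick(id, before).equals(pick(id, after))) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> before = List.of("10.0.0.1:9999", "10.0.0.2:9999", "10.0.0.3:9999");
        System.out.println("jobs moved: " + moved(before, "10.0.0.4:9999", 200) + " of 200");
    }
}
```

Every job that does move ends up on the newly added address; jobs never shuffle between the machines that were already present. With plain `hash % size` routing, by contrast, changing the list size can reassign most jobs.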
6. Least frequently used (LFU): a map from job to per-address counters is maintained. When an address is first seen for a job (or its counter exceeds 1000000), it is given a random initial count in [0, size of the address list). The addresses are then sorted by count in ascending order, the first one (the smallest count) is chosen as the routing address, and its count is incremented by 1. In this way the address with the lowest count is always selected next.
/**
* For a single job, the executor with the lowest usage frequency is elected first
* a(*). LFU (Least Frequently Used): least frequently used; based on frequency/count
* b. LRU (Least Recently Used): least recently used; based on time
*
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteLFU extends ExecutorRouter {
private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>();
private static long CACHE_VALID_TIME = 0;
public String route(int jobId, List<String> addressList) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLfuMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
// lfu item init
HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId); // Sorting by key can use a TreeMap with a Comparator constructor argument; sorting by value requires an ArrayList;
if (lfuItemMap == null) {
lfuItemMap = new HashMap<String, Integer>();
jobLfuMap.putIfAbsent(jobId, lfuItemMap); // Avoid overwriting an existing entry
}
// put new
for (String address: addressList) {
if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {
lfuItemMap.put(address, new Random().nextInt(addressList.size())); // Start from a random value on initialization, to spread the load of the first trigger
}
}
// remove old
List<String> delKeys = new ArrayList<>();
for (String existKey: lfuItemMap.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
}
}
if (delKeys.size() > 0) {
for (String delKey: delKeys) {
lfuItemMap.remove(delKey);
}
}
// load least used count address
List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet());
Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
@Override
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
return o1.getValue().compareTo(o2.getValue());
}
});
Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
String minAddress = addressItem.getKey();
addressItem.setValue(addressItem.getValue() + 1);
return addressItem.getKey();
}
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = route(triggerParam.getJobId(), addressList);
return new ReturnT<String>(address);
}
}
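The selection step can be reduced to "find the minimum count, then increment it". The following is a simplified sketch for a single job, under a few assumptions that differ from the listing above: counts start at 0 instead of a random value, ties are broken by list order, there is no 24-hour reset, and the method is synchronized. The class name LfuSketch and the method pick are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class for one job; not part of XXL-JOB.
class LfuSketch {
    private final Map<String, Integer> useCount = new HashMap<>();

    synchronized String pick(List<String> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("no addresses registered");
        }
        // Forget addresses that went offline, register new ones with count 0.
        useCount.keySet().retainAll(addresses);
        for (String address : addresses) {
            useCount.putIfAbsent(address, 0);
        }
        // Linear scan for the smallest count; list order breaks ties.
        String best = null;
        for (String address : addresses) {
            if (best == null || useCount.get(address) < useCount.get(best)) {
                best = address;
            }
        }
        useCount.merge(best, 1, Integer::sum);
        return best;
    }

    public static void main(String[] args) {
        LfuSketch router = new LfuSketch();
        List<String> addresses = List.of("10.0.0.1:9999", "10.0.0.2:9999", "10.0.0.3:9999");
        for (int i = 0; i < 6; i++) {
            System.out.println(router.pick(addresses));
        }
    }
}
```

A linear scan is enough here because only the minimum is needed; the full sort in the listing produces the same winner when counts are distinct. With every count starting at 0 the sketch degenerates into round robin, which is one plausible reason for the random initial counts in the original code.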
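7. Least recently used (LRU): each job keeps an access-ordered LinkedHashMap of addresses. The eldest entry, that is, the address that has gone longest without being selected, is returned, and reading it with get moves it to the end of the order.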
/**
* For a single job, the executor that has gone unused for the longest time is elected first
* a. LFU (Least Frequently Used): least frequently used; based on frequency/count
* b(*). LRU (Least Recently Used): least recently used; based on time
*
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteLRU extends ExecutorRouter {
private static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();
private static long CACHE_VALID_TIME = 0;
public String route(int jobId, List<String> addressList) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLRUMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
// init lru
LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);
if (lruItem == null) {
/**
* LinkedHashMap
* a. accessOrder: true = access order (entries are reordered on get/put); false = insertion order;
* b. removeEldestEntry: called when a new element is added; returning true removes the eldest element. LinkedHashMap can be subclassed and this method overridden, for example to define a maximum capacity and return true once it is exceeded, which gives a fixed-size LRU algorithm;
*/
lruItem = new LinkedHashMap<String, String>(16, 0.75f, true);
jobLRUMap.putIfAbsent(jobId, lruItem);
}
// put new
for (String address: addressList) {
if (!lruItem.containsKey(address)) {
lruItem.put(address, address);
}
}
// remove old
List<String> delKeys = new ArrayList<>();
for (String existKey: lruItem.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
}
}
if (delKeys.size() > 0) {
for (String delKey: delKeys) {
lruItem.remove(delKey);
}
}
// load
String eldestKey = lruItem.entrySet().iterator().next().getKey();
String eldestValue = lruItem.get(eldestKey);
return eldestValue;
}
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = route(triggerParam.getJobId(), addressList);
return new ReturnT<String>(address);
}
}
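The behaviour relies entirely on the accessOrder flag of LinkedHashMap, so it helps to see it in isolation. This is a minimal sketch for a single job, with the hypothetical class name LruSketch; the 24-hour reset is omitted and the method is synchronized, which are assumptions of the sketch rather than properties of the listing above.

```java
import java.util.LinkedHashMap;
import java.util.List;

// Hypothetical illustration class for one job; not part of XXL-JOB.
class LruSketch {
    // accessOrder = true: iteration runs from least to most recently accessed.
    private final LinkedHashMap<String, String> lru = new LinkedHashMap<>(16, 0.75f, true);

    synchronized String pick(List<String> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("no addresses registered");
        }
        // New addresses are appended at the tail; containsKey does not count as an access.
        for (String address : addresses) {
            if (!lru.containsKey(address)) {
                lru.put(address, address);
            }
        }
        // Drop addresses that are no longer registered.
        lru.keySet().retainAll(addresses);
        // The head of the iteration order is the least recently used address.
        String eldest = lru.keySet().iterator().next();
        lru.get(eldest); // this access moves the entry to the tail
        return eldest;
    }

    public static void main(String[] args) {
        LruSketch router = new LruSketch();
        List<String> addresses = List.of("10.0.0.1:9999", "10.0.0.2:9999", "10.0.0.3:9999");
        for (int i = 0; i < 4; i++) {
            System.out.println(router.pick(addresses));
        }
    }
}
```

With a stable address list the result is a rotation through the addresses, the same sequence round robin would produce. The two strategies diverge when addresses join or leave, because the map remembers the relative access order of the survivors.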
8. Failover: traverses the registered address list in order and sends a heartbeat to each address, until an address that responds successfully is found; that address is used for routing.
public class ExecutorRouteFailover extends ExecutorRouter {
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
StringBuffer beatResultSB = new StringBuffer();
for (String address : addressList) {
// beat
ReturnT<String> beatResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
beatResult = executorBiz.beat();
} catch (Exception e) {
logger.error(e.getMessage(), e);
beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
beatResultSB.append( (beatResultSB.length()>0)?"<br><br>":"")
.append(I18nUtil.getString("jobconf_beat") + ":")
.append("<br>address:").append(address)
.append("<br>code:").append(beatResult.getCode())
.append("<br>msg:").append(beatResult.getMsg());
// beat success
if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
beatResult.setMsg(beatResultSB.toString());
beatResult.setContent(address);
return beatResult;
}
}
return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());
}
}
9. Busyover: traverses the registered address list in order and sends an idle heartbeat to each address, until an address that reports itself idle for this job is found; that address is used for routing.
public class ExecutorRouteBusyover extends ExecutorRouter {
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
StringBuffer idleBeatResultSB = new StringBuffer();
for (String address : addressList) {
// beat
ReturnT<String> idleBeatResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));
} catch (Exception e) {
logger.error(e.getMessage(), e);
idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"")
.append(I18nUtil.getString("jobconf_idleBeat") + ":")
.append("<br>address:").append(address)
.append("<br>code:").append(idleBeatResult.getCode())
.append("<br>msg:").append(idleBeatResult.getMsg());
// beat success
if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
idleBeatResult.setMsg(idleBeatResultSB.toString());
idleBeatResult.setContent(address);
return idleBeatResult;
}
}
return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
}
}
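Failover and busyover share one shape: probe the addresses in order and return the first one whose probe succeeds. The sketch below abstracts the probe as a Predicate so that the control flow can be run without an executor. The names ProbeRouterSketch and firstPassing are hypothetical, and the predicate merely stands in for the beat and idleBeat calls in the listings above; no real network call is made.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical illustration class; not part of XXL-JOB.
class ProbeRouterSketch {
    // Returns the first address whose probe succeeds, or empty if none does.
    static Optional<String> firstPassing(List<String> addresses, Predicate<String> probe) {
        for (String address : addresses) {
            boolean ok;
            try {
                ok = probe.test(address);
            } catch (RuntimeException e) {
                ok = false; // a probe that throws counts as a failed probe
            }
            if (ok) {
                return Optional.of(address);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> addresses = List.of("10.0.0.1:9999", "10.0.0.2:9999", "10.0.0.3:9999");
        // Stand-in probe: pretend only the second machine answers.
        Optional<String> chosen = firstPassing(addresses, a -> a.startsWith("10.0.0.2"));
        System.out.println(chosen.orElse("no executor available"));
    }
}
```

Because the probes run sequentially, the worst case costs one probe timeout per unreachable address before a route is found, and addresses near the front of the list are favoured whenever they are healthy.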