当前位置:网站首页>Eureka的InstanceInfoReplicator类(服务注册辅助工具)
Eureka的InstanceInfoReplicator类(服务注册辅助工具)
2022-06-22 10:52:00 【华为云】
欢迎访问我的GitHub
这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos
关于服务注册
- 以下图片来自Netflix官方,图中显示Eureka Client会向注册中心发起Get Registry请求来获取服务列表:

- 以Spring Cloud的Edgware.RELEASE版本为例,Eureka client的注册动作是在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中执行的,相关代码片段如下所示,请注意中文注释:
//略去不相关代码...//实例化InstanceInfoReplicator对象instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize //监听器,用来监听作为Eureka client的自身的状态变化 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } //状态变化时notify方法会被执行,此时上报最新状态到Eureka server instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { //注册监听器 applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //服务注册 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());- 上述代码表明,将自身信息上报到Eureka server的工作是通过调用instanceInfoReplicator的api完成的;
InstanceInfoReplicator的作用
- 先看InstanceInfoReplicator源码的注释:
/** * A task for updating and replicating the local instanceinfo to the remote server. Properties of this task are: * - configured with a single update thread to guarantee sequential update to the remote server * - update tasks can be scheduled on-demand via onDemandUpdate() * - task processing is rate limited by burstSize * - a new update task is always scheduled automatically after an earlier update task. However if an on-demand task * is started, the scheduled automatic update task is discarded (and a new one will be scheduled after the new * on-demand update). * * @author dliu */- 我的理解:
- InstanceInfoReplicator是个任务类,负责将自身的信息周期性的上报到Eureka server;
- 有两个场景触发上报:周期性任务、服务状态变化(onDemandUpdate被调用),因此,在同一时刻有可能有两个上报的任务同时出现;
- 单线程执行上报的操作,如果有多个上报任务,也能确保是串行的;
- 有频率限制,通过burstSize参数来控制;
- 先创建的任务总是先执行,但是onDemandUpdate方法中创建的任务会将周期性任务给丢弃掉;
源码分析
- 以前面对注释的理解作为主线,去看源码:
- 先看构造方法,如下,中文注释位置需要注意:
InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) { this.discoveryClient = discoveryClient; this.instanceInfo = instanceInfo; //线程池,core size为1,使用DelayedWorkQueue队列 this.scheduler = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d") .setDaemon(true) .build()); this.scheduledPeriodicRef = new AtomicReference<Future>(); this.started = new AtomicBoolean(false); //RateLimiter是个限制频率的工具类,用来限制单位时间内的任务次数 this.rateLimiter = new RateLimiter(TimeUnit.MINUTES); this.replicationIntervalSeconds = replicationIntervalSeconds; this.burstSize = burstSize; //通过周期间隔,和burstSize参数,计算每分钟允许的任务数 this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds; logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute); }- 从以上代码可见,构造方法中准备好了线程池和频率限制工具,再算好了每分钟允许的任务数;
- 在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中,通过调用instanceInfoReplicator.start方法启动了周期性任务,现在来看此方法:
public void start(int initialDelayMs) { //CAS操作,不但保证了只执行一次,多线程场景也能保证 if (started.compareAndSet(false, true)) { instanceInfo.setIsDirty(); // for initial register //提交一个任务,延时执行,注意第一个参数是this,因此延时结束时,InstanceInfoReplicator的run方法会被执行 Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); //这个任务的Feature对象放在成员变量scheduledPeriodicRef中 scheduledPeriodicRef.set(next); }}- 延时时间到达时,会执行run方法:
public void run() { try { //更新信息,用于稍后的上报 discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { //上报 discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { //每次执行完毕都会创建一个延时执行的任务,就这样实现了周期性执行的逻辑 Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); //每次创建的周期性任务,都要放入scheduledPeriodicRef, //如果外部调用了onDemandUpdate,就能通过onDemandUpdate取得当前要执行的任务 scheduledPeriodicRef.set(next); } }- 以上代码汇总起来,就完成了周期性任务的逻辑,接下来看看被外部调用的onDemandUpdate方法:
public boolean onDemandUpdate() { //没有达到频率限制才会执行 if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { //提交一个任务 scheduler.submit(new Runnable() { @Override public void run() { logger.debug("Executing on-demand update of local InstanceInfo"); //取出之前已经提交的任务 Future latestPeriodic = scheduledPeriodicRef.get(); //如果此任务未完成,就立即取消 if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); latestPeriodic.cancel(false); } //通过调用run方法,令任务在延时后执行,相当于周期性任务中的一次 InstanceInfoReplicator.this.run(); } }); return true; } else { //如果超过了设置的频率限制,本次onDemandUpdate方法就提交任务了 logger.warn("Ignoring onDemand update due to rate limiter"); return false; } }如上述代码所示,可见之前注释中提到的功能都已实现;
至此,InstanceInfoReplicator已分析完毕,可见这是个功能强大的辅助类,在应用信息上报到Eureka server时发挥了重要的作用,业务逻辑可以放心的提交上报请求,并发、频率超限等情况都被InstanceInfoReplicator处理好了;
欢迎关注华为云博客:程序员欣宸
边栏推荐
- [Jenkins] shell script calls Jenkins API interface
- Heidisql inserts records. There are always errors. How do you change them?
- Native JS dynamically add and delete classes
- Denso China adopts Oracle HCM cloud technology solution to accelerate the digital transformation of human resources
- What is the name of CITIC Securities app? Is it safe to open a stock account?
- Pytorch实现波阻抗反演
- 【jmeter】shell脚本自动执行jmeter
- Career development planning
- Spark精简面试
- Program architecture design for embedded software development task scheduling
猜你喜欢

Duxiaoyong, head of virtual teaching and Research Office of database course: Based on the major needs of domestic databases, explore a new mode of course system construction
推荐一款M1芯片电脑快速搭建集群的虚拟机软件

Super simple C language Snake does not flash screen double buffer

AQS的初步了解

Gartner表示:云数据库发展强劲,但本地数据库仍然充满活力

机器人强化学习——Sim-to-Real Robot Learning from Pixels with Progressive Nets (2017)

Convert the colored liquid image into transparent liquid, and CMU teaches the robot to accurately control how much water is poured into the cup

普乐蛙5d飞行影院5d动感影院体验馆设备7d多人互动影院

rtklib postpos 梳理(以单点定位为例)

laravel 开发 文章URL 生成器
随机推荐
Yolov3 target detection
Gartner said: cloud database is developing strongly, but local database is still full of vitality
机器人强化学习——Sim-to-Real Robot Learning from Pixels with Progressive Nets (2017)
Construction details of Danzhou clean animal laboratory
线程死锁的理解
MySQL使用SQL语句新增字段、删除字段
6-9 inter application communication - sub application communication
jg_ Using easyexcel to read Excel_ twenty million two hundred and twenty thousand six hundred and nineteen
批量创建/删除文件、文件夹、修改文件名 后缀名
MySQL daily experience [02]
Byte 2: why is the key of the redis master node expired, but the expired data is still read from the secondary node? How to solve it?
社恐?自我介绍时大脑一片空白?
The father of the college entrance examination student told himself at night that what he cared about most was not the child's performance, and the turning point was not false at all
【直播回顾】战码先锋第六期:共建测试子系统,赋能开发者提高代码质量
6-13 improving load performance - application cache
Go微服务(一)——RPC入门
在 Laravel 中使用计算列
Pytorch实现波阻抗反演
PHP website, how to achieve the function of batch printing express orders?
世界上第一个“半机械人”去世,改造自己只为“逆天改命”