当前位置:网站首页>Eureka的InstanceInfoReplicator类(服务注册辅助工具)

Eureka的InstanceInfoReplicator类(服务注册辅助工具)

2022-06-22 10:52:00 华为云

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

关于服务注册

  • 以下图片来自Netflix官方,图中显示Eureka Client会向注册中心发起Get Registry请求来获取服务列表:

image.png

  • 以Spring Cloud的Edgware.RELEASE版本为例,Eureka client的注册动作是在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中执行的,相关代码片段如下所示,请注意中文注释:
//略去不相关代码...//实例化InstanceInfoReplicator对象instanceInfoReplicator = new InstanceInfoReplicator(                    this,                    instanceInfo,                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),                    2); // burstSize            //监听器,用来监听作为Eureka client的自身的状态变化            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {                @Override                public String getId() {                    return "statusChangeListener";                }                @Override                public void notify(StatusChangeEvent statusChangeEvent) {                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {                        // log at warn level if DOWN was involved                        logger.warn("Saw local status change event {}", statusChangeEvent);                    } else {                        logger.info("Saw local status change event {}", statusChangeEvent);                    }                    //状态变化时notify方法会被执行,此时上报最新状态到Eureka server                    instanceInfoReplicator.onDemandUpdate();                }            };            if (clientConfig.shouldOnDemandUpdateStatusChange()) {                //注册监听器                applicationInfoManager.registerStatusChangeListener(statusChangeListener);            }            //服务注册            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
  • 上述代码表明,将自身信息上报到Eureka server的工作是通过调用instanceInfoReplicator的api完成的;

InstanceInfoReplicator的作用

  • 先看InstanceInfoReplicator源码的注释:
/** * A task for updating and replicating the local instanceinfo to the remote server. Properties of this task are: * - configured with a single update thread to guarantee sequential update to the remote server * - update tasks can be scheduled on-demand via onDemandUpdate() * - task processing is rate limited by burstSize * - a new update task is always scheduled automatically after an earlier update task. However if an on-demand task * is started, the scheduled automatic update task is discarded (and a new one will be scheduled after the new * on-demand update). * * @author dliu */
  • 我的理解:
  1. InstanceInfoReplicator是个任务类,负责将自身的信息周期性的上报到Eureka server;
  2. 有两个场景触发上报:周期性任务、服务状态变化(onDemandUpdate被调用),因此,在同一时刻有可能有两个上报的任务同时出现;
  3. 单线程执行上报的操作,如果有多个上报任务,也能确保是串行的;
  4. 有频率限制,通过burstSize参数来控制;
  5. 先创建的任务总是先执行,但是onDemandUpdate方法中创建的任务会将周期性任务给丢弃掉;

源码分析

  • 以前面对注释的理解作为主线,去看源码:
  • 先看构造方法,如下,中文注释位置需要注意:
InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {        this.discoveryClient = discoveryClient;        this.instanceInfo = instanceInfo;		//线程池,core size为1,使用DelayedWorkQueue队列        this.scheduler = Executors.newScheduledThreadPool(1,                new ThreadFactoryBuilder()                        .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")                        .setDaemon(true)                        .build());        this.scheduledPeriodicRef = new AtomicReference<Future>();        this.started = new AtomicBoolean(false);        //RateLimiter是个限制频率的工具类,用来限制单位时间内的任务次数        this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);        this.replicationIntervalSeconds = replicationIntervalSeconds;        this.burstSize = burstSize;        //通过周期间隔,和burstSize参数,计算每分钟允许的任务数        this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;        logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);    }
  • 从以上代码可见,构造方法中准备好了线程池和频率限制工具,再算好了每分钟允许的任务数;
  • 在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中,通过调用instanceInfoReplicator.start方法启动了周期性任务,现在来看此方法:
public void start(int initialDelayMs) {		//CAS操作,不但保证了只执行一次,多线程场景也能保证		if (started.compareAndSet(false, true)) {		        instanceInfo.setIsDirty();  // for initial register		        //提交一个任务,延时执行,注意第一个参数是this,因此延时结束时,InstanceInfoReplicator的run方法会被执行		        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);		        //这个任务的Feature对象放在成员变量scheduledPeriodicRef中		        scheduledPeriodicRef.set(next);		}}
  • 延时时间到达时,会执行run方法:
public void run() {        try {            //更新信息,用于稍后的上报            discoveryClient.refreshInstanceInfo();            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();            if (dirtyTimestamp != null) {                //上报                discoveryClient.register();                instanceInfo.unsetIsDirty(dirtyTimestamp);            }        } catch (Throwable t) {            logger.warn("There was a problem with the instance info replicator", t);        } finally {            //每次执行完毕都会创建一个延时执行的任务,就这样实现了周期性执行的逻辑            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);            //每次创建的周期性任务,都要放入scheduledPeriodicRef,            //如果外部调用了onDemandUpdate,就能通过onDemandUpdate取得当前要执行的任务            scheduledPeriodicRef.set(next);        }    }
  • 以上代码汇总起来,就完成了周期性任务的逻辑,接下来看看被外部调用的onDemandUpdate方法:
public boolean onDemandUpdate() {        //没有达到频率限制才会执行        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {            //提交一个任务            scheduler.submit(new Runnable() {                @Override                public void run() {                    logger.debug("Executing on-demand update of local InstanceInfo");                    //取出之前已经提交的任务                    Future latestPeriodic = scheduledPeriodicRef.get();                    //如果此任务未完成,就立即取消                    if (latestPeriodic != null && !latestPeriodic.isDone()) {                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");                        latestPeriodic.cancel(false);                    }					//通过调用run方法,令任务在延时后执行,相当于周期性任务中的一次                    InstanceInfoReplicator.this.run();                }            });            return true;        } else {            //如果超过了设置的频率限制,本次onDemandUpdate方法就提交任务了            logger.warn("Ignoring onDemand update due to rate limiter");            return false;        }    }
  • 如上述代码所示,可见之前注释中提到的功能都已实现;

  • 至此,InstanceInfoReplicator已分析完毕,可见这是个功能强大的辅助类,在应用信息上报到Eureka server时发挥了重要的作用,业务逻辑可以放心的提交上报请求,并发、频率超限等情况都被InstanceInfoReplicator处理好了;

欢迎关注华为云博客:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴…

原网站

版权声明
本文为[华为云]所创,转载请带上原文链接,感谢
https://bbs.huaweicloud.com/blogs/360409