Eureka's TimedSupervisorTask class (a periodic task that adjusts its interval automatically)

2022-06-21 08:05:00 InfoQ

Welcome to my GitHub

All of Xinchen's original articles, together with their supporting source code, are categorized and summarized here:
https://github.com/zq2599/blog_demos

Background

  • An application built on the Spring Cloud framework that registers with a Eureka server refreshes its service list periodically. The code that starts this scheduled task is in the initScheduledTasks method of the com.netflix.discovery.DiscoveryClient class, shown below (from the eureka-client project, version 1.7.0):

private void initScheduledTasks() {
    // Update the service list
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    ...
    // The rest of the method is omitted

  • In the code above, scheduler is an implementation of the ScheduledExecutorService interface. TimedSupervisorTask extends TimerTask, which is a Runnable, so the call resolves to schedule(Runnable command, long delay, TimeUnit unit).
  • The official documentation describes this method as creating a one-shot task. In actual tests, however, if you set a breakpoint in the run method of the CacheRefreshThread class, you will find that the method is called periodically.
  • So the question is: schedule(Runnable command, long delay, TimeUnit unit) clearly creates a one-shot task, so why is CacheRefreshThread executed periodically?
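The one-shot behaviour is easy to confirm outside Eureka. The following is a minimal sketch that uses only the JDK; the class name OneShotScheduleDemo and the helper countRuns are made up for this illustration and are not part of eureka-client. It schedules a counter increment once through the Runnable overload of schedule and then checks how many times it ran.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not taken from eureka-client.
class OneShotScheduleDemo {

    // Schedules one increment after delayMillis, waits observeMillis,
    // and returns how many times the scheduled action actually ran.
    static int countRuns(long delayMillis, long observeMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            // A block lambda is void-compatible only, so this is schedule(Runnable, long, TimeUnit).
            scheduler.schedule(() -> { runs.incrementAndGet(); }, delayMillis, TimeUnit.MILLISECONDS);
            Thread.sleep(observeMillis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fall through to the cleanup.
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        // The action is due after 50 ms and we watch for 500 ms.
        System.out.println("runs = " + countRuns(50, 500)); // prints: runs = 1
    }
}
```

However long the observation window is, the counter stays at 1, so any periodic behaviour has to come from somewhere other than schedule itself.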

Finding the answer

  • Open the source of the run method of TimedSupervisorTask and pay attention to the comments added below:

public void run() {
    Future future = null;
    try {
        // Submitting through a Future lets us put a timeout on the worker thread,
        // so the current thread never waits indefinitely
        future = executor.submit(task);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        // Wait for the worker thread for at most timeoutMillis
        future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
        // delay is an important variable that is used again later;
        // for now, note that every successful run resets it to the initial value
        delay.set(timeoutMillis);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
    } catch (TimeoutException e) {
        logger.error("task supervisor timed out", e);
        timeoutCounter.increment();

        long currentDelay = delay.get();
        // On a timeout, double delay, but never beyond the maximum delay (maxDelay)
        long newDelay = Math.min(maxDelay, currentDelay * 2);
        // Store the new value; CAS is used because several threads may be involved
        delay.compareAndSet(currentDelay, newDelay);
    } catch (RejectedExecutionException e) {
        // The thread pool rejected the task, for example because its queue is full
        // or it has been shut down; the rejection is only logged and counted here
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, reject the task", e);
        } else {
            logger.error("task supervisor rejected the task", e);
        }

        rejectedCounter.increment();
    } catch (Throwable e) {
        // Any other exception is also only logged and counted
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, can't accept the task");
        } else {
            logger.error("task supervisor threw an exception", e);
        }

        throwableCounter.increment();
    } finally {
        // At this point the task has either finished or failed; cancel cleans it up in both cases
        if (future != null) {
            future.cancel(true);
        }

        // As long as the scheduler has not been shut down, run the same task again after the delay
        if (!scheduler.isShutdown()) {
            // This is why the task is periodic: a new one-shot task is created
            // that runs after delay milliseconds.
            // Example: suppose the constructor's timeout is 30 seconds and the maximum delay
            // (maxDelay, which the constructor derives from timeout and expBackOffBound) is 50 seconds.
            // If the last run did not time out, the next run starts 30 seconds later.
            // If the last run timed out, the next run starts 50 seconds later
            // (the timeout handler doubles delay to 60 seconds, which exceeds the 50-second maximum).
            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}

  • The answer lies in the last statement of the code above: scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS). After each run, the task calls schedule again so that the same task runs once more after the given delay. That delay depends on whether the last run timed out: after a timeout the interval becomes longer.
  • Summary: TimedSupervisorTask is a periodic task that normally runs at a fixed interval. When a run times out, the interval before the next run is lengthened; with consecutive timeouts the interval doubles each time until it reaches the upper bound set by the caller's parameters. As soon as a run completes without timing out, the interval returns to its initial value. CAS is used to update the delay safely when several threads are involved. The code is simple and the design is clever, and it is worth studying.
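The interval adjustment described above can be replayed without any threads. The following is a rough sketch of the delay arithmetic only, not the eureka-client implementation; BackoffDelaySketch, nextDelay and simulate are hypothetical names, and the timeout and maximum delay are passed in directly as milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors only the delay update seen in run().
class BackoffDelaySketch {

    // Success resets the delay to the base timeout; a timeout doubles it, capped at maxDelay.
    static long nextDelay(long currentDelay, long timeoutMillis, long maxDelay, boolean timedOut) {
        if (timedOut) {
            return Math.min(maxDelay, currentDelay * 2);
        }
        return timeoutMillis;
    }

    // Replays a sequence of outcomes (true = the run timed out) and
    // records the delay chosen before each following run.
    static List<Long> simulate(long timeoutMillis, long maxDelay, boolean[] outcomes) {
        List<Long> delays = new ArrayList<>();
        long delay = timeoutMillis; // the delay starts at the base timeout
        for (boolean timedOut : outcomes) {
            delay = nextDelay(delay, timeoutMillis, maxDelay, timedOut);
            delays.add(delay);
        }
        return delays;
    }

    public static void main(String[] args) {
        // The article's example: 30-second timeout, 50-second maximum delay.
        System.out.println(simulate(30_000, 50_000, new boolean[] {false, true, true, false}));
        // prints: [30000, 50000, 50000, 30000]

        // A larger cap makes the repeated doubling visible.
        System.out.println(simulate(30_000, 300_000, new boolean[] {true, true, true, true, false}));
        // prints: [60000, 120000, 240000, 300000, 30000]
    }
}
```

The second run shows the three properties from the summary in one sequence: doubling on consecutive timeouts, the cap at the maximum delay, and the immediate reset after a successful run.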

Follow me on InfoQ: Xinchen, programmer

You are not alone on the road of learning; Xinchen's original articles will keep you company all the way ...