当前位置:网站首页>Nacos source code - configure automatic update

Nacos source code - configure automatic update

2022-06-24 11:16:00 prepared

Nacos It is Alibaba's open source centralized distributed configuration center 、 Distributed registry as one distributed solution .

Its advantages :

  1. Provide command space , It is convenient to manage the configuration of different environments ;
  2. Provide web Interface , Easy to manage configurations and services ;
  3. Support configuration version management , Roll back ;
  4. Support service management , Go online manually 、 Offline service .

Other advantages .

1 How to use Nacos Automatically update configuration

1.1 There are two ways to configure automatic updates

The first way

  1. Attributes use @Value annotation
  2. Class uses @RefreshScope annotation
@RefreshScope
@RequestMapping("config")
public class ConfigController {

    @Value("${useLocalCache:false}")
    private boolean useLocalCache;
}

The second way

  1. Use @NacosValue annotation , Automatic updates are configured to true
@Controller
@RequestMapping("config")
public class ConfigController {

    @NacosValue(value = "${useLocalCache:false}", autoRefreshed = true)
    private boolean useLocalCache;

    @RequestMapping(value = "/get", method = GET)
    @ResponseBody
    public boolean get() {
        return useLocalCache;
    }
}

2 Nacos Configuration update source code analysis

First map

Nacos Configure the update flowchart

Specific steps

1、 First step ( Update the database ) Relatively simple , It is to judge whether to add or modify the configuration , Then modify or add database response information ( According to the database used is embedded derby still mysql Using different implementations ).

2、 More complicated is the notification ( Update files in the file system )

2.1 adopt Publish subscribe mode , Publish configuration change events

2.2 After the subscriber receives the message , call controller request (communication/dataChange)

2.3 This controller request , Start a Asynchronous task , This asynchronous task updates the configuration data to the specified configuration file (nacos Under the table of contents )

After through Release - A subscription model Notify other services , After other services receive the notification , Update configuration .

PS:

1、controller request http://ip:port/nacos/v1/cs/communication/dataChange?dataId=example&group=DEFAULT_GROUP

2、 The configuration store has two places , One is in the file system , The other is in the configured database ( It may be embedded derby Database and MySQL database )

3、 Some basic configuration information , For example, the configuration file MD5 value 、dataId、groupName etc. , Are saved in ConcurrentHashMap Stored cache

Source code 1: Add asynchronous task update profile code

/**
 * Add DumpTask to TaskManager, it will execute asynchronously.
 */
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta));
    dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
    DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}

And then through persistService Read the corresponding records in the database

Source code 2: Read the latest configuration data in the database

ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);

What will be read , Write to local file nacos/distribution/target/nacos-server-1.4.2/nacos/data/config-data/${GROUP_NAME}/${dataId} in , And update the configuration file md5 value .

Source code 3: Save the configuration file to the file directory , And update the file MD5 value

/**
 * Save config file and update md5 value in cache.
 *
 * @param dataId         dataId string value.
 * @param group          group string value.
 * @param tenant         tenant string value.
 * @param content        content string value.
 * @param lastModifiedTs lastModifiedTs.
 * @param type           file type.
 * @return dumpChange success or not.
 */
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
        String type) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    CacheItem ci = makeSure(groupKey);
    ci.setType(type);
    final int lockResult = tryWriteLock(groupKey);
    assert (lockResult != 0);
    
    if (lockResult < 0) {
        DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
        return false;
    }
    
    try {
        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
        
        if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
            DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
                            + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
                    lastModifiedTs);
        } else if (!PropertyUtil.isDirectRead()) {
        	//  Save data to a file 
            DiskUtil.saveToDisk(dataId, group, tenant, content);
        }
        updateMd5(groupKey, md5, lastModifiedTs);
        return true;
    } catch (IOException ioe) {
        DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
        if (ioe.getMessage() != null) {
            String errMsg = ioe.getMessage();
            if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
                    .contains(DISK_QUATA_EN)) {
                // Protect from disk full.
                FATAL_LOG.error(" Disk full suicide exit ", ioe);
                System.exit(0);
            }
        }
        return false;
    } finally {
        releaseWriteLock(groupKey);
    }
}

/**
 * Save configuration information to disk.
 */
public static void saveToDisk(String dataId, String group, String tenant, String content) throws IOException {
    File targetFile = targetFile(dataId, group, tenant);
    FileUtils.writeStringToFile(targetFile, content, Constants.ENCODE);
}

3 Source code highlights

3.1 Publish subscribe mode

The publish subscribe mode is used in scenarios where multiple subscribers need to be notified when an event occurs . In development , It can also have different names , You should be able to recognize , such as :Subject-Observer( Classic example )、Publisher-Subscriber(Nacos)、Producer-Consumer(Kafka)、Dispatcher-Listener.

Issue change event

/**
 * Request publisher publish event Publishers load lazily, calling publisher.
 *
 * @param eventType class Instances type of the event type.
 * @param event     event instance.
 */
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
    if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
        return INSTANCE.sharePublisher.publish(event);
    }
    
    final String topic = ClassUtils.getCanonicalName(eventType);
    
    EventPublisher publisher = INSTANCE.publisherMap.get(topic);
    if (publisher != null) {
        return publisher.publish(event);
    }
    LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
    return false;
}

Circular subscriber list , Notify subscribers .

/**
 * Receive and notifySubscriber to process the event.
 *
 * @param event {@link Event}.
 */
void receiveEvent(Event event) {
    final long currentEventSequence = event.sequence();
    
    // Notification single event listener
    for (Subscriber subscriber : subscribers) {
        // Whether to ignore expiration events
        if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
            LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                    event.getClass());
            continue;
        }
        
        // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
        // Remove original judge part of codes.
        notifySubscriber(subscriber, event);
    }
}
    

Process subscriber tasks through the thread pool , Because here the subscriber needs to call HTTP Request to process updates , therefore You can decouple through the thread pool .

@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
    
    LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
    
    final Runnable job = new Runnable() {
        @Override
        public void run() {
            subscriber.onEvent(event);
        }
    };
    
    final Executor executor = subscriber.executor();
    
    if (executor != null) {
        executor.execute(job);
    } else {
        try {
            job.run();
        } catch (Throwable e) {
            LOGGER.error("Event callback exception : {}", e);
        }
    }
}

3.2 Task manager

There are competing resources — Single profile , That is to be sure Group Under the dataId File or database records .

  1. Failure to retry
/**
 * process tasks in execute engine.
 */
protected void processTasks() {
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error : " + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}

3.3. Add task processing concurrency

Using the reentry lock ReentrantLock

protected final ReentrantLock lock = new ReentrantLock();

@Override
public void addTask(Object key, AbstractDelayTask newTask) {
    lock.lock();
    try {
        AbstractDelayTask existTask = tasks.get(key);
        if (null != existTask) {
            newTask.merge(existTask);
        }
        tasks.put(key, newTask);
    } finally {
        lock.unlock();
    }
}
  1. Cache task processor objects , When necessary, it can be directly obtained through the local cache
private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<Object, NacosTaskProcessor>();

3.4 Multiple implementations of one interface , Choose... According to the conditions

If you want to implement a variety of different database operations ,Condition It's very useful , For example, the embedded derby database , Also support MySQL, adopt Condition You can use different databases to implement .

Some business systems in traditional enterprises need to support a variety of different databases , Different customer sites may use different databases , In this way, customization can be reduced 、 Reduce the temporary customized development due to different customer databases on site .

@Conditional(value = ConditionOnEmbeddedStorage.class)
@Component
public class EmbeddedStoragePersistServiceImpl implements PersistService {
@Conditional(value = ConditionOnExternalStorage.class)
@Component
public class ExternalStoragePersistServiceImpl implements PersistService {

If this article helps you , Welcome to my official account. 【 Brother and girl 】 , Take you deep JAVA The world of ~

原网站

版权声明
本文为[prepared]所创,转载请带上原文链接,感谢
https://yzsam.com/2021/06/20210604151113180h.html