当前位置:网站首页>4、Nacos 配置中心源码解析之 服务端启动
4、Nacos 配置中心源码解析之 服务端启动
2022-07-25 19:50:00 【carl-zhao】
上一篇文章中我们使用 -Dnacos.standalone=true本地启动了 Nacos 服务器,并且可以在 http://localhost:8848/nacos 通过 nacos/nacos 用户名密码就可以访问 nacos 控制页面。下面我们就来大体看一下 Nacos 在启动的时候干了哪些核心的事。
1、Nacos 认证服务
Nacos 中的 nacos-console 项目依赖了与配置中心相关的以下几个模块:
- nacos-config:配置中心项目模块
- nacos-plugin-default-impl:Nacos 插件模块:主要是用户认证授权相关操作,这里使用的 Spring Security 来做安全认证。相关的配置类为:
NacosAuthConfig
1.1 内存数据库加载数据
JVM 启动参数添加 -Dnacos.standalone=true,这个时候在 DynamicDataSource#getDataSource 就会初始化 LocalDataSourceServiceImpl。
DynamicDataSource#getDataSource()
public synchronized DataSourceService getDataSource() {
try {
// Embedded storage is used by default in stand-alone mode
// In cluster mode, external databases are used by default
if (PropertyUtil.isEmbeddedStorage()) {
if (localDataSourceService == null) {
localDataSourceService = new LocalDataSourceServiceImpl();
localDataSourceService.init();
}
return localDataSourceService;
} else {
if (basicDataSourceService == null) {
basicDataSourceService = new ExternalDataSourceServiceImpl();
basicDataSourceService.init();
}
return basicDataSourceService;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
LocalDataSourceServiceImpl#initialize 会使用 derby 这个内存数据库来保存数据。保存数据地址为:${user.home}/nacos/data/derby-data 。
然后调用 LocalDataSourceServiceImpl#reload 加载 console/resources/META-INF/schema.sql 里面的数据到内存数据库当中。其中就包括我们登陆使用的 nacos/nacos 用户名密码
1.2 nacos plugin 暴露用户认证
在 plugin-default-impl 模块中的com.alibaba.nacos.plugin.auth.impl.controller 包里面包含了以下几个 Http 服务:
PermissionController:权限操作服务:获取所有权限、添加角色权限、删除角色权限等接口RoleController:角色操作服务:获取所有角色、查询角色、添加角色、删除角色等接口UserController:用户操作服务:包括创建用户、删除用户、修改用户、获取所有用户、用户登陆等接口
Nacos 使用的 Spring Seurity 做的权限管理,默认支持 Nacos 类型权限也就是数据库管理权限以及 Ldap 权限管理。默认使用数据库 RBAC 权限管理。 Spring Security Web 配置类为:NacosAuthConfig。
1.3 用户登陆时序图

以上就是 Nacos 登陆认证时序图。用户的具体的信息会缓存到 ConcurrentHashMap 当中。通过 Spring 定时器 NacosUserDetailsServiceImpl#reload 定时从数据库刷新缓存中的数据。因为我们在初始化数据库的时候会调用以下 SQL 语句保存数据到内存数据库 derby 当中。
CREATE TABLE users (
username varchar(50) NOT NULL PRIMARY KEY,
password varchar(500) NOT NULL,
enabled boolean NOT NULL DEFAULT true
);
INSERT INTO users (username, password, enabled) VALUES
('nacos', '$2a$10$EuWPZHzz32dJN7jexM34MOeYirDdFAZm2kuWj7VEOJhhZkDrxfvUu', TRUE);
这条数据就是加载到NacosUserDetailsServiceImpl 的缓存 Map 中,这样我们就可以使用 nacos/nacos 用户名密码进行授权登陆了。
2、暴露 Http 服务给控制台
下面我们来讨论一下 console 服务启动之后暴露了哪些 http 服务给控制台,在这里我们排除 naming 服务只讨论配置服务相关的 rest 服务.
2.1 console 服务
我们先来看一下 console 服务中的 ConsoleConfig 这个配置类。
ConsoleConfig.java
@Component
@EnableScheduling
@PropertySource("/application.properties")
public class ConsoleConfig {
@Autowired
private ControllerMethodsCache methodsCache;
/** * Init. */
@PostConstruct
public void init() {
methodsCache.initClassMethod("com.alibaba.nacos.core.controller");
methodsCache.initClassMethod("com.alibaba.nacos.naming.controllers");
methodsCache.initClassMethod("com.alibaba.nacos.config.server.controller");
methodsCache.initClassMethod("com.alibaba.nacos.console.controller");
}
@Bean
public CorsFilter corsFilter() {
CorsConfiguration config = new CorsConfiguration();
config.setAllowCredentials(true);
config.addAllowedOrigin("*");
config.addAllowedHeader("*");
config.setMaxAge(18000L);
config.addAllowedMethod("*");
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
source.registerCorsConfiguration("/**", config);
return new CorsFilter(source);
}
@Bean
public XssFilter xssFilter() {
return new XssFilter();
}
@Bean
public Jackson2ObjectMapperBuilderCustomizer jacksonObjectMapperCustomization() {
return jacksonObjectMapperBuilder -> jacksonObjectMapperBuilder.timeZone(ZoneId.systemDefault().toString());
}
}
首先配置两个 Filter 过滤器:CorsFilter 是为了解决 console 前后端分享接口调用跨域问题;XssFilter 为了解决跨域攻击问题。
ConsoleConfig#init 方法是解析传入包里面的 Controller 中 @RequestMapping 方法缓存 http 请求与 @ReqeustMapping 的方法映射。这样在 Filter 里面就可以过滤掉非法请求。
下面我们来分析一下 console 模块下的 Controller:
HealthController:Nacos 健康的操作,比如:服务健康检测,查看 Config 与 Naming 是否对外服务。NamespaceController:Nacos 命名空间相关的操作,比如:获取所有命名空间信息、获取命名空间详情、创建命名空间、修改命名空间、删除命名空间等。ServerStateController:Nacos 服务状态的操作,比如:查看服务状态
2.2 core 服务
下面我们来分析一下 core 模块下的 Controller:
NacosClusterController:Nacos 集群相关的操作,比如:获取自身节点信息,获取所有节点信息、获取所有节点地址、查看当前节点是否健康等。ServerLoaderController:Nacos 连接相关的操作,比如:获取当前服务的 Client 连接信息、获取当前服务的服务状态、当前服务重新连接客户端、获取当前服务指标等。CoreOpsController:Nacos 基础操作,比如:执行 raft 命令、获取 ID 生成规则相关信息、设置日志级别CoreOpsV2Controller:Nacos 基础操作v2,比如:执行 raft 命令、获取 ID 生成规则相关信息、设置日志级别NacosClusterV2Controller:Nacos 集群相关的操作v2,比如:获取自身节点信息,获取所有节点信息、获取所有节点地址等。
2.3 config 服务
下面我们来分析一下 config 模块下的 Controller:
CapacityController:获取租户的配置能力、修改租户的配置能力ClientMetricsController:获取配置中心集群的指标信息、获取当前节点所有客户端的指标CommunicationController:把指定的配置保存到本地文件当中、获取 grpc 连接信息、获取 http 长轮训状态ConfigController:发布配置信息、获取配置信息、删除配置信息、查询配置信息等ConfigOpsController:把当前所有配置保存到本地文件当中、设置日志级别、使用 derby 内存数据库动态传入 SQL 操作、通过文件动态上传配置HealthController:判断当前节点配置中心的健康情况HistoryController:分页查询配置的历史记录、ID查询配置的历史记录详情、ID查询配置的上一个历史记录详情、根据命名空间查询配置的历史记录ListenerController:根据 IP 获取所有的订单信息
3、暴露 Grpc 服务给客户端
在 console 模块启动的时候会启动两个 GRPC 服务端:一个是 SDK 使用;一个 Cluster 使用。它们两个暴露的服务都是一样的只不过是暴露的端口不一样:
- SDK 服务:暴露端口 9848,Nacos 的启动端口 8848 + 1000(
GrpcSdkServer#rpcPortOffset), - Cluster 服务:暴露端口 9849,Nacos 的启动端口 8848 + 1001(
GrpcClusterServer#rpcPortOffset),
GRPC 类结构如下:

GrpcClusterServer:Cluster 偏移端口、获取 Cluster RPC 线程池GrpcSdkServer:Sdk 偏移端口、获取 Sdk RPC 线程池BaseGrpcServer:服务启动的具体实现BaseRpcServer:RPC 基类实现,注册 Payload(GRPC请求响应类)、RPC服务启动
3.1 注册 Payload
Nacos 暴露的 Grpc 服务的请求参数与响应参数都基类都是 Payload,在基类 BaseRpcServer 中通过 static 代码块注册 Payload。
BaseRpcServer.java
static {
PayloadRegistry.init();
}
通过 Java 的 SPI 机制分别在 client 模块加载到 ClientPayloadPackageProvider 以及 config 加载到 ConfigPayloadPackageProvider。
client/META-INF/services/com.alibaba.nacos.common.remote.PayloadPackageProvider
public class ClientPayloadPackageProvider implements PayloadPackageProvider {
private final Set<String> scanPackage = new HashSet<>();
{
scanPackage.add("com.alibaba.nacos.api.naming.remote.request");
scanPackage.add("com.alibaba.nacos.api.remote.request");
scanPackage.add("com.alibaba.nacos.api.config.remote.request");
scanPackage.add("com.alibaba.nacos.api.naming.remote.response");
scanPackage.add("com.alibaba.nacos.api.config.remote.response");
scanPackage.add("com.alibaba.nacos.api.remote.response");
}
@Override
public Set<String> getScanPackage() {
return scanPackage;
}
}
下面是 config 的配置:
config/META-INF/services/com.alibaba.nacos.common.remote.PayloadPackageProvider
public class ConfigPayloadPackageProvider implements PayloadPackageProvider {
private final Set<String> scanPackage = new HashSet<>();
{
scanPackage.add("com.alibaba.nacos.api.remote.request");
scanPackage.add("com.alibaba.nacos.api.remote.response");
scanPackage.add("com.alibaba.nacos.api.config.remote.request");
scanPackage.add("com.alibaba.nacos.api.config.remote.response");
}
@Override
public Set<String> getScanPackage() {
return scanPackage;
}
}
然后把这两个类所定义扫描的包以类名以及类 Class 注册到 PayloadRegistry.REGISTRY_REQUEST 当中。当进行 GRPC 调用的时候提供给 GrpcUtils 进行请求响应对象转换。
3 .2 启动 grpc 服务
BaseRpcServer.java
@PostConstruct
public void start() throws Exception {
String serverName = getClass().getSimpleName();
Loggers.REMOTE.info("Nacos {} Rpc server starting at port {}", serverName, getServicePort());
startServer();
Loggers.REMOTE.info("Nacos {} Rpc server started at port {}", serverName, getServicePort());
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);
try {
BaseRpcServer.this.stopServer();
Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName);
} catch (Exception e) {
Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);
}
}));
}
这个类里面的逻辑比较简单:
- 定义了服务启动接口
startServer(),由子类实现 - 注册一个服务停止后调用的钩子函数
stopServer()
BaseGrpcServer.java
@Override
public void startServer() throws Exception {
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
// #1
ServerInterceptor serverInterceptor = new ServerInterceptor() {
@Override
public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
ServerCallHandler<T, S> next) {
Context ctx = Context.current()
.withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
.withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
.withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
.withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
Channel internalChannel = getInternalChannel(call);
ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
}
return Contexts.interceptCall(ctx, call, headers, next);
}
};
// #2
addServices(handlerRegistry, serverInterceptor);
// #3
server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
.maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.addTransportFilter(new ServerTransportFilter() {
@Override
public Attributes transportReady(Attributes transportAttrs) {
InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
.get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
int remotePort = remoteAddress.getPort();
int localPort = localAddress.getPort();
String remoteIp = remoteAddress.getAddress().getHostAddress();
Attributes attrWrapper = transportAttrs.toBuilder()
.set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
.set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
.set(TRANS_KEY_LOCAL_PORT, localPort).build();
String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);
return attrWrapper;
}
@Override
public void transportTerminated(Attributes transportAttrs) {
String connectionId = null;
try {
connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
} catch (Exception e) {
// Ignore
}
if (StringUtils.isNotBlank(connectionId)) {
Loggers.REMOTE_DIGEST
.info("Connection transportTerminated,connectionId = {} ", connectionId);
connectionManager.unregister(connectionId);
}
}
}).build();
// #4
server.start();
}
- #1 其实是定义一个服务器拦截器,用于设置
connection id等信息 - #2 往 MutableHandlerRegistry 动态添加 GRPC 方法定义
- #3 通过上面的参数构建 GRPC 的 Server。
- #4 启动 GRPC 服务
下面我们来看一下 GRPC 是动态添加 GRPC 方法到 Server 里面的。
BaseGrpcServer.java
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
// unary common call register.
final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME))
.setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME)
.addMethod(unaryPayloadMethod, payloadHandler).build();
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
// bi stream register.
final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
(responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
.setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor
.generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
.setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
.builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
}
在这个方法当中 GRPC 定义了两个方法:
MethodType.UNARY类型的方法,请求参数与响应参数都是 Payload,然后方法名为Request/request。通过GrpcRequestAcceptor类来处理请求。MethodType.BIDI_STREAMING类型的方法,请求参数与响应参数都是 Payload,然后方法名为BiRequestStream/requestBiStream。通过GrpcBiStreamRequestAcceptor来处理请求
边栏推荐
- C merge set
- An idea of solving div adapting to screen
- Wxss template style and WXS scripting language for wechat applet development
- [artifact] screenshot + mapping tool snipaste
- When the V100 of mindpole 8 card is trained to 101 epochs, an error of reading data timeout is reported
- Mutual conversion of camera internal parameter matrix K and FOV
- 高端旗舰投影仪选购指南:当贝X3 Pro、当贝F5观影更沉浸!
- 安全基础6 ---漏洞复现
- ML的编程技巧:
- The query data returned by the print database is null or the default value. Does not match the value returned by the database
猜你喜欢
![[server data recovery] a data recovery case of a brand ProLiant server raid paralysis, database file loss, and database file backup damage](/img/89/92ace2f76beefd258d00d26cd921c9.png)
[server data recovery] a data recovery case of a brand ProLiant server raid paralysis, database file loss, and database file backup damage

微信小程序10-微搭模板

A good way to generate interface documents efficiently

C merge set

Legal mix of collations for operation 'Union' (bug record)

Software designer afternoon real topic: 2009-2022

Mobile phone touch picture slider rotation plug-in photoswipe.js
![Scala foundation [set 01]](/img/6b/0f5da7ea923ef3aa436d7af9c4425c.png)
Scala foundation [set 01]

Wechat applet 10 - wechat template

Beihang and other "deep learning event extraction" literature review paper, 27 page PDF describes the current trend
随机推荐
LP dual currency pledge liquidity mining DAPP system development logic analysis
Binarysearch basic binary search
Siemens PLM Teamcenter download, installation and use tutorial
Security foundation 6 - vulnerability recurrence
Bash does not add single quotes to your string
what is qml in qt
高效生成接口文档好方法
Deeplobv1 and V2
NPM semantic version control, solution console prop being mutated: "placement" error
Creative drop-down multi choice JS plug-in download
High number_ Chapter 3 learning experience and summary of multiple integral
[CSAPP Practice Problem 2.32] tsub_ OK (int x, int y) judge whether complement subtraction overflows
tiktok如何破零播放?
Hongke shares | how to solve blackmail software security vulnerabilities
Is there a "fingerprint" in the structure of AAAI 2022 | Gan? Generating network structure from forged image traceability
Siemens-PLM-TeamCenter下载、安装、使用教程
Split very long line of words into separate lines of max length
C merge set
Introduction to web security ICMP testing and defense
Hash undirected graph visualization