当前位置:网站首页>ETCD数据库源码分析——集群通信初始化
ETCD数据库源码分析——集群通信初始化
2022-06-25 23:03:00 【肥叔菌】

消息入口
一个etcd节点运行以后,有3个通道接收外界消息,以kv数据的增删改查请求处理为例,介绍这3个通道的工作机制。
- client的http调用:会通过注册到http模块的keysHandler的ServeHTTP方法处理。解析好的消息调用EtcdServer的Do()方法处理。(图中2)
- client的grpc调用:启动时会向grpc server注册quotaKVServer对象,quotaKVServer是以组合的方式增强了kvServer这个数据结构。grpc消息解析完以后会调用kvServer的Range、Put、DeleteRange、Txn、Compact等方法。kvServer中包含有一个RaftKV的接口,由EtcdServer这个结构实现。所以最后就是调用到EtcdServer的Range、Put、DeleteRange、Txn、Compact等方法。(图中1 ETCD数据库源码分析——etcd gRPC 服务 API)
- 节点之间的grpc消息:每个EtcdServer中包含有Transport结构,Transport中会有一个peers的map,每个peer封装了节点到其他某个节点的通信方式。包括streamReader、streamWriter等,用于消息的发送和接收。streamReader中有recvc和propc队列,streamReader处理完接收到的消息会将消息推到这个队列中。由peer去处理,peer调用raftNode的Process方法处理消息。(图中3、4 本文将介绍的)
集群通信服务端
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
// 校验配置inCfg,其实就是config结构体的embed.Config类型的ec成员
serving := false // 标识是否正在提供服务
e = &Etcd{
cfg: *inCfg, stopc: make(chan struct{
})} // 创建Etcd
// 1. 初始化集群内部通信服务端通信所需的peerListener
if e.Peers, err = configurePeerListeners(cfg); err != nil {
return e, err }
// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
// 2. 启动集群通信服务端
if err = e.servePeers(); err != nil {
return e, err }

初始化集群内部通信服务端通信所需的peerListener
configurePeerListeners函数定义在server/embed/etcd.go文件中,为配置Config中LPUrls每个url创建一个peerListener,如上图中的peerListener结构体所示,该函数为初始化peerListener结构体会调用transport.NewListenerWithOpts函数创建net.Listener,将close函数初始化为func(context.Context) error { return peers[i].Listener.Close() },用于调用net.Listener.Close函数。configurePeerListeners函数并没有初始化peerListener结构体的serve函数。
func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
return nil, err }
if err = cfg.PeerSelfCert(); err != nil {
cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err)) }
if !cfg.PeerTLSInfo.Empty() {
cfg.logger.Info( "starting with peer TLS", zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)), zap.Strings("cipher-suites", cfg.CipherSuites),) }
peers = make([]*peerListener, len(cfg.LPUrls)) // 需要为配置Config中的LPUrls每个url申请一个peerListener
defer func() {
// 退出该函数执行的清理函数
if err == nil {
return }
for i := range peers {
if peers[i] != nil && peers[i].close != nil {
cfg.logger.Warn( "closing peer listener", zap.String("address", cfg.LPUrls[i].String()), zap.Error(err),)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
peers[i].close(ctx)
cancel()
}
}
}()
for i, u := range cfg.LPUrls {
if u.Scheme == "http" {
// http协议
if !cfg.PeerTLSInfo.Empty() {
cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String())) }
if cfg.PeerTLSInfo.ClientCertAuth {
cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String())) }
}
peers[i] = &peerListener{
close: func(context.Context) error {
return nil }} // 为配置Config中的LPUrls每个url创建一个peerListener
peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme, transport.WithTLSInfo(&cfg.PeerTLSInfo), transport.WithSocketOpts(&cfg.SocketOpts), transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),)
if err != nil {
return nil, err }
// once serve, overwrite with 'http.Server.Shutdown'
peers[i].close = func(context.Context) error {
return peers[i].Listener.Close() }
}
return peers, nil
}
启动集群通信服务端
servePeers函数定义在server/embed/etcd.go文件中,首先NewPeerHandler会调用newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler()会返回s.r.transport.Handler(),最终transport.Handler函数返回了注册了pipelineHandler、streamHandler、snapHandler的mux。对于每个peerListener创建一个协程运行http server,使用pipelineHandler、streamHandler、snapHandler;更新peerListener的close函数,对每个peerListener创建协程运行cmux.Serve(),也就是运行注册configurePeerListeners的transport.NewListenerWithOpts创建Listener的服务。
// configure peer handlers after rafthttp.Transport started
func (e *Etcd) servePeers() (err error) {
ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)
for _, p := range e.Peers {
// 对于每个peerListener创建一个协程运行http server,使用pipelineHandler、streamHandler、snapHandler
u := p.Listener.Addr().String()
m := cmux.New(p.Listener) // 使用configurePeerListeners函数初始化的Listener创建cmx
srv := &http.Server{
Handler: ph, ReadTimeout: 5 * time.Minute, ErrorLog: defaultLog.New(io.Discard, "", 0), } // do not log user error 创建http server
go srv.Serve(m.Match(cmux.Any())) // 创建协程运行http server
p.serve = func() error {
// configurePeerListeners函数并没有初始化peerListener结构体的serve函数,这里进行初始化
e.cfg.logger.Info( "cmux::serve", zap.String("address", u),)
return m.Serve()
}
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server. close open listeners, idle connections until context cancel or time-out 更新一下close函数,因为这里需要关闭的对象不一样了
e.cfg.logger.Info("stopping serving peer traffic", zap.String("address", u),)
srv.Shutdown(ctx)
e.cfg.logger.Info("stopped serving peer traffic", zap.String("address", u),)
m.Close()
return nil
}
}
for _, pl := range e.Peers {
// start peer servers in a goroutine
go func(l *peerListener) {
// 创建协程运行cmux.Serve()
u := l.Addr().String()
e.cfg.logger.Info("serving peer traffic",zap.String("address", u),
)
e.errHandler(l.serve())
}(pl)
}
return nil
}
集群通信客户端
在初始化EtcdServer流程中运行的etcd.NewServersrv(cfg)函数最终会进行传输层的初始化,这里描述的就是首图中的Transport框图,如下流程主要进行:创建rafthttp.Transport实例、启动rafthttp.Transport实例和向rafthttp.Transport实例中添加集群中各个节点对应的Peer实例和Remote实例操作。详细流程将在后续博客中讲解。
// TODO: move transport initialization near the definition of remote
tr := &rafthttp.Transport{
// 创建rafthttp.Transport实例
Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(),
ID: b.cluster.nodeID, URLs: cfg.PeerURLs, ClusterID: b.cluster.cl.ID(),
Raft: srv, Snapshotter: b.ss, ServerStats: sstats, LeaderStats: lstats,
// 在这里传入的rafthttp.Raft接口实现是EtcdServer实例。EtcdServer对Raft接口的实现比较简单,它会直接将调用委托给底层的raftNode实例
ErrorC: srv.errorc,
}
if err = tr.Start(); err != nil {
return nil, err } // 启动rafthttp.Transport实例
// 向rafthttp.Transport实例中添加集群中各个节点对应的Peer实例和Remote实例
for _, m := range b.cluster.remotes {
// add all remotes into transport
if m.ID != b.cluster.nodeID {
tr.AddRemote(m.ID, m.PeerURLs) }
}
for _, m := range b.cluster.cl.Members() {
if m.ID != b.cluster.nodeID {
tr.AddPeer(m.ID, m.PeerURLs) }
}
srv.r.transport = tr
边栏推荐
- 事物/现象/事情/东西/情况/表象
- Preorder and middle order traversal of forest
- Stream data
- Post ordered clue binary tree
- Endnote IEEE TRANSACTIONS ON INDUSTRIAL ELECTRONICS/TIE/TPEL 参考文献格式模板
- WordPress
- 统一网关Gateway
- Final review [machine learning]
- Chapter VI exercises (678) [microcomputer principles] [exercises]
- Sqlserver is case sensitive
猜你喜欢

随便画画的

Leetcode 513. Find the value in the lower left corner of the tree

Vscode shortcut

Electronic training.

Qt优秀开源项目之九:qTox

FPGA notes -- implementation of FPGA floating point operation

Template engine - FreeMarker first experience

Balanced binary tree AVL

. Net using access 2010 database

"Method not allowed", 405 problem analysis and solution
随机推荐
The maze of God's perspective in robot vision
Megacli common command collation
Establish a j-link GDB cross debugging environment for Px4
计算机网络知识总结(面试)
Optimized three-dimensional space positioning method and its fast implementation in C language
事物/现象/事情/东西/情况/表象
[TSP problem] solving traveling salesman problem based on Hopfield neural network with matlab code
认识map
卡通shader
ASP.NET cache缓存的用法
C thread pool control semaphore
案例:绘制Matplotlib动态图
How product managers control the progress of product development
渗透工具-Burpsuite
Idea view unit test coverage
Permission design = function permission + Data permission
安卓缓存使用工具类
Dynamic verification code
Blob
智慧家——全家具功能