当前位置:网站首页>ETCD数据库源码分析——集群通信初始化

ETCD数据库源码分析——集群通信初始化

2022-06-25 23:03:00 肥叔菌

在这里插入图片描述
消息入口
一个etcd节点运行以后,有3个通道接收外界消息,以kv数据的增删改查请求处理为例,介绍这3个通道的工作机制。

  1. client的http调用:会通过注册到http模块的keysHandler的ServeHTTP方法处理。解析好的消息调用EtcdServer的Do()方法处理。(图中2)
  2. client的grpc调用:启动时会向grpc server注册quotaKVServer对象,quotaKVServer是以组合的方式增强了kvServer这个数据结构。grpc消息解析完以后会调用kvServer的Range、Put、DeleteRange、Txn、Compact等方法。kvServer中包含有一个RaftKV的接口,由EtcdServer这个结构实现。所以最后就是调用到EtcdServer的Range、Put、DeleteRange、Txn、Compact等方法。(图中1 ETCD数据库源码分析——etcd gRPC 服务 API)
  3. 节点之间的grpc消息:每个EtcdServer中包含有Transport结构,Transport中会有一个peers的map,每个peer封装了节点到其他某个节点的通信方式。包括streamReader、streamWriter等,用于消息的发送和接收。streamReader中有recvc和propc队列,streamReader处理完接收到的消息会将消息推到这个队列中。由peer去处理,peer调用raftNode的Process方法处理消息。(图中3、4 本文将介绍的)

集群通信服务端

func StartEtcd(inCfg *Config) (e *Etcd, err error) {
    
	// 校验配置inCfg,其实就是config结构体的embed.Config类型的ec成员
	serving := false // 标识是否正在提供服务
	e = &Etcd{
    cfg: *inCfg, stopc: make(chan struct{
    })} // 创建Etcd
	// 1. 初始化集群内部通信服务端通信所需的peerListener
	if e.Peers, err = configurePeerListeners(cfg); err != nil {
     return e, err }	
	// buffer channel so goroutines on closed connections won't wait forever
	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
	// 2. 启动集群通信服务端
	if err = e.servePeers(); err != nil {
     return e, err }	

在这里插入图片描述
初始化集群内部通信服务端通信所需的peerListener
configurePeerListeners函数定义在server/embed/etcd.go文件中,为配置Config中LPUrls每个url创建一个peerListener,如上图中的peerListener结构体所示,该函数为初始化peerListener结构体会调用transport.NewListenerWithOpts函数创建net.Listener,将close函数初始化为func(context.Context) error { return peers[i].Listener.Close() },用于调用net.Listener.Close函数。configurePeerListeners函数并没有初始化peerListener结构体的serve函数。

func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
    
	if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
     return nil, err }
	if err = cfg.PeerSelfCert(); err != nil {
     cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err)) }
	if !cfg.PeerTLSInfo.Empty() {
     cfg.logger.Info( "starting with peer TLS", zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)), zap.Strings("cipher-suites", cfg.CipherSuites),) }

	peers = make([]*peerListener, len(cfg.LPUrls)) // 需要为配置Config中的LPUrls每个url申请一个peerListener
	defer func() {
      // 退出该函数执行的清理函数
		if err == nil {
     return }
		for i := range peers {
    
			if peers[i] != nil && peers[i].close != nil {
    
				cfg.logger.Warn( "closing peer listener", zap.String("address", cfg.LPUrls[i].String()), zap.Error(err),)
				ctx, cancel := context.WithTimeout(context.Background(), time.Second)
				peers[i].close(ctx)
				cancel()
			}
		}
	}()

	for i, u := range cfg.LPUrls {
    
		if u.Scheme == "http" {
       // http协议
			if !cfg.PeerTLSInfo.Empty() {
     cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String())) }
			if cfg.PeerTLSInfo.ClientCertAuth {
     cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String())) }
		}
		peers[i] = &peerListener{
    close: func(context.Context) error {
     return nil }} // 为配置Config中的LPUrls每个url创建一个peerListener
		peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme, transport.WithTLSInfo(&cfg.PeerTLSInfo), transport.WithSocketOpts(&cfg.SocketOpts), transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),)
		if err != nil {
     return nil, err }
		// once serve, overwrite with 'http.Server.Shutdown'
		peers[i].close = func(context.Context) error {
     return peers[i].Listener.Close() }
	}
	return peers, nil
}

启动集群通信服务端
servePeers函数定义在server/embed/etcd.go文件中,首先NewPeerHandler会调用newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler()会返回s.r.transport.Handler(),最终transport.Handler函数返回了注册了pipelineHandler、streamHandler、snapHandler的mux。对于每个peerListener创建一个协程运行http server,使用pipelineHandler、streamHandler、snapHandler;更新peerListener的close函数,对每个peerListener创建协程运行cmux.Serve(),也就是运行注册configurePeerListeners的transport.NewListenerWithOpts创建Listener的服务。

// configure peer handlers after rafthttp.Transport started
func (e *Etcd) servePeers() (err error) {
    
	ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)

	for _, p := range e.Peers {
      // 对于每个peerListener创建一个协程运行http server,使用pipelineHandler、streamHandler、snapHandler
		u := p.Listener.Addr().String()
		m := cmux.New(p.Listener)  // 使用configurePeerListeners函数初始化的Listener创建cmx
		srv := &http.Server{
     Handler:     ph, ReadTimeout: 5 * time.Minute, ErrorLog:    defaultLog.New(io.Discard, "", 0),  } // do not log user error 创建http server
		go srv.Serve(m.Match(cmux.Any())) // 创建协程运行http server
		p.serve = func() error {
      // configurePeerListeners函数并没有初始化peerListener结构体的serve函数,这里进行初始化
			e.cfg.logger.Info( "cmux::serve", zap.String("address", u),)
			return m.Serve()
		}
		p.close = func(ctx context.Context) error {
     // gracefully shutdown http.Server. close open listeners, idle connections until context cancel or time-out 更新一下close函数,因为这里需要关闭的对象不一样了
			e.cfg.logger.Info("stopping serving peer traffic", zap.String("address", u),)
			srv.Shutdown(ctx)
			e.cfg.logger.Info("stopped serving peer traffic", zap.String("address", u),)
			m.Close()
			return nil
		}
	}
	
	for _, pl := range e.Peers {
     // start peer servers in a goroutine
		go func(l *peerListener) {
      // 创建协程运行cmux.Serve()
			u := l.Addr().String()
			e.cfg.logger.Info("serving peer traffic",zap.String("address", u),
			)
			e.errHandler(l.serve())
		}(pl)
	}
	return nil
}

集群通信客户端

在初始化EtcdServer流程中运行的etcd.NewServersrv(cfg)函数最终会进行传输层的初始化,这里描述的就是首图中的Transport框图,如下流程主要进行:创建rafthttp.Transport实例、启动rafthttp.Transport实例和向rafthttp.Transport实例中添加集群中各个节点对应的Peer实例和Remote实例操作。详细流程将在后续博客中讲解。

	// TODO: move transport initialization near the definition of remote
	tr := &rafthttp.Transport{
      // 创建rafthttp.Transport实例
		Logger:      cfg.Logger, TLSInfo:     cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(),
		ID:          b.cluster.nodeID, URLs:        cfg.PeerURLs, ClusterID:   b.cluster.cl.ID(),
		Raft:        srv, Snapshotter: b.ss, ServerStats: sstats, LeaderStats: lstats,
		// 在这里传入的rafthttp.Raft接口实现是EtcdServer实例。EtcdServer对Raft接口的实现比较简单,它会直接将调用委托给底层的raftNode实例
		ErrorC:      srv.errorc,
	}
	if err = tr.Start(); err != nil {
     return nil, err }	 // 启动rafthttp.Transport实例
	// 向rafthttp.Transport实例中添加集群中各个节点对应的Peer实例和Remote实例
	for _, m := range b.cluster.remotes {
      // add all remotes into transport
		if m.ID != b.cluster.nodeID {
     tr.AddRemote(m.ID, m.PeerURLs) }
	}
	for _, m := range b.cluster.cl.Members() {
    
		if m.ID != b.cluster.nodeID {
     tr.AddPeer(m.ID, m.PeerURLs) }
	}
	srv.r.transport = tr
原网站

版权声明
本文为[肥叔菌]所创,转载请带上原文链接,感谢
https://feishujun.blog.csdn.net/article/details/125458626