当前位置:网站首页>Etcd database source code analysis cluster communication initialization
Etcd database source code analysis cluster communication initialization
2022-06-26 01:10:00 【Tertium ferrugosum】

Message entrance
One etcd After the node runs , Yes 3 Two channels receive external messages , With kv Data addition, deletion, modification and query request processing as an example , Introduce this 3 The working mechanism of two channels .
- client Of http call : Will register to http Modular keysHandler Of ServeHTTP Method treatment . Parse the good message and call EtcdServer Of Do() Method treatment .( In the figure 2)
- client Of grpc call : When it is started, it will turn to grpc server register quotaKVServer object ,quotaKVServer It is in combination that kvServer This data structure .grpc After the message is parsed, it will call kvServer Of Range、Put、DeleteRange、Txn、Compact Other methods .kvServer It contains a RaftKV The interface of , from EtcdServer This structure implements . So the final call is to EtcdServer Of Range、Put、DeleteRange、Txn、Compact Other methods .( In the figure 1 ETCD Database source code analysis ——etcd gRPC service API)
- Between nodes grpc news : Every EtcdServer contained Transport structure ,Transport There will be one peers Of map, Every peer Encapsulates the communication mode from a node to another node . Include streamReader、streamWriter etc. , Used for sending and receiving messages .streamReader There is recvc and propc queue ,streamReader After processing the received message, the message will be pushed to the queue . from peer To deal with ,peer call raftNode Of Process Method to process messages .( In the figure 3、4 This article will introduce )
Cluster communication server
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
// Verification configuration inCfg, In fact, that is config Structure of the embed.Config Type of ec member
serving := false // Identify whether the service is being provided
e = &Etcd{
cfg: *inCfg, stopc: make(chan struct{
})} // establish Etcd
// 1. Initialize the communication required by the internal communication server of the cluster peerListener
if e.Peers, err = configurePeerListeners(cfg); err != nil {
return e, err }
// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
// 2. Start the cluster communication server
if err = e.servePeers(); err != nil {
return e, err }

Initialize the communication required by the internal communication server of the cluster peerListener
configurePeerListeners Function defined in server/embed/etcd.go In file , For configuration Config in LPUrls Every url Create a peerListener, As shown in the figure above peerListener As shown in the structure , This function initializes peerListener Structure will call transport.NewListenerWithOpts Function creation net.Listener, take close Function initialized to func(context.Context) error { return peers[i].Listener.Close() }, Used to invoke net.Listener.Close function .configurePeerListeners Function is not initialized peerListener Structure of the serve function .
func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
return nil, err }
if err = cfg.PeerSelfCert(); err != nil {
cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err)) }
if !cfg.PeerTLSInfo.Empty() {
cfg.logger.Info( "starting with peer TLS", zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)), zap.Strings("cipher-suites", cfg.CipherSuites),) }
peers = make([]*peerListener, len(cfg.LPUrls)) // You need to configure Config Medium LPUrls Every url Apply for one peerListener
defer func() {
// Exit the cleanup function executed by this function
if err == nil {
return }
for i := range peers {
if peers[i] != nil && peers[i].close != nil {
cfg.logger.Warn( "closing peer listener", zap.String("address", cfg.LPUrls[i].String()), zap.Error(err),)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
peers[i].close(ctx)
cancel()
}
}
}()
for i, u := range cfg.LPUrls {
if u.Scheme == "http" {
// http agreement
if !cfg.PeerTLSInfo.Empty() {
cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String())) }
if cfg.PeerTLSInfo.ClientCertAuth {
cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String())) }
}
peers[i] = &peerListener{
close: func(context.Context) error {
return nil }} // For configuration Config Medium LPUrls Every url Create a peerListener
peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme, transport.WithTLSInfo(&cfg.PeerTLSInfo), transport.WithSocketOpts(&cfg.SocketOpts), transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),)
if err != nil {
return nil, err }
// once serve, overwrite with 'http.Server.Shutdown'
peers[i].close = func(context.Context) error {
return peers[i].Listener.Close() }
}
return peers, nil
}
Start the cluster communication server
servePeers Function defined in server/embed/etcd.go In file , First NewPeerHandler Would call newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler() Returns the s.r.transport.Handler(), Final transport.Handler Function returns the registered pipelineHandler、streamHandler、snapHandler Of mux. For each peerListener Create a collaboration run http server, Use pipelineHandler、streamHandler、snapHandler; to update peerListener Of close function , For each peerListener Create a collaboration run cmux.Serve(), That is, run the registration configurePeerListeners Of transport.NewListenerWithOpts establish Listener Service for .
// configure peer handlers after rafthttp.Transport started
func (e *Etcd) servePeers() (err error) {
ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)
for _, p := range e.Peers {
// For each peerListener Create a collaboration run http server, Use pipelineHandler、streamHandler、snapHandler
u := p.Listener.Addr().String()
m := cmux.New(p.Listener) // Use configurePeerListeners Function initialization Listener establish cmx
srv := &http.Server{
Handler: ph, ReadTimeout: 5 * time.Minute, ErrorLog: defaultLog.New(io.Discard, "", 0), } // do not log user error establish http server
go srv.Serve(m.Match(cmux.Any())) // Create a collaboration run http server
p.serve = func() error {
// configurePeerListeners Function is not initialized peerListener Structure of the serve function , Initialize here
e.cfg.logger.Info( "cmux::serve", zap.String("address", u),)
return m.Serve()
}
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server. close open listeners, idle connections until context cancel or time-out Update close function , Because the objects to be closed here are different
e.cfg.logger.Info("stopping serving peer traffic", zap.String("address", u),)
srv.Shutdown(ctx)
e.cfg.logger.Info("stopped serving peer traffic", zap.String("address", u),)
m.Close()
return nil
}
}
for _, pl := range e.Peers {
// start peer servers in a goroutine
go func(l *peerListener) {
// Create a collaboration run cmux.Serve()
u := l.Addr().String()
e.cfg.logger.Info("serving peer traffic",zap.String("address", u),
)
e.errHandler(l.serve())
}(pl)
}
return nil
}
Cluster communication client
Initializing EtcdServer Running in the process etcd.NewServersrv(cfg) Function will eventually initialize the transport layer , What is described here is the Transport block diagram , The following processes are mainly carried out : establish rafthttp.Transport example 、 start-up rafthttp.Transport Instance and direction rafthttp.Transport Add the corresponding to each node in the cluster to the instance Peer Instance and Remote Example operation . The detailed process will be explained in the follow-up blog .
// TODO: move transport initialization near the definition of remote
tr := &rafthttp.Transport{
// establish rafthttp.Transport example
Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(),
ID: b.cluster.nodeID, URLs: cfg.PeerURLs, ClusterID: b.cluster.cl.ID(),
Raft: srv, Snapshotter: b.ss, ServerStats: sstats, LeaderStats: lstats,
// Passed in here rafthttp.Raft The interface implementation is EtcdServer example .EtcdServer Yes Raft The implementation of the interface is relatively simple , It delegates the call directly to the underlying raftNode example
ErrorC: srv.errorc,
}
if err = tr.Start(); err != nil {
return nil, err } // start-up rafthttp.Transport example
// towards rafthttp.Transport Add the corresponding to each node in the cluster to the instance Peer Instance and Remote example
for _, m := range b.cluster.remotes {
// add all remotes into transport
if m.ID != b.cluster.nodeID {
tr.AddRemote(m.ID, m.PeerURLs) }
}
for _, m := range b.cluster.cl.Members() {
if m.ID != b.cluster.nodeID {
tr.AddPeer(m.ID, m.PeerURLs) }
}
srv.r.transport = tr
边栏推荐
猜你喜欢
随机推荐
随便画画的
The kth largest element in the array
Essence and thoughts of 30 lectures on product thinking
Case: drawing Matplotlib dynamic graph
数字电路——加法器
Native DOM vs. virtual DOM
PHP performance optimization
C#线程池控制Semaphore
New library launched | cnopendata China new house information data
Template engine - FreeMarker first experience
Nacos注册中心
LabVIEW开发监控聚变实验脉冲电源
MOS管防倒灌电路设计及其过程分析
简单 deepclone
JS reverse case: cracking login password
[understanding of opportunity -30]: Guiguzi - internal "chapter - empathy, stand on the other side's position and narrow the psychological distance with the other side
关于HC-12无线射频模块使用
多接口调用,使用Promise.all、Promise.race和Promise.any
Nacos registry
Radio boxes are mutually exclusive and can be deselected at the same time









