etcd database source code analysis: cluster communication initialization
2022-06-26 01:10:00 【Tertium ferrugosum】

Message entry points
Once an etcd node is running, it receives external messages through three channels. Taking the handling of KV create, delete, update and query requests as an example, this section describes how each of the three channels works.
- HTTP calls from clients: these are handled by the ServeHTTP method of the keysHandler registered with the HTTP module. Once the message has been parsed, EtcdServer's Do() method is called to process it. (2 in the figure)
- gRPC calls from clients: at startup, a quotaKVServer object is registered with the gRPC server; quotaKVServer embeds the kvServer struct. After a gRPC message is parsed, kvServer's Range, Put, DeleteRange, Txn, Compact and other methods are called. kvServer holds a RaftKV interface, which the EtcdServer struct implements, so the call ultimately lands on EtcdServer's Range, Put, DeleteRange, Txn, Compact and other methods. (1 in the figure; see "ETCD database source code analysis: etcd gRPC service API")
- Messages between nodes (rafthttp): every EtcdServer contains a Transport struct. Transport holds a map of peers, and each peer encapsulates the communication from this node to one other node. A peer includes a streamReader, a streamWriter and so on, which receive and send messages. The streamReader has the recvc and propc channels; after handling a received message it pushes the message onto one of these queues. The peer then takes over and calls raftNode's Process method to process the message. (3 and 4 in the figure, which this article covers)
Cluster communication server
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
	// Validate the configuration inCfg, which is the ec member (of type embed.Config) of the config struct.
	// (The validation call itself is omitted from this excerpt.)
	serving := false // whether the service is already being provided
	e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})} // create the Etcd instance
	cfg := &e.cfg
	// 1. Initialize the peerListeners needed by the cluster's internal communication server
	if e.Peers, err = configurePeerListeners(cfg); err != nil {
		return e, err
	}
	// buffer channel so goroutines on closed connections won't wait forever
	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
	// 2. Start the cluster communication server
	if err = e.servePeers(); err != nil {
		return e, err
	}
	// ... (the rest of StartEtcd is omitted)
}

Initializing the peerListeners needed by the cluster's internal communication server
The configurePeerListeners function is defined in server/embed/etcd.go. It creates one peerListener for every URL in the LPUrls field of the Config, as shown by the peerListener structure in the figure above. When initializing each peerListener, the function calls transport.NewListenerWithOpts to create the net.Listener and sets the close function to func(context.Context) error { return peers[i].Listener.Close() }, which calls net.Listener's Close. configurePeerListeners does not initialize the serve function of the peerListener struct.
func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
	if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
		return nil, err
	}
	if err = cfg.PeerSelfCert(); err != nil {
		cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err))
	}
	if !cfg.PeerTLSInfo.Empty() {
		cfg.logger.Info(
			"starting with peer TLS",
			zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)),
			zap.Strings("cipher-suites", cfg.CipherSuites),
		)
	}
	// One peerListener slot is allocated for every URL in Config.LPUrls
	peers = make([]*peerListener, len(cfg.LPUrls))
	defer func() {
		// Cleanup that runs when this function exits
		if err == nil {
			return
		}
		for i := range peers {
			if peers[i] != nil && peers[i].close != nil {
				cfg.logger.Warn(
					"closing peer listener",
					zap.String("address", cfg.LPUrls[i].String()),
					zap.Error(err),
				)
				ctx, cancel := context.WithTimeout(context.Background(), time.Second)
				peers[i].close(ctx)
				cancel()
			}
		}
	}()
	for i, u := range cfg.LPUrls {
		if u.Scheme == "http" {
			// plain HTTP scheme
			if !cfg.PeerTLSInfo.Empty() {
				cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String()))
			}
			if cfg.PeerTLSInfo.ClientCertAuth {
				cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String()))
			}
		}
		// Create a peerListener for this URL in Config.LPUrls
		peers[i] = &peerListener{close: func(context.Context) error { return nil }}
		peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
			transport.WithTLSInfo(&cfg.PeerTLSInfo),
			transport.WithSocketOpts(&cfg.SocketOpts),
			transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
		)
		if err != nil {
			return nil, err
		}
		// once serve, overwrite with 'http.Server.Shutdown'
		peers[i].close = func(context.Context) error {
			return peers[i].Listener.Close()
		}
	}
	return peers, nil
}
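The pattern used above (open one listener per address, and close the ones already opened if a later step fails) can be tried out with only the standard library. This is a minimal sketch under assumptions: the listener type and the openAll function are hypothetical names, and net.Listen stands in for transport.NewListenerWithOpts.

```go
package main

import (
	"fmt"
	"net"
)

// listener mirrors the shape of peerListener: a net.Listener plus a close hook.
type listener struct {
	net.Listener
	close func() error
}

// openAll opens one TCP listener per address. err is a named result, so the
// deferred function sees the final error and can release what was opened.
func openAll(addrs []string) (ls []*listener, err error) {
	// Track opened listeners in a local slice: "return nil, err" assigns nil
	// to the named result ls before the deferred function runs.
	opened := make([]*listener, 0, len(addrs))
	defer func() {
		if err == nil {
			return
		}
		for _, l := range opened {
			l.close()
		}
	}()
	for _, addr := range addrs {
		nl, lerr := net.Listen("tcp", addr)
		if lerr != nil {
			return nil, lerr
		}
		opened = append(opened, &listener{Listener: nl, close: nl.Close})
	}
	return opened, nil
}

func main() {
	ls, err := openAll([]string{"127.0.0.1:0", "127.0.0.1:0"})
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	for _, l := range ls {
		fmt.Println("listening on", l.Addr())
		l.close()
	}
}
```

One design point worth noting: in Go, a return statement assigns the named results before deferred functions execute, so a cleanup loop that ranges over a named slice result sees nil after `return nil, err`. The sketch therefore keeps the opened listeners in a separate local variable.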
Starting the cluster communication server
The servePeers function is defined in server/embed/etcd.go. First, NewPeerHandler calls newPeerHandler(lg, s, s.RaftHandler()...); RaftHandler() returns s.r.transport.Handler(), and transport.Handler in turn returns a mux on which pipelineHandler, streamHandler and snapHandler are registered. For each peerListener, servePeers then starts a goroutine that runs an HTTP server using those handlers, replaces the peerListener's close function, and starts a goroutine that runs cmux.Serve(), which serves the Listener that configurePeerListeners created through transport.NewListenerWithOpts.
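The handler registration described above can be sketched with http.ServeMux from the standard library. The three handlers here are placeholders that only report their own name, and the URL paths are assumptions made for the illustration rather than values taken from this article.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// named returns a placeholder handler that reports which component was hit.
func named(name string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, name)
	})
}

// newPeerMux registers three placeholder handlers on one mux, in the spirit
// of the pipeline, stream and snapshot handlers. The paths are assumed.
func newPeerMux() http.Handler {
	mux := http.NewServeMux()
	mux.Handle("/raft", named("pipeline"))
	mux.Handle("/raft/stream/", named("stream")) // trailing slash: matches the whole subtree
	mux.Handle("/raft/snapshot", named("snapshot"))
	return mux
}

// route sends an in-memory request through h and returns the response body.
func route(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, path, nil))
	return rec.Body.String()
}

func main() {
	h := newPeerMux()
	for _, p := range []string{"/raft", "/raft/stream/message/1", "/raft/snapshot"} {
		fmt.Println(p, "->", route(h, p))
	}
}
```

Because the mux is an ordinary http.Handler, it can be passed as the Handler of any http.Server, which is how the peer handler ends up serving every peerListener.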
// configure peer handlers after rafthttp.Transport started
func (e *Etcd) servePeers() (err error) {
	ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)
	for _, p := range e.Peers {
		// For each peerListener, start a goroutine running an HTTP server that uses
		// pipelineHandler, streamHandler and snapHandler
		u := p.Listener.Addr().String()
		m := cmux.New(p.Listener) // create the cmux from the Listener initialized by configurePeerListeners
		// Create the HTTP server
		srv := &http.Server{
			Handler:     ph,
			ReadTimeout: 5 * time.Minute,
			ErrorLog:    defaultLog.New(io.Discard, "", 0), // do not log user error
		}
		go srv.Serve(m.Match(cmux.Any())) // run the HTTP server in a goroutine
		p.serve = func() error {
			// configurePeerListeners does not initialize the serve function of
			// peerListener; it is initialized here
			e.cfg.logger.Info("cmux::serve", zap.String("address", u))
			return m.Serve()
		}
		p.close = func(ctx context.Context) error {
			// gracefully shutdown http.Server
			// close open listeners, idle connections
			// until context cancel or time-out
			// The close function is replaced because the objects to close are different here
			e.cfg.logger.Info("stopping serving peer traffic", zap.String("address", u))
			srv.Shutdown(ctx)
			e.cfg.logger.Info("stopped serving peer traffic", zap.String("address", u))
			m.Close()
			return nil
		}
	}
	for _, pl := range e.Peers {
		// start peer servers in a goroutine
		go func(l *peerListener) {
			// run cmux.Serve() in a goroutine
			u := l.Addr().String()
			e.cfg.logger.Info("serving peer traffic", zap.String("address", u))
			e.errHandler(l.serve())
		}(pl)
	}
	return nil
}
Cluster communication client
While EtcdServer is being initialized, the etcdserver.NewServer(cfg) function eventually initializes the transport layer. What is described here is the Transport part of the block diagram. The main steps are: create the rafthttp.Transport instance, start it, and add to it a Peer instance and a Remote instance for the corresponding nodes in the cluster. The detailed process will be explained in a follow-up post.
// TODO: move transport initialization near the definition of remote
tr := &rafthttp.Transport{
	// Create the rafthttp.Transport instance
	Logger:      cfg.Logger,
	TLSInfo:     cfg.PeerTLSInfo,
	DialTimeout: cfg.PeerDialTimeout(),
	ID:          b.cluster.nodeID,
	URLs:        cfg.PeerURLs,
	ClusterID:   b.cluster.cl.ID(),
	// The rafthttp.Raft implementation passed in here is the EtcdServer instance.
	// EtcdServer's implementation of the Raft interface is simple: it delegates
	// the calls directly to the underlying raftNode instance.
	Raft:        srv,
	Snapshotter: b.ss,
	ServerStats: sstats,
	LeaderStats: lstats,
	ErrorC:      srv.errorc,
}
// Start the rafthttp.Transport instance
if err = tr.Start(); err != nil {
	return nil, err
}
// Add to the rafthttp.Transport instance the Peer and Remote instances
// that correspond to the nodes in the cluster
for _, m := range b.cluster.remotes {
	// add all remotes into transport
	if m.ID != b.cluster.nodeID {
		tr.AddRemote(m.ID, m.PeerURLs)
	}
}
for _, m := range b.cluster.cl.Members() {
	if m.ID != b.cluster.nodeID {
		tr.AddPeer(m.ID, m.PeerURLs)
	}
}
srv.r.transport = tr
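To make the role of the peers map more concrete, here is a toy model of a transport that keeps one peer per remote member and hands every received message to a Process method. It is a sketch under assumptions, not rafthttp's implementation: the message, processor, recorder, peer and transport types, as well as newTransport, addMembers and deliver, are hypothetical names, and a single buffered recvc channel stands in for the stream reader's queues.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a raft message.
type message struct {
	From, To uint64
	Body     string
}

// processor models the single method of the Raft interface used here.
type processor interface {
	Process(m message) error
}

// recorder is a test double that remembers the messages it processed.
type recorder struct {
	mu   sync.Mutex
	seen []string
}

func (r *recorder) Process(m message) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.seen = append(r.seen, m.Body)
	return nil
}

// peer owns a receive queue; a goroutine drains it and calls Process.
type peer struct {
	recvc chan message
	done  chan struct{}
}

func startPeer(r processor) *peer {
	p := &peer{recvc: make(chan message, 16), done: make(chan struct{})}
	go func() {
		defer close(p.done)
		for m := range p.recvc {
			_ = r.Process(m)
		}
	}()
	return p
}

// transport keeps one peer per remote member ID.
type transport struct {
	id    uint64
	raft  processor
	mu    sync.Mutex
	peers map[uint64]*peer
}

func newTransport(id uint64, r processor) *transport {
	return &transport{id: id, raft: r, peers: map[uint64]*peer{}}
}

func (t *transport) AddPeer(id uint64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.peers[id]; !ok {
		t.peers[id] = startPeer(t.raft)
	}
}

// addMembers adds every member except the local node, as the loops above do.
func addMembers(t *transport, ids []uint64) {
	for _, id := range ids {
		if id != t.id {
			t.AddPeer(id)
		}
	}
}

// deliver simulates a stream reader queueing a message received from m.From.
func (t *transport) deliver(m message) bool {
	t.mu.Lock()
	p, ok := t.peers[m.From]
	t.mu.Unlock()
	if ok {
		p.recvc <- m
	}
	return ok
}

// Stop closes every queue and waits until the peers have drained them.
func (t *transport) Stop() {
	t.mu.Lock()
	defer t.mu.Unlock()
	for id, p := range t.peers {
		close(p.recvc)
		<-p.done
		delete(t.peers, id)
	}
}

func main() {
	rec := &recorder{}
	tr := newTransport(1, rec)
	addMembers(tr, []uint64{1, 2, 3})
	tr.deliver(message{From: 2, To: 1, Body: "heartbeat"})
	tr.Stop()
	fmt.Println(rec.seen)
}
```

Decoupling the reader from Process with a queue keeps a slow state machine from blocking the network read loop, at the cost of having to drain the queue on shutdown.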