etcd database source code analysis: server-side Raft handlers of the inter-node network layer
2022-06-28 03:22:00 【Tertium ferrugosum】
In the earlier article "ETCD database source code analysis: inter-cluster network layer server interface", we saw that starting the http.Server calls rafthttp.Transporter.Handler() to register a Handler instance for each URL path, as shown in the figure below. streamHandler handles requests on the Stream message channel, pipelineHandler handles requests on the Pipeline channel, and snapHandler handles requests that carry snapshot data.
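The registration step described above can be sketched roughly as follows. This is a minimal illustration, not the etcd implementation: the three path prefixes are assumed to match the rafthttp constants, and `named`, `newMux` and `route` are hypothetical helpers that stand in for the real handlers.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// Path prefixes; assumed here to match the constants used by rafthttp.
const (
	raftPrefix         = "/raft"
	raftStreamPrefix   = "/raft/stream"
	raftSnapshotPrefix = "/raft/snapshot"
)

// named returns a stand-in handler that simply replies with its own name.
func named(name string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, name)
	})
}

// newMux registers one handler per prefix. The stream prefix ends in "/"
// so that it matches the whole subtree (/raft/stream/<type>/<peer-id>).
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle(raftPrefix, named("pipelineHandler"))
	mux.Handle(raftStreamPrefix+"/", named("streamHandler"))
	mux.Handle(raftSnapshotPrefix, named("snapHandler"))
	return mux
}

// route reports which handler serves the given request path.
func route(mux http.Handler, target string) string {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, target, nil))
	return rec.Body.String()
}

func main() {
	mux := newMux()
	for _, p := range []string{"/raft", "/raft/stream/message/1", "/raft/snapshot"} {
		fmt.Printf("%s -> %s\n", p, route(mux, p))
	}
}
```

Registering the stream prefix with a trailing slash is what lets a single handler serve every stream type and peer ID underneath it, while the other two paths are matched exactly.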
pipelineHandler
pipelineHandler is what the server side of the network layer uses to handle requests on the Pipeline channel. Its tr field is the associated rafthttp.Transport instance, r is the underlying Raft instance, and cid is the ID of the current cluster.
type pipelineHandler struct {
	lg      *zap.Logger
	localID types.ID
	tr      Transporter
	r       Raft
	cid     types.ID
}
pipelineHandler implements the ServeHTTP method of the http.Handler interface, which is its core request-processing method. It reads the request sent by the peer node, decodes it into a message instance, and hands that message to the underlying etcd-raft module for processing.
func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Start with a series of checks: the request method must be POST and the cluster ID must be compatible.
	if r.Method != "POST" {
		w.Header().Set("Allow", "POST")
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		return
	}

	// Set X-Etcd-Cluster-ID in the response header to the current cluster ID.
	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())

	if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
		http.Error(w, err.Error(), http.StatusPreconditionFailed)
		return
	}

	// Add a remote peer based on the HTTP request headers.
	addRemoteFromRequest(h.tr, r)

	// Cap the number of bytes each read pulls from the underlying connection (connReadLimitByte, 64KB by default);
	// io.ReadAll then assembles the chunks into the complete request body.
	// Limit the data size that could be read from the request body, which ensures that read from
	// connection will not time out accidentally due to possible blocking in underlying implementation.
	limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte)
	b, err := io.ReadAll(limitedr) // read the entire HTTP request body
	if err != nil {
		h.lg.Warn(
			"failed to read Raft message",
			zap.String("local-member-id", h.localID.String()),
			zap.Error(err),
		)
		http.Error(w, "error reading raft message", http.StatusBadRequest)
		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
		return
	}

	// Deserialize the body into a raftpb.Message instance.
	var m raftpb.Message
	if err := m.Unmarshal(b); err != nil {
		h.lg.Warn(
			"failed to unmarshal Raft message",
			zap.String("local-member-id", h.localID.String()),
			zap.Error(err),
		)
		http.Error(w, "error unmarshalling raft message", http.StatusBadRequest)
		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
		return
	}

	receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b)))

	// Hand the decoded message to the underlying Raft state machine for processing.
	if err := h.r.Process(context.TODO(), m); err != nil {
		switch v := err.(type) {
		case writerToResponse:
			v.WriteTo(w)
		default:
			h.lg.Warn(
				"failed to process Raft message",
				zap.String("local-member-id", h.localID.String()),
				zap.Error(err),
			)
			http.Error(w, "error processing raft message", http.StatusInternalServerError)
			w.(http.Flusher).Flush()
			// disconnect the http stream
			panic(err)
		}
		return
	}

	// Return a status code to the peer node to indicate that the request has been processed.
	// Write StatusNoContent header after the message has been processed by raft, which facilitates the client to report MsgSnap status.
	w.WriteHeader(http.StatusNoContent)
}
// addRemoteFromRequest adds a remote peer according to an http request header
func addRemoteFromRequest(tr Transporter, r *http.Request) {
	if from, err := types.IDFromString(r.Header.Get("X-Server-From")); err == nil {
		if urls := r.Header.Get("X-PeerURLs"); urls != "" {
			// If the X-PeerURLs header carries the peer's URLs, call AddRemote so that the server registers them for that peer.
			tr.AddRemote(from, strings.Split(urls, ","))
		}
	}
}
streamHandler
The streamHandler struct is mainly responsible for accepting network connections from peer nodes and matching each one to the corresponding streamWriter instance, so that the streamWriter can start sending messages to that peer. The Stream message channel maintains a long-lived HTTP connection and mainly carries small, frequently sent messages. The main job of streamHandler.ServeHTTP is therefore to bind an outgoingConn instance (which a streamWriter then writes to) to the corresponding peer instance.
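The long-lived connection pattern described here can be sketched with the standard library alone. This is an illustration under assumptions, not etcd code: `conn`, `holdOpen` and `demo` are hypothetical names, and a plain channel plays the role of handing the connection to a writer.

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// conn bundles what a writer needs to keep using an accepted connection.
type conn struct {
	w    http.ResponseWriter
	f    http.Flusher
	done chan struct{} // closed when the connection should be released
}

// holdOpen answers 200 immediately, hands the connection to whoever reads
// from conns, and then blocks so that the HTTP connection stays open.
func holdOpen(conns chan<- *conn) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		f, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
		f.Flush() // push the response headers to the peer right away
		c := &conn{w: w, f: f, done: make(chan struct{})}
		conns <- c
		<-c.done // returning from the handler would end the stream
	})
}

// demo opens one stream, writes a line to it from outside the handler
// goroutine, and returns what the client received.
func demo() (string, error) {
	conns := make(chan *conn, 1)
	srv := httptest.NewServer(holdOpen(conns))
	defer srv.Close()

	resp, err := http.Get(srv.URL)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	c := <-conns
	fmt.Fprintln(c.w, "heartbeat")
	c.f.Flush()

	line, err := bufio.NewReader(resp.Body).ReadString('\n')
	close(c.done) // let the handler return
	return line, err
}

func main() {
	line, err := demo()
	fmt.Printf("%q %v\n", line, err)
}
```

The design point is that the handler goroutine does no writing of its own after the initial flush; it only keeps the connection alive while another goroutine owns the writer.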
type streamHandler struct {
	lg         *zap.Logger
	tr         *Transport // the associated rafthttp.Transport instance
	peerGetter peerGetter // its Get method returns the peer instance for a given node ID
	r          Raft       // the underlying Raft instance
	id         types.ID   // ID of the current node
	cid        types.ID   // ID of the current cluster
}
streamHandler implements the ServeHTTP method of the http.Handler interface, which is its core request-processing method.
func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Check that the request method is GET.
	if r.Method != "GET" {
		w.Header().Set("Allow", "GET")
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		return
	}

	w.Header().Set("X-Server-Version", version.Version)
	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())

	// Check the cluster ID.
	if err := checkClusterCompatibilityFromHeader(h.lg, h.tr.ID, r.Header, h.cid); err != nil {
		http.Error(w, err.Error(), http.StatusPreconditionFailed)
		return
	}

	// Determine the stream type from the directory part of the URL path.
	var t streamType
	switch path.Dir(r.URL.Path) {
	case streamTypeMsgAppV2.endpoint(h.lg):
		t = streamTypeMsgAppV2
	case streamTypeMessage.endpoint(h.lg):
		t = streamTypeMessage
	default:
		h.lg.Debug(
			"ignored unexpected streaming request path",
			zap.String("local-member-id", h.tr.ID.String()),
			zap.String("remote-peer-id-stream-handler", h.id.String()),
			zap.String("path", r.URL.Path),
		)
		http.Error(w, "invalid path", http.StatusNotFound)
		return
	}

	// Get the ID of the peer node from the last element of the URL path.
	fromStr := path.Base(r.URL.Path)
	from, err := types.IDFromString(fromStr)
	if err != nil {
		h.lg.Warn(
			"failed to parse path into ID",
			zap.String("local-member-id", h.tr.ID.String()),
			zap.String("remote-peer-id-stream-handler", h.id.String()),
			zap.String("path", fromStr),
			zap.Error(err),
		)
		http.Error(w, "invalid from", http.StatusNotFound)
		return
	}

	// Check whether the peer has been removed from the cluster.
	if h.r.IsIDRemoved(uint64(from)) {
		h.lg.Warn(
			"rejected stream from remote peer because it was removed",
			zap.String("local-member-id", h.tr.ID.String()),
			zap.String("remote-peer-id-stream-handler", h.id.String()),
			zap.String("remote-peer-id-from", from.String()),
		)
		http.Error(w, "removed member", http.StatusGone)
		return
	}

	p := h.peerGetter.Get(from) // look up the Peer instance for the peer node's ID
	if p == nil {
		// This may happen in following cases:
		// 1. user starts a remote peer that belongs to a different cluster with the same cluster ID.
		// 2. local etcd falls behind of the cluster, and cannot recognize the members that joined after its current progress.
		if urls := r.Header.Get("X-PeerURLs"); urls != "" {
			h.tr.AddRemote(from, strings.Split(urls, ","))
		}
		h.lg.Warn(
			"failed to find remote peer in cluster",
			zap.String("local-member-id", h.tr.ID.String()),
			zap.String("remote-peer-id-stream-handler", h.id.String()),
			zap.String("remote-peer-id-from", from.String()),
			zap.String("cluster-id", h.cid.String()),
		)
		http.Error(w, "error sender not found", http.StatusNotFound)
		return
	}

	wto := h.id.String() // ID of the current node
	// Check that the request is addressed to the current node.
	if gto := r.Header.Get("X-Raft-To"); gto != wto {
		h.lg.Warn(
			"ignored streaming request; ID mismatch",
			zap.String("local-member-id", h.tr.ID.String()),
			zap.String("remote-peer-id-stream-handler", h.id.String()),
			zap.String("remote-peer-id-header", gto),
			zap.String("remote-peer-id-from", from.String()),
			zap.String("cluster-id", h.cid.String()),
		)
		http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
		return
	}

	w.WriteHeader(http.StatusOK)
	w.(http.Flusher).Flush() // Flush sends the response to the peer node immediately

	c := newCloseNotifier()
	// Create the outgoingConn instance.
	conn := &outgoingConn{
		t:       t,
		Writer:  w,
		Flusher: w.(http.Flusher),
		Closer:  c,
		localID: h.tr.ID,
		peerID:  from,
	}
	p.attachOutgoingConn(conn) // bind the outgoingConn instance to the corresponding streamWriter instance
	<-c.closeNotify()
}
snapHandler
snapHandler (the snapshotHandler struct) receives snapshot data from peer nodes. tr is the associated rafthttp.Transport instance, r is the underlying Raft instance, and cid is the ID of the current cluster. snapshotter is responsible for saving snapshot data to a local file; see the article "ETCD source code analysis: introduction to the snap.New function".
type snapshotHandler struct {
	lg          *zap.Logger
	tr          Transporter
	r           Raft
	snapshotter *snap.Snapshotter
	localID     types.ID
	cid         types.ID
}
ServeHTTP serves HTTP requests to receive and process snapshot messages. If the request sender dies without closing the underlying TCP connection, the handler keeps waiting for the request body until TCP keepalive discovers, after several minutes, that the connection is broken. This is acceptable because (1) snapshot messages sent over other TCP connections can still be received and processed, and (2) this case should be rare, so no further optimization is done. Besides reading the snapshot data sent by the peer node, snapshotHandler.ServeHTTP generates the corresponding snapshot file locally and passes the snapshot message through the Raft interface to the underlying etcd-raft module for processing.
// ServeHTTP serves HTTP request to receive and process snapshot message.
//
// If request sender dies without closing underlying TCP connection,
// the handler will keep waiting for the request body until TCP keepalive
// finds out that the connection is broken after several minutes.
// This is acceptable because
// 1. snapshot messages sent through other TCP connections could still be
// received and processed.
// 2. this case should happen rarely, so no further optimization is done.
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	start := time.Now()

	// First, a series of checks is carried out.
	if r.Method != "POST" {
		w.Header().Set("Allow", "POST")
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
		return
	}

	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())

	if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
		http.Error(w, err.Error(), http.StatusPreconditionFailed)
		snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
		return
	}

	// Add a remote peer based on the HTTP request headers.
	addRemoteFromRequest(h.tr, r)

	dec := &messageDecoder{r: r.Body}
	// let snapshots be very large since they can exceed 512MB for large installations
	// decodeLimit decodes the raftpb.Message at the front of the body and rejects it if its
	// encoded size exceeds snapshotLimitByte; the snapshot database itself is read from
	// r.Body afterwards by SaveDBFrom.
	m, err := dec.decodeLimit(snapshotLimitByte)
	from := types.ID(m.From).String()
	if err != nil {
		msg := fmt.Sprintf("failed to decode raft message (%v)", err)
		h.lg.Warn(
			"failed to decode Raft message",
			zap.String("local-member-id", h.localID.String()),
			zap.String("remote-snapshot-sender-id", from),
			zap.Error(err),
		)
		http.Error(w, msg, http.StatusBadRequest)
		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
		snapshotReceiveFailures.WithLabelValues(from).Inc()
		return
	}

	msgSize := m.Size()
	receivedBytes.WithLabelValues(from).Add(float64(msgSize))

	// Check that the message that was read is of type MsgSnap.
	if m.Type != raftpb.MsgSnap {
		h.lg.Warn(
			"unexpected Raft message type",
			zap.String("local-member-id", h.localID.String()),
			zap.String("remote-snapshot-sender-id", from),
			zap.String("message-type", m.Type.String()),
		)
		http.Error(w, "wrong raft message type", http.StatusBadRequest)
		snapshotReceiveFailures.WithLabelValues(from).Inc()
		return
	}

	snapshotReceiveInflights.WithLabelValues(from).Inc()
	defer func() {
		snapshotReceiveInflights.WithLabelValues(from).Dec()
	}()

	h.lg.Info(
		"receiving database snapshot",
		zap.String("local-member-id", h.localID.String()),
		zap.String("remote-snapshot-sender-id", from),
		zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
		zap.Int("incoming-snapshot-message-size-bytes", msgSize),
		zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))),
	)

	// save incoming database snapshot.
	// The Snapshotter saves the snapshot data to a local file.
	n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index)
	if err != nil {
		msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
		h.lg.Warn(
			"failed to save incoming database snapshot",
			zap.String("local-member-id", h.localID.String()),
			zap.String("remote-snapshot-sender-id", from),
			zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
			zap.Error(err),
		)
		http.Error(w, msg, http.StatusInternalServerError)
		snapshotReceiveFailures.WithLabelValues(from).Inc()
		return
	}

	receivedBytes.WithLabelValues(from).Add(float64(n))

	downloadTook := time.Since(start)
	h.lg.Info(
		"received and saved database snapshot",
		zap.String("local-member-id", h.localID.String()),
		zap.String("remote-snapshot-sender-id", from),
		zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
		zap.Int64("incoming-snapshot-size-bytes", n),
		zap.String("incoming-snapshot-size", humanize.Bytes(uint64(n))),
		zap.String("download-took", downloadTook.String()),
	)

	// Call Raft.Process to deliver the MsgSnap message to the underlying etcd-raft module.
	if err := h.r.Process(context.TODO(), m); err != nil {
		switch v := err.(type) {
		// Process may return writerToResponse error when doing some additional checks before calling raft.Node.Step.
		case writerToResponse:
			v.WriteTo(w)
		default:
			msg := fmt.Sprintf("failed to process raft message (%v)", err)
			h.lg.Warn(
				"failed to process Raft message",
				zap.String("local-member-id", h.localID.String()),
				zap.String("remote-snapshot-sender-id", from),
				zap.Error(err),
			)
			http.Error(w, msg, http.StatusInternalServerError)
			snapshotReceiveFailures.WithLabelValues(from).Inc()
		}
		return
	}

	// Write StatusNoContent header after the message has been processed by raft, which facilitates the client to report MsgSnap status.
	w.WriteHeader(http.StatusNoContent)

	snapshotReceive.WithLabelValues(from).Inc()
	snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
}