Etcd watch principle
2022-06-26 04:07:00 【Believe in the reason and follow the reason】

This is only a surface-level understanding. If anything here is wrong, please point it out!
Service initialization
At startup, etcd registers a WatchServer[1]; pb.WatchServer is what handles watch requests.
Receiving watch requests
- Each watch stream gets its own serverWatchStream struct.
- Two goroutines are started: sendLoop sends watch messages to the stream, and recvLoop receives requests.
- A select then blocks until the stream is closed or the call times out.
1. Receiving watch requests: recvLoop
recvLoop reads each req from the gRPCStream and handles it according to its type: CreateRequest, CancelRequest, or ProgressRequest.
- CreateRequest: the watch may cover a range, so key and RangeEnd are built first. StartRevision is handled next: if it is 0, the system's current latest Rev+1 is used. The mvcc layer's watchStream.Watch is then called and returns a watchid; that id is wrapped in a watchResponse, which is written to ctrlStream.
- CancelRequest: calls the mvcc layer's watchableStore.Cancel to unsubscribe, then clears the related state.
- ProgressRequest: broadcasts the system's current Rev.
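The three branches can be illustrated with a small dispatcher. The request structs, fakeStream, and handle below are hypothetical simplifications; the real code works with pb.WatchRequest and the mvcc watch stream. The only behaviour taken from the text is the StartRevision default of current Rev+1.

```go
package main

import "fmt"

// Hypothetical stand-ins for the three request variants.
type createRequest struct {
	Key, RangeEnd []byte
	StartRevision int64
}
type cancelRequest struct{ WatchID int64 }
type progressRequest struct{}

// fakeStream plays the role of the mvcc watch stream.
type fakeStream struct {
	rev    int64           // current store revision
	nextID int64           // next watch id to hand out
	active map[int64]int64 // watch id -> start revision
}

func (s *fakeStream) watch(startRev int64) int64 {
	id := s.nextID
	s.nextID++
	s.active[id] = startRev
	return id
}

// handle mirrors the branches of recvLoop and returns a textual description
// of the control response that would be written to ctrlStream.
func handle(s *fakeStream, req interface{}) string {
	switch r := req.(type) {
	case createRequest:
		if r.StartRevision == 0 {
			r.StartRevision = s.rev + 1 // only future changes
		}
		id := s.watch(r.StartRevision)
		return fmt.Sprintf("created id=%d start=%d", id, r.StartRevision)
	case cancelRequest:
		if _, ok := s.active[r.WatchID]; !ok {
			return "not found"
		}
		delete(s.active, r.WatchID) // clear state for this watch
		return fmt.Sprintf("canceled id=%d", r.WatchID)
	case progressRequest:
		return fmt.Sprintf("progress rev=%d", s.rev)
	default:
		return "unknown request"
	}
}

func main() {
	s := &fakeStream{rev: 10, active: map[int64]int64{}}
	fmt.Println(handle(s, createRequest{Key: []byte("foo")}))
	fmt.Println(handle(s, progressRequest{}))
	fmt.Println(handle(s, cancelRequest{WatchID: 0}))
}
```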
2. Sending watch responses: sendLoop
An event may fire before the watchid has been delivered to the client. Since the id is not yet known, such messages pile up in pending. The function as a whole does three things: it reads subscribed messages from mvcc.watchStream.Chan(), handles control messages from ctrlStream, and handles progressTicker.
- Chan(): if needPrevKV is set, the previous KV is filled in. If the watchid does not exist yet, the message is parked in the pending queue. Fragment decides whether the response has to be split into several packets (the threshold here is 1.5M); if not, it is sent directly with sws.gRPCStream.Send. Whenever data is sent, sws.progress[wresp.WatchID] is set to false so that no progress message is sent.
- ctrlStream: reads control messages. As soon as one delivers a watchid, the messages piled up in pending for that id are sent.
- progressTicker: periodically calls RequestProgress to generate progress messages, sending the current Rev to the client.
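The pending buffering is the subtle part of sendLoop, so here is a minimal sketch of just that logic, without goroutines or fragmentation. The sender type and its method names are hypothetical. I assume the "created" control response goes out before the buffered events, so the client always learns the id first.

```go
package main

import "fmt"

// watchResp is a simplified event response.
type watchResp struct {
	WatchID int64
	Event   string
}

// sender keeps the bookkeeping sendLoop needs for pending messages.
type sender struct {
	ids     map[int64]bool        // watch ids already announced to the client
	pending map[int64][]watchResp // events that arrived before the announcement
	sent    []string              // what has been written to the stream, in order
}

func newSender() *sender {
	return &sender{ids: map[int64]bool{}, pending: map[int64][]watchResp{}}
}

// onEvent handles a message read from the mvcc channel.
func (s *sender) onEvent(r watchResp) {
	if !s.ids[r.WatchID] {
		// The client does not know this id yet: buffer instead of sending.
		s.pending[r.WatchID] = append(s.pending[r.WatchID], r)
		return
	}
	s.sent = append(s.sent, r.Event)
}

// onCreated handles the control message that announces a new watch id.
func (s *sender) onCreated(id int64) {
	s.sent = append(s.sent, fmt.Sprintf("created %d", id))
	s.ids[id] = true
	for _, r := range s.pending[id] {
		s.sent = append(s.sent, r.Event) // flush in arrival order
	}
	delete(s.pending, id)
}

func main() {
	s := newSender()
	s.onEvent(watchResp{WatchID: 1, Event: "PUT foo"}) // arrives too early
	s.onCreated(1)
	s.onEvent(watchResp{WatchID: 1, Event: "PUT bar"})
	fmt.Println(s.sent)
}
```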
MVCC watch
This part is mainly about mvcc.watchStream. Let's look at how Watch is implemented.
Watch mainly generates the watchid, which is simply auto-incremented. It then calls the watch method to obtain a watcher and a cancelFunc, and stores both in the watchStream.
The watch method:
The watchableStore behind a WatchStream has three groups: synced, unsynced, and victims. When a client starts watching from a historical revision, there are many messages to send to it, so the watcher struct is put into the unsynced group; otherwise it goes into the synced group. Why? Because of message-processing speed. The later sections cover the details; for now just remember that a watcher moves among these three groups, and ideally it stays in synced the whole time.
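The choice between synced and unsynced comes down to one comparison against the store's current revision. A minimal sketch, assuming a simplified store with map-based groups (the real groups are richer structures, and the field names here are illustrative):

```go
package main

import "fmt"

type watcher struct {
	id     int64
	key    string
	minRev int64 // next revision this watcher still needs to see
}

// store is a hypothetical, heavily reduced watchable store.
type store struct {
	currentRev int64
	nextID     int64
	synced     map[int64]*watcher
	unsynced   map[int64]*watcher
}

// watch registers a watcher and files it into the matching group.
func (s *store) watch(key string, startRev int64) *watcher {
	w := &watcher{id: s.nextID, key: key, minRev: startRev}
	s.nextID++

	if startRev == 0 || startRev > s.currentRev {
		// Nothing to replay: the watcher is already caught up.
		if w.minRev < s.currentRev+1 {
			w.minRev = s.currentRev + 1
		}
		s.synced[w.id] = w
	} else {
		// History from startRev up to currentRev must be replayed first.
		s.unsynced[w.id] = w
	}
	return w
}

func main() {
	s := &store{currentRev: 10, synced: map[int64]*watcher{}, unsynced: map[int64]*watcher{}}
	s.watch("foo", 0) // watch from now on
	s.watch("foo", 4) // watch from history
	fmt.Println(len(s.synced), len(s.unsynced))
}
```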
MVCC message generation
The underlying Txn is wrapped by watchableStoreTxnWrite. When End is called, notify is invoked to send the change messages before the transaction is committed.
It iterates over changes, determines whether each one is mvccpb.DELETE or mvccpb.PUT, wraps them as events, calls tw.s.notify to send them, and then commits.
newWatcherBatch picks from the synced group the watchers that need to be notified, and w.send then pushes the events into each watcher's channel. If the channel is full, the send fails: the watcher is removed from the synced group and added to the victim group, to be handled later by the asynchronous goroutine syncVictimsLoop. As for how newWatcherBatch is implemented:
watcherSetByKey returns the watchers that are watching ev.Kv.Key. Internally it uses the red-black tree from adt, which makes range matching fast. See the source code if you are interested.
The send function first applies the filters, then sends to w.ch, returning false if the channel is full. This w.ch is the channel used by v3rpc; when data arrives it is sent on the http2 stream …
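The non-blocking send and the move to the victim group can be sketched as below. The types are hypothetical simplifications: key matching is an exact string comparison rather than an interval-tree lookup, filters are left out, and setting minRev to rev+1 for a victim is my assumption about how the watcher's position is tracked.

```go
package main

import "fmt"

type event struct {
	Key string
	Rev int64
}

type watcher struct {
	key    string
	ch     chan []event
	minRev int64
}

// send never blocks: it reports false when the channel has no room.
func (w *watcher) send(evs []event) bool {
	select {
	case w.ch <- evs:
		return true
	default:
		return false
	}
}

type store struct {
	synced  map[*watcher]struct{}
	victims map[*watcher][]event // events that could not be delivered yet
}

// notify delivers the events of one committed revision to the synced watchers.
func (s *store) notify(rev int64, evs []event) {
	// Group events per interested watcher (the role of newWatcherBatch).
	batch := map[*watcher][]event{}
	for _, ev := range evs {
		for w := range s.synced {
			if w.key == ev.Key {
				batch[w] = append(batch[w], ev)
			}
		}
	}
	for w, wevs := range batch {
		if w.send(wevs) {
			continue
		}
		// Slow consumer: park the batch and take the watcher out of synced.
		w.minRev = rev + 1
		delete(s.synced, w)
		s.victims[w] = wevs
	}
}

func main() {
	fast := &watcher{key: "foo", ch: make(chan []event, 1)}
	slow := &watcher{key: "foo", ch: make(chan []event)} // no buffer, no reader
	s := &store{
		synced:  map[*watcher]struct{}{fast: {}, slow: {}},
		victims: map[*watcher][]event{},
	}
	s.notify(7, []event{{Key: "foo", Rev: 7}})
	fmt.Println(len(s.synced), len(s.victims), len(fast.ch))
}
```

Because send never blocks, one slow client cannot stall the write path; it only costs that client a detour through the victim group.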
The newWatchableStore method
The watchableStore is created when the etcd service starts. newWatchableStore spawns two asynchronous goroutines: syncWatchersLoop turns unsynced watchers into synced watchers, and syncVictimsLoop sends out as many of the victims' messages as possible.
Handling slow watchers
1. Slow path: victims
moveVictims is called to try to send the piled-up messages.
The code is simple: it first tries to send each victim's messages, and if that fails, the watcher is put back into victims. If it succeeds, the current system Rev is compared with watcher.minRev to decide whether the watcher goes into the synced group or the unsynced group.
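A minimal sketch of that retry step, under the assumption that minRev holds the next revision the watcher still needs, so a watcher whose minRev is at or below the current Rev has fallen behind and must go through unsynced. The types are hypothetical and reduced to what the decision needs.

```go
package main

import "fmt"

type event struct {
	Key string
	Rev int64
}

type watcher struct {
	ch     chan []event
	minRev int64 // assumed: next revision the watcher still needs
}

// send never blocks: it reports false when the channel has no room.
func (w *watcher) send(evs []event) bool {
	select {
	case w.ch <- evs:
		return true
	default:
		return false
	}
}

type store struct {
	currentRev int64
	synced     map[*watcher]struct{}
	unsynced   map[*watcher]struct{}
	victims    map[*watcher][]event
}

// moveVictims retries every parked batch once and returns how many got through.
func (s *store) moveVictims() int {
	old := s.victims
	s.victims = map[*watcher][]event{}
	moved := 0
	for w, evs := range old {
		if !w.send(evs) {
			s.victims[w] = evs // still blocked, try again next round
			continue
		}
		moved++
		if w.minRev <= s.currentRev {
			s.unsynced[w] = struct{}{} // missed newer revisions meanwhile
		} else {
			s.synced[w] = struct{}{} // fully caught up
		}
	}
	return moved
}

func main() {
	caughtUp := &watcher{ch: make(chan []event, 1), minRev: 6}
	behind := &watcher{ch: make(chan []event, 1), minRev: 4}
	blocked := &watcher{ch: make(chan []event), minRev: 6}
	s := &store{
		currentRev: 5,
		synced:     map[*watcher]struct{}{},
		unsynced:   map[*watcher]struct{}{},
		victims: map[*watcher][]event{
			caughtUp: {{Key: "foo", Rev: 5}},
			behind:   {{Key: "foo", Rev: 3}},
			blocked:  {{Key: "foo", Rev: 5}},
		},
	}
	fmt.Println(s.moveVictims(), len(s.synced), len(s.unsynced), len(s.victims))
}
```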
2. Slow path: unsynced
syncWatchersLoop calls syncWatchers in a loop to process the unsynced group:
- Select from unsynced the watcher group to send data to. This only checks that the revision is usable, i.e. that it lies in [compactRev, curRev].
- UnsafeRange fetches all the keys/values from boltdb.
- Iterate over the watchers and send the matching keys/values. On success the watcher is removed from unsynced and added to synced; otherwise it is added to the victims queue.
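The three steps can be sketched like this. Everything here is a simplified assumption: history is an in-memory slice standing in for the boltdb range read, keys match by exact comparison, and watchers whose revision is already compacted are simply left alone, whereas the real code deals with them separately.

```go
package main

import "fmt"

type kv struct {
	Key string
	Rev int64
}

type watcher struct {
	key    string
	minRev int64
	ch     chan []kv
}

type store struct {
	compactRev, currentRev int64
	history                []kv // stand-in for the revisions stored in boltdb
	synced, unsynced       map[*watcher]struct{}
	victims                map[*watcher][]kv
}

func (s *store) syncWatchers() {
	// Step 1: pick watchers whose revision is usable and find the lowest one.
	minRev := s.currentRev + 1
	var picked []*watcher
	for w := range s.unsynced {
		if w.minRev < s.compactRev {
			continue // history is gone; not handled in this sketch
		}
		picked = append(picked, w)
		if w.minRev < minRev {
			minRev = w.minRev
		}
	}

	// Step 2: one range read covering [minRev, currentRev].
	var revs []kv
	for _, e := range s.history {
		if e.Rev >= minRev && e.Rev <= s.currentRev {
			revs = append(revs, e)
		}
	}

	// Step 3: deliver each watcher's share and move it to its next group.
	for _, w := range picked {
		var evs []kv
		for _, e := range revs {
			if e.Key == w.key && e.Rev >= w.minRev {
				evs = append(evs, e)
			}
		}
		delete(s.unsynced, w)
		w.minRev = s.currentRev + 1
		if len(evs) == 0 {
			s.synced[w] = struct{}{}
			continue
		}
		select {
		case w.ch <- evs:
			s.synced[w] = struct{}{}
		default:
			s.victims[w] = evs // channel full: retry via the victim path
		}
	}
}

func main() {
	w := &watcher{key: "foo", minRev: 3, ch: make(chan []kv, 1)}
	s := &store{
		compactRev: 2, currentRev: 5,
		history:  []kv{{"foo", 3}, {"bar", 4}, {"foo", 5}},
		synced:   map[*watcher]struct{}{},
		unsynced: map[*watcher]struct{}{w: {}},
		victims:  map[*watcher][]kv{},
	}
	s.syncWatchers()
	fmt.Println(len(s.synced), len(s.unsynced), len(<-w.ch))
}
```

Reading the shared range once and filtering per watcher keeps the storage cost independent of how many watchers are catching up.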
Reference: Dong Zerun's technical notes, "Understanding the etcd watch implementation"