当前位置:网站首页>etcdv3·watch操作实现及相关重点说明
etcdv3·watch操作实现及相关重点说明
2022-07-22 21:27:00 【_七里香】
基于前文的封装,这里我们说明下watch相关的定义及操作。
为了应对这样的需求, 这里举个例子:
A会根据需要有不同的操作,而B需要监听A不同的操作变化来处理不同的事情,这时候如果是etcd那么问题即可迎刃而解。
再实际一点:如业务中,用户欠费后将会停止用户的某特权功能,则停止的一方就需要监听到底是什么时候欠费的,一旦欠费,则给予停止操作停止其特权。
目录
代码实现
func InitV3Client() (cli *clientv3.Client, err error) {
cli, err = clientv3.New(clientv3.Config{
Endpoints: []string{"192.168.31.103:32379"},
DialTimeout: 5 * time.Second,
Username: "root",
Password: "555",
})
return
}
操作触发源
var (
watchPrefix = "/a/b"
)
func Source(cli *clientv3.Client) {
var err error
for i := 0; ; i++ {
fmt.Println("i=", i)
if i%3 == 0 { // 模拟触发某个条件,则开始触发watch
key := fmt.Sprintf("%s/%v", watchPrefix, i)
if i%2 == 0 { // 假设是偶数的话进行DELETE操作(纯delete操作时注释掉下面的else)
fmt.Printf("触发了条件,准备DELETE: %v\n", key)
_, err = cli.Delete(context.Background(), key)
if err != nil {
fmt.Printf("DELETE 失败: %v\n", err)
continue
}
fmt.Printf("DELETE %v success\n", key)
} else {
//fmt.Printf("触发了条件,准备PUT: %v\n", key)
//_, err = cli.Put(context.Background(), key, strconv.Itoa(i))
//if err != nil {
// fmt.Printf("PUT 失败: %v\n", err)
// continue
//}
//
//fmt.Printf("PUT %v success\n", key)
}
}
time.Sleep(time.Second * 1)
}
}
watch监听
func WatchSource(cli *clientv3.Client) {
wCh := cli.Watch(context.Background(), watchPrefix, clientv3.WithPrefix())
for {
select {
case data := <-wCh:
for i, event := range data.Events { // 每次有几个事件,这里就循环几次
fmt.Printf("receive %v event, %v\n", i, event.Type) // event.Type表示事件类型,PUT还是DELETE
switch event.Type {
case clientv3.EventTypePut:
// do your sth
case clientv3.EventTypeDelete:
// do your sth
}
getKey, getValue := getKeyAndVal(event)
fmt.Printf("已监听到信号, %v=%v 进行了 %v 操作. 下一步通知操作方做相应处理\n", getKey, getValue, event.Type)
// 实际业务处理...
}
}
}
}
func getKeyAndVal(event *clientv3.Event) (getKey, getValue string) {
fmt.Printf("event.PrevKv==nil结果 %v, event.Kv==nil结果 %v\n", event.PrevKv == nil, event.Kv == nil)
fmt.Printf("event.PrevKv= %+v\n", event.PrevKv)
fmt.Printf("event.Kv= %+v\n", event.Kv)
if event.PrevKv != nil {
getKey, getValue = string(event.PrevKv.Key), string(event.PrevKv.Value)
}
if event.Type == mvccpb.PUT {
if event.Kv != nil {
getKey, getValue = string(event.Kv.Key), string(event.Kv.Value)
}
}
return
}启动
func main() {
cli, err := client.InitV3Client()
if err != nil {
fmt.Println(err)
return
}
// 验证方式1,手动在终端操作,这里监听结果
watch.WatchSource(cli)
// 验证方式2,全部在代码中自动运行
//go watch.WatchSource(cli)
//watch.Source(cli)
}
好,一个watch流程实现完了。
细节分析
接下来基于上面代码,我们总结一些细节,如什么时候能收到信号,收到信号后什么时候能拿到上一个版本的值?
我们以方式1做验证。
PUT一个新值看看:
[email protected]:~# docker exec etcd etcdctl put /a/b/3 3 --user=a:123
OK
结果:
receive 0 event, PUT
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" create_revision:762 mod_revision:762 version:1 value:"3"
已监听到信号, /a/b/3=3 进行了 PUT 操作. 下一步通知操作方做相应处理再put /a/b/3 3一次试试:
receive 0 event, PUT
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" create_revision:762 mod_revision:763 version:2 value:"3"
已监听到信号, /a/b/3=3 进行了 PUT 操作. 下一步通知操作方做相应处理可以看到version、mod_revision值做了+1操作,PrevKv未拿到值。
那么key不变,val变化呢?
[email protected]:~# docker exec etcd etcdctl put /a/b/3 13 --user=a:123
OK
看看结果:
receive 0 event, PUT
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" create_revision:762 mod_revision:764 version:3 value:"13"
已监听到信号, /a/b/3=13 进行了 PUT 操作. 下一步通知操作方做相应处理可见event.PrevKv依旧无值。
删除一个已存在数据:
[email protected]:~# docker exec etcd etcdctl del /a/b/3 --user=a:123
1
看看结果:
receive 0 event, DELETE
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" mod_revision:767
已监听到信号, /a/b/3= 进行了 DELETE 操作. 下一步通知操作方做相应处理删除一个不存在数据:
[email protected]:~# docker exec etcd etcdctl del /a/b/5 --user=a:123
0
receive 0 event, DELETE
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" mod_revision:767
已监听到信号, /a/b/3= 进行了 DELETE 操作. 下一步通知操作方做相应处理PrevKv依旧为空。
难道,PrevKv总是无值吗?那要这个有什么意义? 不要急,我们做一个小改动:
wCh := cli.Watch(context.Background(), watchPrefix, clientv3.WithPrefix(),clientv3.WithPrevKV())重复上述流程进行验证:
PUT一个新值:
[email protected]:~# docker exec etcd etcdctl put /a/b/3 2 --user=a:123
OK
receive 0 event, PUT
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" create_revision:806 mod_revision:806 version:1 value:"2"
已监听到信号, /a/b/3=2 进行了 PUT 操作. 下一步通知操作方做相应处理可以看到PrevKv为空。
再来一次:
receive 0 event, PUT
event.PrevKv==nil结果 false, event.Kv==nil结果 false
event.PrevKv= key:"/a/b/3" create_revision:799 mod_revision:800 version:2 value:"2"
event.Kv= key:"/a/b/3" create_revision:799 mod_revision:801 version:3 value:"2"
已监听到信号, /a/b/3=2 进行了 PUT 操作. 下一步通知操作方做相应处理
改一下value :
[email protected]:~# docker exec etcd etcdctl put /a/b/3 3 --user=a:123
OK
receive 0 event, PUT
event.PrevKv==nil结果 false, event.Kv==nil结果 false
event.PrevKv= key:"/a/b/3" create_revision:799 mod_revision:801 version:3 value:"2"
event.Kv= key:"/a/b/3" create_revision:799 mod_revision:802 version:4 value:"3"
已监听到信号, /a/b/3=3 进行了 PUT 操作. 下一步通知操作方做相应处理
重要总结
1,对于PUT操作
put过该key后再次put,相当于更新操作,此时PrevKv有值、可收到任何监听信号;
对key首次PUT时 PrevKv为空、可收到任何监听信号;
2,对于DELETE操作
该key存在时进行delete操作,此时PrevKvs有值,可收到任何监听信号;
若delete不存在的key,则收不到任何监听信号;
其实可以进行的骚操作很多,就看你的需要是什么了,再如我们给上面加了clientv3.WithPrefix(),这就代表操作时会按包含前缀对待,也就是说监听方监听的是以/a/b为前缀的key的变化,如果没有 clientv3.WithPrefix(),删除或PUT一个/a/b/5这样的值注定是要失败的、不会接收到任何监听信号。
边栏推荐
- Wechat campus second-hand book trading applet graduation design finished product (7) Interim inspection report
- 局域网SDN技术硬核内幕 9 从软件Overlay到硬件Overlay
- 开发过程中的总结 BaseService 为所有的 Controller或Service 提供一个公共获取 Service 的文件,减少重复注入
- 真人踩过的坑,告诉你避免自动化测试常犯的10个错误
- 我是如何在一周内拿到4份offer的?
- 局域网SDN硬核技术内幕 19 团结一切可以团结的力量
- 直播实录 | 37 手游如何用 StarRocks 实现用户画像分析
- Mysql无法访问,navicat提示:is not allowed to connect to this MySQL server
- ROS2常用命令行工具整理ROS2CLI
- Uniapp switches the tab bar to display different pages, remembers the page location and pulls up to get new data
猜你喜欢

2022就业季惊喜来袭!正版Adobe软件,终于能正经白嫖一把了

Electronic bidding procurement mall system: optimize traditional procurement business and speed up enterprise digital upgrading

基于ROS的导航框架

Overview of multisensor fusion -- FOV and bev

Here comes the genuine Adobe software! Adobe's only genuine family bucket subscription in the world costs only 0 yuan / year

一文深入浅出理解国产开源木兰许可系列协议

6-14漏洞利用-rpcbind漏洞利用

Wechat hotel reservation applet graduation project (5) assignment

Z-Wave 800: Se firmware upgrade

93.(leaflet篇)leaflet态势标绘-进攻方向修改
随机推荐
About redis, do you update the database first or the cache first?
How to use the order flow analysis tool (in)
The new idea 2022.2 was officially released, and the new features are really fragrant
开幕在即 | “万物互联,使能千行百业”2022开放原子全球开源峰会OpenAtom OpenHarmony分论坛
4G传输模块的功能应用
Custom view: levitation ball and accelerator ball
Application of workflow engine in vivo marketing automation
squid代理服务+ip代理池
局域网SDN硬核技术内幕 20 亢龙有悔——规格与限制(上)
LAN SDN technology hard core insider 5 implementation of virtualized network
Wechat campus second-hand book trading applet graduation design finished product (4) opening report
LAN SDN technology hard core insider 4 from computing virtualization to network virtualization
[technology popularization] alliance chain layer2- on a new possibility
93. (leaflet chapter) leaflet situation plotting - modification of attack direction
沃尔沃xc90的安全性如何?一起来看看吧
Problems encountered in punching
关于Redis,是先更新数据库,还是先更新缓存?
局域网SDN技术硬核内幕 10 云网融合的红娘EVPN
Inside the hard core of LAN SDN technology - evpn implementation of 16 three from thing to person user roaming in the park
LAN SDN technology hard core insider 8 from layer 2 switching to layer 3 routing