当前位置:网站首页> Golang 實現 Redis(10): 本地原子性事務
Golang 實現 Redis(10): 本地原子性事務
2022-06-22 18:37:00 【Finley】
持續創作,加速成長!這是我參與「掘金日新計劃 · 6 月更文挑戰」的第27天,點擊查看活動詳情
為了支持多個命令的原子性執行 Redis 提供了事務機制。 Redis 官方文檔中稱事務帶有以下兩個重要的保證:
- 事務是一個單獨的隔離操作:事務中的所有命令都會序列化、按順序地執行。事務在執行的過程中,不會被其他客戶端發送來的命令請求所打斷。
- 事務是一個原子操作:事務中的命令要麼全部被執行,要麼全部都不執行
我們在使用事務的過程中可能會遇到兩類錯誤:
- 在命令入隊過程中出現語法錯誤
- 在命令執行過程中出現運行時錯誤,比如對 string 類型的 key 進行 lpush 操作
在遇到語法錯誤時 Redis 會中止命令入隊並丟弃事務。在遇到運行時錯誤時 Redis 僅會報錯然後繼續執行事務中剩下的命令,不會像大多數數據庫那樣回滾事務。對此,Redis 官方的解釋是:
Redis 命令只會因為錯誤的語法而失敗(並且這些問題不能在入隊時發現),或是命令用在了錯誤類型的鍵上面:這也就是說,從實用性的角度來說,失敗的命令是由編程錯誤造成的,而這些錯誤應該在開發的過程中被發現,而不應該出現在生產環境中。
因為不需要對回滾進行支持,所以 Redis 的內部可以保持簡單且快速。 有種觀點認為 Redis 處理事務的做法會產生 bug , 然而需要注意的是, 在通常情况下, 回滾並不能解决編程錯誤帶來的問題。 舉個例子, 如果你本來想通過 INCR 命令將鍵的值加上 1 , 卻不小心加上了 2 , 又或者對錯誤類型的鍵執行了 INCR , 回滾是沒有辦法處理這些情况的。鑒於沒有任何機制能避免程序員自己造成的錯誤, 並且這類錯誤通常不會在生產環境中出現, 所以 Redis 選擇了更簡單、更快速的無回滾方式來處理事務。
emmmm, 接下來我們嘗試在 Godis 中實現具有原子性、隔離性的事務吧。
事務的原子性具有兩個特點:1. 事務執行過程不可被其它事務(線程)插入 2. 事務要麼完全成功要麼完全不執行,不存在部分成功的狀態 事務的隔離性是指事務中操作的結果是否對其它並發事務可見。由於KV數據庫不存在幻讀問題,因此我們需要避免髒讀和不可重複度問題。
事務機制淺析
鎖
與 Redis 的單線程引擎不同 godis 的存儲引擎是並行的,因此需要設計鎖機制來保證執行多條命令執行時的原子性和隔離性。
我們在實現內存數據庫一文中提到:
實現一個常規命令需要提供3個函數:
- ExecFunc 是實際執行命令的函數
- PrepareFunc 在 ExecFunc 前執行,負責分析命令行讀寫了哪些 key 便於進行加鎖
- UndoFunc 僅在事務中被使用,負責准備 undo logs 以備事務執行過程中遇到錯誤需要回滾。
其中的 PrepareFunc 會分析命令行返回要讀寫的 key, 以 prepareMSet 為例:
// return writtenKeys, readKeys
func prepareMSet(args [][]byte) ([]string, []string) {
size := len(args) / 2
keys := make([]string, size)
for i := 0; i < size; i++ {
keys[i] = string(args[2*i])
}
return keys, nil
}
結合實現內存數據庫 中提到的 LockMap 即可完成加鎖。由於其它協程無法獲得相關 key 的鎖所以不可能插入到事務中,所以我們實現了原子性中不可被插入的特性。
事務需要把所有 key 一次性完成加鎖, 只有在事務提交或回滾時才能解鎖。不能用到一個 key 就加一次鎖用完就解鎖,這種方法可能導致髒讀:
| 時間 | 事務1 | 事務2 |
|---|---|---|
| t1 | 鎖定key A | |
| t2 | 修改key A | |
| t3 | 解鎖key A | |
| t4 | 鎖定key A | |
| t4 | 讀取key A | |
| t5 | 解鎖key A | |
| t6 | 提交 |
如上圖所示 t4 時刻, 事務 2 讀到了事務 1未提交的數據,出現了髒讀异常。
回滾
為了在遇到運行時錯誤時事務可以回滾(原子性),可用的回滾方式有兩種:
- 保存修改前的value, 在回滾時用修改前的value進行覆蓋
- 使用回滾命令來撤銷原命令的影響。舉例來說:鍵A原值為1,調用了
Incr A之後變為了2,我們可以再執行一次Set A 1命令來撤銷 incr 命令。
出於節省內存的考慮我們最終選擇了第二種方案。比如 HSet 命令只需要另一條 HSet 將 field 改回原值即可,若采用保存 value 的方法我們則需要保存整個 HashMap。類似情况的還有 LPushRPop 等命令。
有一些命令可能需要多條命令來回滾,比如回滾 Del 時不僅需要恢複對應的 key-value 還需要恢複 TTL 數據。或者 Del 命令删除了多個 key 時,也需要多條命令進行回滾。綜上我們給出 UndoFunc 的定義:
// UndoFunc returns undo logs for the given command line
// execute from head to tail when undo
type UndoFunc func(db *DB, args [][]byte) []CmdLine
我們以可以回滾任意操作的rollbackGivenKeys為例進行說明,當然使用rollbackGivenKeys的成本較高,在可能的情况下盡量實現針對性的 undo log.
func rollbackGivenKeys(db *DB, keys ...string) []CmdLine {
var undoCmdLines [][][]byte
for _, key := range keys {
entity, ok := db.GetEntity(key)
if !ok {
// 原來不存在 key 删掉
undoCmdLines = append(undoCmdLines,
utils.ToCmdLine("DEL", key),
)
} else {
undoCmdLines = append(undoCmdLines,
utils.ToCmdLine("DEL", key), // 先把新 key 删除掉
aof.EntityToCmd(key, entity).Args, // 把 DataEntity 序列化成命令行
toTTLCmd(db, key).Args,
)
}
}
return undoCmdLines
}
接下來看一下 EntityToCmd, 非常簡單易懂:
func EntityToCmd(key string, entity *database.DataEntity) *protocol.MultiBulkReply {
if entity == nil {
return nil
}
var cmd *protocol.MultiBulkReply
switch val := entity.Data.(type) {
case []byte:
cmd = stringToCmd(key, val)
case *List.LinkedList:
cmd = listToCmd(key, val)
case *set.Set:
cmd = setToCmd(key, val)
case dict.Dict:
cmd = hashToCmd(key, val)
case *SortedSet.SortedSet:
cmd = zSetToCmd(key, val)
}
return cmd
}
var hMSetCmd = []byte("HMSET")
func hashToCmd(key string, hash dict.Dict) *protocol.MultiBulkReply {
args := make([][]byte, 2+hash.Len()*2)
args[0] = hMSetCmd
args[1] = []byte(key)
i := 0
hash.ForEach(func(field string, val interface{}) bool {
bytes, _ := val.([]byte)
args[2+i*2] = []byte(field)
args[3+i*2] = bytes
i++
return true
})
return protocol.MakeMultiBulkReply(args)
}
Watch
Redis Watch 命令用於監視一個(或多個) key ,如果在事務執行之前這個(或這些) key 被其他命令所改動,那麼事務將被放弃。
實現 Watch 命令的核心是發現 key 是否被改動,我們使用簡單可靠的版本號方案:為每個 key 存儲一個版本號,版本號變化說明 key 被修改了:
// database/single_db.go
func (db *DB) GetVersion(key string) uint32 {
entity, ok := db.versionMap.Get(key)
if !ok {
return 0
}
return entity.(uint32)
}
// database/transaciton.go
func Watch(db *DB, conn redis.Connection, args [][]byte) redis.Reply {
watching := conn.GetWatching()
for _, bkey := range args {
key := string(bkey)
watching[key] = db.GetVersion(key) // 將當前版本號存在 conn 對象中
}
return protocol.MakeOkReply()
}
在執行事務前比較版本號:
// database/transaciton.go
func isWatchingChanged(db *DB, watching map[string]uint32) bool {
for key, ver := range watching {
currentVersion := db.GetVersion(key)
if ver != currentVersion {
return true
}
}
return false
}
源碼導讀
在了解事務相關機制後,我們可以來看一下事務執行的核心代碼 ExecMulti
func (db *DB) ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply {
// 准備階段
// 使用 prepareFunc 獲取事務要讀寫的 key
writeKeys := make([]string, 0) // may contains duplicate
readKeys := make([]string, 0)
for _, cmdLine := range cmdLines {
cmdName := strings.ToLower(string(cmdLine[0]))
cmd := cmdTable[cmdName]
prepare := cmd.prepare
write, read := prepare(cmdLine[1:])
writeKeys = append(writeKeys, write...)
readKeys = append(readKeys, read...)
}
watchingKeys := make([]string, 0, len(watching))
for key := range watching {
watchingKeys = append(watchingKeys, key)
}
readKeys = append(readKeys, watchingKeys...)
// 將要讀寫的 key 和被 watch 的 key 一起加鎖
db.RWLocks(writeKeys, readKeys)
defer db.RWUnLocks(writeKeys, readKeys)
// 檢查被 watch 的 key 是否發生了改變
if isWatchingChanged(db, watching) { // watching keys changed, abort
return protocol.MakeEmptyMultiBulkReply()
}
// 執行階段
results := make([]redis.Reply, 0, len(cmdLines))
aborted := false
undoCmdLines := make([][]CmdLine, 0, len(cmdLines))
for _, cmdLine := range cmdLines {
// 在命令執行前再准備 undo log, 這樣才能保證例如用 decr 回滾 incr 命令的實現可以正常工作
undoCmdLines = append(undoCmdLines, db.GetUndoLogs(cmdLine))
result := db.execWithLock(cmdLine)
if protocol.IsErrorReply(result) {
aborted = true
// don't rollback failed commands
undoCmdLines = undoCmdLines[:len(undoCmdLines)-1]
break
}
results = append(results, result)
}
// 執行成功
if !aborted {
db.addVersion(writeKeys...)
return protocol.MakeMultiRawReply(results)
}
// 事務失敗進行回滾
size := len(undoCmdLines)
for i := size - 1; i >= 0; i-- {
curCmdLines := undoCmdLines[i]
if len(curCmdLines) == 0 {
continue
}
for _, cmdLine := range curCmdLines {
db.execWithLock(cmdLine)
}
}
return protocol.MakeErrReply("EXECABORT Transaction discarded because of previous errors.")
}
边栏推荐
- Filebeat collects log data and transfers it to redis. Different es indexes are created based on log fields through logstash
- Database industry analysis: from the global IT industry trend to the development of domestic databases
- [轻松学会shell编程]-4、单引号和双引号的区别、整形数值的运算、shell中数组定义和sed的详细用法
- Jenkins配置项目集成钉钉通知
- When online and offline integration accelerates and information docking channels are diversified, the traditional center will not be necessary
- Alibaba cloud cannot find the account security group id problem during the account transfer
- Preliminary controller input of oculus learning notes (1)
- AHA C language Chapter 6 God, a large string of numbers is approaching (lesson 26)
- 数组实现循环链表
- Tasks and responsibilities of the test team and basic concepts of testing
猜你喜欢

缺失值處理
传输层 知识点总结

The world's first AR contact lens, the entrance of metauniverse is really opened this time?

High voltage direct current (HVDC) model based on converter (MMC) technology and voltage source converter (VSC) (implemented by MATLAB & Simulink)

写一本畅销书是怎样的一种体验

2022年T电梯修理复训题库及答案

项目经理们在哪个时刻特别想逃离工作?

知乎热问:一个程序员的水平能差到什么程度?

Unity中通过射线躲避障碍物寻路的一些初步探索

Explain the startup process of opengauss multithreading architecture in detail
随机推荐
啊哈C语言 第6章 天啊 一大串数正在接近(第26讲)
plsql变量赋值问题
腾讯云国际版云服务器欠费说明
Does CDC 2.2.1 monitoring sqlserver not support monitoring multiple databases?
math_ Angular function & inverse trigonometric function
<JVM上篇:内存与垃圾回收篇>08-对象实例化及直接内存
The world's first AR contact lens, the entrance of metauniverse is really opened this time?
Pytorch——报错解决:“torch/optim/adamw.py” beta1, UnboundLocalError: local variable ‘beta1‘
问下 cdc 2.2.1监控sqlServer是不支持监控多库的吗?
SOA面向服务的架构
Traitement des valeurs manquantes
Correct method of converting Inkscape into DXF file SVG exporting DXF file
Tasks and responsibilities of the test team and basic concepts of testing
Donghua University - Research on interpretable recommendation micro behavior with enhanced knowledge perception reasoning
Preliminary controller input of oculus learning notes (1)
直播预告 | 12位一作华人学者开启 ICLR 2022
Unity中通过射线躲避障碍物寻路的一些初步探索
Babbitt | yuancosmos daily must read: it is said that Tencent has established XR department, and yuancosmos sector has risen again. Many securities companies have issued reports to pay attention to th
Oracle中dbms_output.put_line的用法实例
At 19:30 today, the science popularization leader said that he would take you to explore how AI can stimulate human creativity