A bug in the rate limiter of the golang.org/x/time/rate library
2022-06-24 03:05:00 【wish42】
Background
Recently, while using a rate limiter, I found that the limiter in Go's supplementary library golang.org/x/time/rate has a bug. I'm sharing it here for discussion.
Test code:
    package main

    import (
    	"fmt"
    	"sync/atomic"
    	"time"

    	"golang.org/x/time/rate"
    )

    func main() {
    	var succCount, failCount int64
    	// One token every 100ms (10 per second), with a burst of 1.
    	limit := rate.Every(100 * time.Millisecond)
    	burst := 1
    	limiter := rate.NewLimiter(limit, burst)
    	start := time.Now()
    	// 5000 goroutines call Allow in a tight loop and count the results.
    	for i := 0; i < 5000; i++ {
    		go func() {
    			for {
    				if limiter.Allow() {
    					atomic.AddInt64(&succCount, 1)
    				} else {
    					atomic.AddInt64(&failCount, 1)
    				}
    			}
    		}()
    	}
    	time.Sleep(2 * time.Second)
    	elapsed := time.Since(start)
    	fmt.Println("elapsed=", elapsed, "succCount=", atomic.LoadInt64(&succCount), "failCount=", atomic.LoadInt64(&failCount))
    }
Output:
    elapsed= 2.010675962s succCount= 24849 failCount= 6894827
Go version used:
    go version go1.16.2 darwin/amd64
As the example shows, the limiter is configured to let through 10 requests per second, yet with many concurrent goroutines it let through 24849 requests in about 2 s. In trpc services, every request is handled in its own goroutine running the business logic, so this bug shows up in exactly that scenario.
Cause
Let's dig into the code.
In the time/rate source, Allow is just a convenience wrapper around AllowN(time.Now(), 1):
    // Allow is shorthand for AllowN(time.Now(), 1).
    func (lim *Limiter) Allow() bool {
    	return lim.AllowN(time.Now(), 1)
    }
AllowN in turn calls the reserveN method:
    // AllowN reports whether n events may happen at time now.
    // Use this method if you intend to drop / skip events that exceed the rate limit.
    // Otherwise use Reserve or Wait.
    func (lim *Limiter) AllowN(now time.Time, n int) bool {
    	return lim.reserveN(now, n, 0).ok
    }
The implementation of reserveN is the interesting part:
    // reserveN is a helper method for AllowN, ReserveN, and WaitN.
    // maxFutureReserve specifies the maximum reservation wait duration allowed.
    // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
    func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
    	lim.mu.Lock()
    	if lim.limit == Inf {
    		lim.mu.Unlock()
    		return Reservation{
    			ok:        true,
    			lim:       lim,
    			tokens:    n,
    			timeToAct: now,
    		}
    	}
    	now, last, tokens := lim.advance(now)
    	// Calculate the remaining number of tokens resulting from the request.
    	tokens -= float64(n)
    	// Calculate the wait duration
    	var waitDuration time.Duration
    	if tokens < 0 {
    		waitDuration = lim.limit.durationFromTokens(-tokens)
    	}
    	// Decide result
    	ok := n <= lim.burst && waitDuration <= maxFutureReserve
    	// Prepare reservation
    	r := Reservation{
    		ok:    ok,
    		lim:   lim,
    		limit: lim.limit,
    	}
    	if ok {
    		r.tokens = n
    		r.timeToAct = now.Add(waitDuration)
    	}
    	// Update state
    	if ok {
    		lim.last = now
    		lim.tokens = tokens
    		lim.lastEvent = r.timeToAct
    	} else {
    		lim.last = last
    	}
    	lim.mu.Unlock()
    	return r
    }
The key piece is the implementation of advance:
    // advance calculates and returns an updated state for lim resulting from the passage of time.
    // lim is not changed.
    func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
    	last := lim.last
    	if now.Before(last) {
    		last = now
    	}
    	// Avoid making delta overflow below when last is very old.
    	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
    	elapsed := now.Sub(last)
    	if elapsed > maxElapsed {
    		elapsed = maxElapsed
    	}
    	// Calculate the new number of tokens, due to time that passed.
    	delta := lim.limit.tokensFromDuration(elapsed)
    	tokens := lim.tokens + delta
    	if burst := float64(lim.burst); tokens > burst {
    		tokens = burst
    	}
    	return now, last, tokens
    }
This function returns three values: newNow, newLast and newTokens.
The code shows that the first value, newNow, is simply the now argument passed straight back, so this return value is actually unnecessary.
The second value, newLast, is the time at which tokens was last updated; if the time passed in is earlier than that last update, the time passed in is returned instead.
The third value, newTokens, is the token count obtained by converting the time elapsed between the last update and the current time into tokens, capped at burst.
Now let's go back to reserveN and read its logic again, this time with my annotations added:
    // reserveN is a helper method for AllowN, ReserveN, and WaitN.
    // maxFutureReserve specifies the maximum reservation wait duration allowed.
    // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
    func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
    	// Take the lock first; this part is straightforward.
    	lim.mu.Lock()
    	// If the limit is infinite, return ok immediately.
    	if lim.limit == Inf {
    		lim.mu.Unlock()
    		return Reservation{
    			ok:        true,
    			lim:       lim,
    			tokens:    n,
    			timeToAct: now,
    		}
    	}
    	// Use advance to get the number of tokens available at time now.
    	now, last, tokens := lim.advance(now)
    	// Calculate the remaining number of tokens resulting from the request.
    	tokens -= float64(n)
    	// Calculate the wait duration
    	var waitDuration time.Duration
    	if tokens < 0 {
    		waitDuration = lim.limit.durationFromTokens(-tokens)
    	}
    	// Decide result
    	ok := n <= lim.burst && waitDuration <= maxFutureReserve
    	// Prepare reservation
    	r := Reservation{
    		ok:    ok,
    		lim:   lim,
    		limit: lim.limit,
    	}
    	if ok {
    		r.tokens = n
    		r.timeToAct = now.Add(waitDuration)
    	}
    	// State is updated here. If ok, the current time and the related fields
    	// are updated. But if not ok, why does the last field need updating?
    	// Update state
    	if ok {
    		lim.last = now
    		lim.tokens = tokens
    		lim.lastEvent = r.timeToAct
    	} else {
    		lim.last = last
    	}
    	lim.mu.Unlock()
    	return r
    }
As the annotations point out, when the request is not ok, only the last field of lim is updated. Let's check what that field is supposed to mean:
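    type Limiter struct {
    	limit Limit
    	burst int
    	mu    sync.Mutex
    	tokens float64
    	// last is the last time the limiter's tokens field was updated
    	last time.Time
    	// lastEvent is the latest time of a rate-limited event (past or future)
    	lastEvent time.Time
    }
The comment is clear: last is the time at which the tokens field was last updated. So updating last in reserveN when no tokens were granted is puzzling. Reading the code, it is also where the extra requests come from. Allow calls time.Now() before the lock is taken, so under heavy concurrency a goroutine can enter reserveN with a now that is older than lim.last. advance then returns that older now as last, and the failure branch writes it back, moving lim.last backwards while lim.tokens stays unchanged. The next caller measures the elapsed time from the rolled-back last and is credited with tokens for time that has already been accounted for.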
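The effect of writing last back on failure can be reproduced deterministically with a small model. This is a sketch under stated assumptions, not the library code: miniLimiter, allowAt and replay are hypothetical names, locking is omitted, n is fixed at 1, and the 100ms staleness of the late callers is exaggerated on purpose to make the effect visible in a few steps.

```go
package main

import (
	"fmt"
	"time"
)

// miniLimiter is a hypothetical, stripped-down model of the limiter logic
// quoted above (no locking, n fixed at 1); it is not the library type.
type miniLimiter struct {
	interval       time.Duration // time needed to earn one token
	burst          float64
	tokens         float64
	last           time.Time
	rollbackOnFail bool // true mimics "lim.last = last" in the failure branch
}

// allowAt mirrors advance plus reserveN for a single event at time now.
func (l *miniLimiter) allowAt(now time.Time) bool {
	last := l.last
	if now.Before(last) {
		last = now // a stale caller drags last backwards
	}
	tokens := l.tokens + float64(now.Sub(last))/float64(l.interval)
	if tokens > l.burst {
		tokens = l.burst
	}
	tokens--
	if tokens < 0 {
		if l.rollbackOnFail {
			l.last = last
		}
		return false
	}
	l.last, l.tokens = now, tokens
	return true
}

// replay alternates fresh timestamps 1ms apart with timestamps that are
// 100ms stale (an exaggerated delay, assumed for illustration) and counts
// how many of the fresh calls are admitted within 5ms.
func replay(rollback bool) int {
	base := time.Unix(1000, 0)
	l := &miniLimiter{
		interval: 100 * time.Millisecond, burst: 1, tokens: 1,
		last: base, rollbackOnFail: rollback,
	}
	allowed := 0
	for i := 1; i <= 5; i++ {
		fresh := base.Add(time.Duration(i) * time.Millisecond)
		if l.allowAt(fresh) {
			allowed++
		}
		l.allowAt(fresh.Add(-100 * time.Millisecond)) // stale caller, always rejected
	}
	return allowed
}

func main() {
	fmt.Println("with rollback:   ", replay(true))
	fmt.Println("without rollback:", replay(false))
}
```

In this model the rollback variant admits all 5 fresh calls within 5ms, while the variant that leaves last alone on failure admits only the first one, which is what a 100ms interval with burst 1 should allow.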
To verify this, I modified the library code, changing the reserveN method as follows:
    // reserveN is a helper method for AllowN, ReserveN, and WaitN.
    // maxFutureReserve specifies the maximum reservation wait duration allowed.
    // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
    func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
    	lim.mu.Lock()
    	if lim.limit == Inf {
    		lim.mu.Unlock()
    		return Reservation{
    			ok:        true,
    			lim:       lim,
    			tokens:    n,
    			timeToAct: now,
    		}
    	}
    	// Ignore the second value returned by advance.
    	now, _, tokens := lim.advance(now)
    	// Calculate the remaining number of tokens resulting from the request.
    	tokens -= float64(n)
    	// Calculate the wait duration
    	var waitDuration time.Duration
    	if tokens < 0 {
    		waitDuration = lim.limit.durationFromTokens(-tokens)
    	}
    	// Decide result
    	ok := n <= lim.burst && waitDuration <= maxFutureReserve
    	// Prepare reservation
    	r := Reservation{
    		ok:    ok,
    		lim:   lim,
    		limit: lim.limit,
    	}
    	if ok {
    		r.tokens = n
    		r.timeToAct = now.Add(waitDuration)
    	}
    	// Update state
    	if ok {
    		lim.last = now
    		lim.tokens = tokens
    		lim.lastEvent = r.timeToAct
    	}
    	// If not ok, do nothing.
    	lim.mu.Unlock()
    	return r
    }
The changes are marked in the comments. Re-running the test code above against the modified library:
    package main

    import (
    	"fmt"
    	"sync/atomic"
    	"time"

    	//"golang.org/x/time/rate"
    	// Patched copy of the rate package containing the modified reserveN.
    	rate "git.code.oa.com/gcd/go-utils/comm/trate"
    )

    func main() {
    	var succCount, failCount int64
    	limit := rate.Every(100 * time.Millisecond)
    	burst := 1
    	limiter := rate.NewLimiter(limit, burst)
    	start := time.Now()
    	for i := 0; i < 5000; i++ {
    		go func() {
    			for {
    				if limiter.Allow() {
    					atomic.AddInt64(&succCount, 1)
    				} else {
    					atomic.AddInt64(&failCount, 1)
    				}
    			}
    		}()
    	}
    	time.Sleep(2 * time.Second)
    	elapsed := time.Since(start)
    	fmt.Println("elapsed=", elapsed, "succCount=", atomic.LoadInt64(&succCount), "failCount=", atomic.LoadInt64(&failCount))
    }
Output:
    elapsed= 2.009816654s succCount= 21 failCount= 7513617
Progress
I filed an issue against the library on GitHub for this, and found that someone had already discovered the problem back in 2017 and proposed a fix, but it has not been merged into master.
Summary
If you use this rate limiter, watch out for this pitfall.