A bug in the rate limiter of Go's supplementary library golang.org/x/time/rate

2022-06-24 03:05:00 wish42

Background

Recently, while using a rate limiter, I found that the limiter in Go's supplementary library golang.org/x/time/rate has a bug. I am sharing it here for discussion.

Test code:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
	"golang.org/x/time/rate"
)

func main() {
	var succCount, failCount int64
	limit := rate.Every(100 * time.Millisecond)
	burst := 1
	limiter := rate.NewLimiter(limit, burst)
	start := time.Now()
	for i := 0; i < 5000; i++ {
		go func() {
			for {
				if limiter.Allow() {
					atomic.AddInt64(&succCount, 1)
				} else {
					atomic.AddInt64(&failCount, 1)
				}
			}
		}()
	}
	time.Sleep(2 * time.Second)
	elapsed := time.Since(start)
	fmt.Println("elapsed=", elapsed, "succCount=", atomic.LoadInt64(&succCount), "failCount=", atomic.LoadInt64(&failCount))
}

Output:

elapsed= 2.010675962s succCount= 24849 failCount= 6894827

Go version used:

go version go1.16.2 darwin/amd64

As the example shows, the limit is set to 10 requests per second, yet with many concurrent goroutines 24849 requests were admitted within 2 seconds. In trpc services, each request is also handled in its own goroutine for business logic, so the bug shows up in exactly that scenario.
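
For comparison, the number a token bucket should admit at most in such a window can be worked out by hand: the initial burst plus the tokens refilled during the window. The sketch below is only that arithmetic; `maxAllowed` is an illustrative helper, not part of the rate package.

```go
package main

import (
	"fmt"
	"math"
)

// maxAllowed returns the largest number of events a token bucket should admit
// in a window of windowSec seconds: the initial burst plus the tokens refilled
// during the window. Illustrative helper, not a library function.
func maxAllowed(ratePerSec float64, burst int, windowSec float64) int {
	return burst + int(math.Floor(ratePerSec*windowSec))
}

func main() {
	// rate.Every(100ms) means 10 events per second; the test ran for about 2s.
	fmt.Println(maxAllowed(10, 1, 2)) // 21
}
```

Against that bound of 21, the 24849 admitted requests are off by three orders of magnitude.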

Cause

Let's look at the code in detail.

In the time/rate source, Allow is simply a convenience wrapper around AllowN(time.Now(), 1):

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
	return lim.AllowN(time.Now(), 1)
}

AllowN in turn calls the reserveN method:

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
	return lim.reserveN(now, n, 0).ok
}

The implementation of reserveN is where things get interesting:

// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()

	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}

	now, last, tokens := lim.advance(now)

	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}

The key piece is the implementation of advance:

// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	last := lim.last
	if now.Before(last) {
		last = now
	}

	// Avoid making delta overflow below when last is very old.
	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
	elapsed := now.Sub(last)
	if elapsed > maxElapsed {
		elapsed = maxElapsed
	}

	// Calculate the new number of tokens, due to time that passed.
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}

	return now, last, tokens
}

This function returns three values: newNow, newLast, and newTokens.

The first value, newNow, is simply the now argument passed back unchanged, so it is effectively redundant.

The second value, newLast, is the time at which tokens was last updated. If the now passed in is earlier than that time, now is returned instead.

The third value, newTokens, is the token count after adding the tokens earned during the time elapsed between the last update and now, capped at burst.

Now let's go back over the logic of reserveN, this time with my comments added:
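
To make the three return values concrete, here is a minimal sketch of the same arithmetic using plain float64 seconds instead of time.Time and time.Duration. The function name `refill` and its parameters are illustrative assumptions, not the library's API.

```go
package main

import "fmt"

// refill models the token arithmetic of advance in plain float64 seconds.
// It returns the "last" value advance would report and the refilled token count.
// All names are illustrative; the library works with time.Time and time.Duration.
func refill(tokens, burst, ratePerSec, lastSec, nowSec float64) (newLast, newTokens float64) {
	last := lastSec
	if nowSec < last {
		// A timestamp older than the last update replaces "last".
		last = nowSec
	}
	newTokens = tokens + (nowSec-last)*ratePerSec
	if newTokens > burst {
		newTokens = burst
	}
	return last, newTokens
}

func main() {
	// 0.25s after the last update at 10 tokens/s: 2.5 tokens earned, capped at burst 1.
	last, tokens := refill(0, 1, 10, 1.0, 1.25)
	fmt.Println(last, tokens) // 1 1

	// A stale timestamp: no time has elapsed, and "last" becomes the stale time.
	last, tokens = refill(0, 1, 10, 1.0, 0.75)
	fmt.Println(last, tokens) // 0.75 0
}
```

The second call is the interesting one: the reported last is earlier than the stored one, which matters for what reserveN does with it.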

// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	// Take the lock first; this part is straightforward.
	lim.mu.Lock()
    
	// If limit is infinite, return ok immediately.
	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
	// Use advance to get the number of tokens available at time now.
	now, last, tokens := lim.advance(now)
    
	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}
	// State is updated here. When ok, now and the related fields are updated.
	// But when not ok, why does last need to be updated?
	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}

As the comments above point out, when the reservation is not ok, the only thing updated is the last field of lim. Let's check what that field means:

type Limiter struct {
	limit Limit
	burst int

	mu     sync.Mutex
	tokens float64
	// last is the last time the limiter's tokens field was updated
	last time.Time
	// lastEvent is the latest time of a rate-limited event (past or future)
	lastEvent time.Time
}

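The comment is clear: last is the time at which the tokens field was last updated. That makes the update of last in the failure branch of reserveN puzzling, because tokens is not updated there.

It is also where the extra requests come from. Each caller reads time.Now() before taking the lock, so under contention a goroutine can enter reserveN with a now that is earlier than lim.last. advance then returns that earlier time as last, and the failure branch writes it back, which moves lim.last backward while lim.tokens stays the same. The next call sees a longer elapsed time than has really passed and is credited with tokens for an interval that was already counted.

To verify this, I modified the library code, changing reserveN as follows: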

// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()

	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
    
	// Ignore the second value returned by advance here.
	now, _, tokens := lim.advance(now)

	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	}
	// If not ok, do nothing.

	lim.mu.Unlock()
	return r
}

The changes are marked in the comments. Rerunning the test code above against the modified library:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
	//"golang.org/x/time/rate"

	rate "git.code.oa.com/gcd/go-utils/comm/trate"
)

func main() {
	var succCount, failCount int64
	limit := rate.Every(100 * time.Millisecond)
	burst := 1
	limiter := rate.NewLimiter(limit, burst)
	start := time.Now()
	for i := 0; i < 5000; i++ {
		go func() {
			for {
				if limiter.Allow() {
					atomic.AddInt64(&succCount, 1)
				} else {
					atomic.AddInt64(&failCount, 1)
				}
			}
		}()
	}
	time.Sleep(2 * time.Second)
	elapsed := time.Since(start)
	fmt.Println("elapsed=", elapsed, "succCount=", atomic.LoadInt64(&succCount), "failCount=", atomic.LoadInt64(&failCount))
}

Output:

elapsed= 2.009816654s succCount= 21 failCount= 7513617
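
The gap between the two runs can also be reproduced deterministically, without goroutines, in a simplified model. The sketch below is not the library code: `bucket`, `allow`, and `replay` are illustrative names, time is an integer number of milliseconds, and tokens are stored as time credit so that the arithmetic is exact. Each fresh call is followed by a call carrying a timestamp 90ms in the past, an exaggerated stand-in for a goroutine that read the clock and then waited on the mutex.

```go
package main

import "fmt"

// bucket is a simplified, single-threaded model of the limiter state.
// Tokens are kept as time credit: one token costs intervalMs of credit.
type bucket struct {
	intervalMs  int64 // time needed to earn one token (100 means 10 events/s)
	burst       int64 // capacity in tokens
	creditMs    int64 // accumulated credit in milliseconds
	lastMs      int64 // time of the last update
	restoreLast bool  // true: write "last" back on failure, as in the original code
}

// allow mirrors the structure of reserveN for n = 1 with no waiting allowed.
func (b *bucket) allow(nowMs int64) bool {
	last := b.lastMs
	if nowMs < last {
		last = nowMs // a stale timestamp replaces last, as in advance
	}
	credit := b.creditMs + (nowMs - last)
	if limitMs := b.burst * b.intervalMs; credit > limitMs {
		credit = limitMs
	}
	credit -= b.intervalMs
	if credit >= 0 {
		b.lastMs, b.creditMs = nowMs, credit
		return true
	}
	if b.restoreLast {
		b.lastMs = last // moves lastMs backward whenever nowMs was stale
	}
	return false
}

// replay feeds a fixed sequence of timestamps covering a 200ms window and
// counts how many calls are admitted.
func replay(restoreLast bool) int {
	b := &bucket{intervalMs: 100, burst: 1, creditMs: 100, lastMs: 1000, restoreLast: restoreLast}
	admitted := 0
	for t := int64(1000); t <= 1200; t += 10 {
		if b.allow(t) { // fresh timestamp
			admitted++
		}
		if b.allow(t - 90) { // stale timestamp, 90ms in the past
			admitted++
		}
	}
	return admitted
}

func main() {
	fmt.Println("original:", replay(true)) // original: 21
	fmt.Println("patched:", replay(false)) // patched: 3
}
```

In a 200ms window at 10 events per second with burst 1, three admissions is the expected bound. With the write-back enabled, every stale call rewinds the last update time, so each fresh call finds a full interval of credit again.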

Progress

I went to GitHub to file an issue for the library and found that someone had already reported this problem back in 2017 and proposed a fix, but it has not been merged into master.

Summary

If you use this rate limiter, watch out for this pitfall.
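
One possible way to sidestep the problem without patching the library, offered as a sketch and not as a verified fix: read the clock while holding a mutex of your own and pass that time to AllowN, so the limiter never receives a timestamp earlier than the previous one. `orderedLimiter` and the `allower` interface are hypothetical names introduced here. AllowN(now, n) is the rate.Limiter method shown earlier; `orderChecker` is a stand-in that only checks timestamp ordering so the example runs without the external module.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// allower is the subset of *rate.Limiter used by the wrapper (an assumption
// made for this sketch so that it can run with a stand-in).
type allower interface {
	AllowN(now time.Time, n int) bool
}

// orderedLimiter reads the clock under its own mutex, so the wrapped limiter
// sees timestamps in non-decreasing order.
type orderedLimiter struct {
	mu  sync.Mutex
	lim allower
}

func (o *orderedLimiter) Allow() bool {
	o.mu.Lock()
	defer o.mu.Unlock()
	return o.lim.AllowN(time.Now(), 1)
}

// orderChecker is a stand-in for a real limiter. It counts timestamps that
// arrive earlier than the previous one.
type orderChecker struct {
	prev       time.Time
	outOfOrder int
}

func (c *orderChecker) AllowN(now time.Time, n int) bool {
	if now.Before(c.prev) {
		c.outOfOrder++
	}
	c.prev = now
	return true
}

// countOutOfOrder hammers the wrapper from many goroutines and reports how
// many timestamps reached the wrapped limiter out of order.
func countOutOfOrder(goroutines, calls int) int {
	checker := &orderChecker{}
	lim := &orderedLimiter{lim: checker}
	var wg sync.WaitGroup
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < calls; j++ {
				lim.Allow()
			}
		}()
	}
	wg.Wait()
	return checker.outOfOrder
}

func main() {
	fmt.Println("out-of-order timestamps:", countOutOfOrder(50, 200))
}
```

With a real limiter, `&orderedLimiter{lim: rate.NewLimiter(limit, burst)}` would be the corresponding construction. The limiter already serializes callers on its own mutex, so the extra lock mainly changes where the clock is read.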

Copyright notice
This article was written by [wish42]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2021/10/20211019173626143z.html