当前位置:网站首页>A memory leak caused by timeout scheduling of context and goroutine implementation

A memory leak caused by timeout scheduling of context and goroutine implementation

2022-06-24 16:15:00 Johns

background

A project was launched recently , It is necessary to perform a single node pressure test before going online to estimate the deployment plan of each service . When using the pressure measurement master of Tencent cloud for pressure measurement , Found a very interesting situation . First, let's go to the monitoring chart :

Memory and cpu usage
Network card traffic trend chart

First of all, I am 10:00 Left and right 2 Secondary pressure measurement , Each pressure measurement does not exceed 10 minute . It can be downloaded from CPU usage see , Pressure measuring machine CPU Utilization is rising dramatically ,usage_bytes and rss Memory It also rose at that time , The problem is that after the pressure test CPU Usage has dropped , But our memory was not released in the next few hours . Obviously there must be something in the program hang A big chunk of memory .

So I use pprof The tool looks at the following performance indicators of the machine after the test 【 Be careful : I finished the pressure test 10 Minutes later 】:

pprof

You can see that there is actually 15318 individual goroutine In the use of , And heap There are 2979 Objects , from Network card traffic trend chart We know , After the pressure test, the network card traffic is basically normal .【 The reason why this is not 0 Because of my test environment, I use scripts to regularly put the test traffic , There should be no such interference flow during actual pressure measurement 】

Enter into goroutine Go inside the details , See where it is hang Live so much goroutine.

image.png

You can see /data/ggr/workspace/internal/xxx\_recommend/service/xxx\_recommend\_algo/xxx\_recommend\_algo.go:153 hang Live in the 14235 individual goroutine

And let's see 153 OK, what bad thing did you do

image.png

For the sake of convenience , I simplified the code to a test case , as follows :

package xxx_recommend_algo

import (
	"context"
	"errors"
	"testing"
	"time"
)

func TestxxxRecommendAlgo(t *testing.T) {

    // goroutine A
	go func() {
        //  Set up Context The timeout is 50ms
		backGroundCtx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
		defer cancel()
		// 5.2  adopt GRPC Get the scores of recommended items from the model service on the algorithm side ,  Set timeout time , If exceeded 30ms It is considered that the model has timed out 
		xxxRecommendChannel := make(chan *AlgoServingResponse)

        // goroutine B
		go getXXXRecommend(backGroundCtx, xxxRecommendChannel, t)
		select {
		case xxxRecommendResult := <-xxxRecommendChannel:
			if xxxRecommendResult.err != nil {
				return
			}
			for _, v := range xxxRecommendResult.scores {
				t.Log(v)
			}
		case <-backGroundCtx.Done():
			return
		}
		t.Log(backGroundCtx.Deadline())
	}()

	time.Sleep(time.Second * 10)
	t.Log("ok")
}

func getXXXRecommend(ctx context.Context, xxxRecommendResult chan *AlgoServingResponse, t *testing.T) {
	time.Sleep(time.Second) //  Simulate remote rpc request 
	t.Log("ok1")
	xxxRecommendResult <- &AlgoServingResponse{err: errors.New("error")}
	t.Log("ok2")
}
//  The algorithm recommendation service returns results 
type AlgoServingResponse struct {
	err    error
	scores map[string]int
}

analysis

This code mainly uses Context Implement a timeout call , If the algorithm is in 50ms If you don't return within ,goroutine A It will automatically time out , Instead of waiting for the algorithm to time out ,goroutine B Mainly responsible for rpc Call algorithm service . When the algorithm does not time out , Will not hang live goroutine B, But once the algorithm service times out , that goroutine B already return 了 , here goroutine B Return passage xxxRecommendResult Writing data , Then it will lead to goroutine B Has been blocked in the passage . As the number of timeouts increases , blocked goroutine More and more , It always leads to memory explosion .

We can run the current code , You'll find that ok2 Will never be printed out .

=== RUN   TestxxxRecommendAlgo
    xxx_recommend_algo_test.go:38: ok1
    xxx_recommend_algo_test.go:39: context deadline exceeded
    xxx_recommend_algo_test.go:33: ok
--- PASS: TestxxxRecommendAlgo (10.00s)
PASS

If main Do not exit , that goroutine B It's going to keep clogging up !!!

Solution 1

Check before writing data to the channel Context Whether it has timed out , If it's out of date , Just directly return, There is no need to modify elsewhere .

func getXXXRecommend(ctx context.Context, xxxRecommendResult chan *AlgoServingResponse, t *testing.T) {
	time.Sleep(time.Second) //  Simulate remote rpc request 
	t.Log("ok1")
  	if ctx.Err() == context.Canceled {
		xxxRecommendResult <- &AlgoServingResponse{err: errors.New("error")}
	}
	t.Log("ok2")
}

Solution 2

A better solution is to control the range of timeout control in the remote scheduling method , Change asynchronous to synchronous , Because I have only one scheduling method , There is no need to open a new one goroutine Go for a run .

package xxx_recommend_algo

import (
	"context"
	"errors"
	"testing"
	"time"
)

func TestxxxRecommendAlgo(t *testing.T) {

    // goroutine A
	go func() {
        //  Set up Context The timeout is 50ms
		backGroundCtx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
		defer cancel()
		// 5.2  adopt GRPC Get the scores of recommended items from the model service on the algorithm side ,  Set timeout time , If exceeded 30ms It is considered that the model has timed out 

		xxxRecommendResult := getXXXRecommend(backGroundCtx, xxxRecommendChannel, t)
		if xxxRecommendResult.err != nil{
			return nil, xxxRecommendResult.err
		}
		return xxxRecommendResult.scores, nil
	}
	time.Sleep(time.Second * 10)
	t.Log("ok")
}

func getXXXRecommend(ctx context.Context, xxxRecommendResult chan *AlgoServingResponse, t *testing.T) {
	backGroundCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	//  Control the timeout to the minimum of real calls 
	clientConn, err := grpc.DialContext(backGroundCtx, "")
	if err == nil {
		xxxRecommendResult <- &AlgoServingResponse{err: errors.New("error")}
	}
	defer clientConn.close()
	...
	t.Log("ok1")
	xxxRecommendResult <- &AlgoServingResponse{err: errors.New("error")}
	t.Log("ok2")
}
//  The algorithm recommendation service returns results 
type AlgoServingResponse struct {
	err    error
	scores map[string]int
}

summary

【1】 A memory leak does not necessarily cause the program to crash immediately , But any leaks should be disposed of .

【2】Go Unbuffered channels in languages (unbuffered channel) It refers to the channel that does not have the ability to save any value before receiving . This type of channel requires sending goroutine And receiving goroutine At the same time be ready to , To complete the sending and receiving operations .

If two goroutine Not ready at the same time , The channel will cause the first to perform the send or receive operation goroutine Block waiting . The interaction between sending and receiving channels is synchronous in itself . None of these operations can exist alone without the other operation .

【3】Go The cache channel of the language will not block the receiver and sender under normal circumstances , But when the cache pool is full , Will block the transmission , Block the receiver when the cache pool is empty . This must be noted .

原网站

版权声明
本文为[Johns]所创,转载请带上原文链接,感谢
https://yzsam.com/2021/05/20210501152844185U.html