Using Go to process millions of requests per minute

2022-06-24 12:15:00 RememberGo

WeChat official account: Wu Qinqiang's Late Night Canteen

Introduction

I came across an article written in 2015. To be honest, it was the title that drew me in, but after reading it several times I found that it really is excellent. I will not translate the article directly. The project's requirement is simple: clients send requests, and the server receives them and processes the data (in the original, it uploads resources to Amazon S3). That is the essence of it.

I changed the original business code slightly, but not in a way that affects the core modules. In the first version, a goroutine is started for every request received so that the handler can respond quickly. This is a very routine approach.

The code is as follows.

First version

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

type Payload struct {
	// The payload contents don't matter for this example
}

func (p *Payload) UpdateToS3() error {
	// Storage logic; the sleep simulates a slow upload
	time.Sleep(500 * time.Millisecond)
	fmt.Println(" Upload successful ")
	return nil
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	// Validate the request
	// Parse the request body ...
	var p Payload
	go p.UpdateToS3()
	w.Write([]byte(" Successful operation "))
}

func main() {
	http.HandleFunc("/payload", payloadHandler)
	log.Fatal(http.ListenAndServe(":8099", nil))
}

What is the problem with this approach? In general, nothing. But under high concurrency, with nothing controlling the number of goroutines, CPU usage and memory usage both soar until the program crashes.

If the operation writes to a database such as MySQL, then the database server's disk I/O, network bandwidth, CPU load, and memory consumption all climb with it, and everything goes down together. Once something in a program becomes uncontrolled, it is usually a sign of danger.

Intermediate version

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

const MaxQueue = 400

var Queue chan Payload

func init() {
	Queue = make(chan Payload, MaxQueue)
}

type Payload struct {
	// The payload contents don't matter for this example
}

func (p *Payload) UpdateToS3() error {
	// Storage logic; the sleep simulates a slow upload
	time.Sleep(500 * time.Millisecond)
	fmt.Println(" Upload successful ")
	return nil
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	// Validate the request
	// Parse the request body ...
	var p Payload
	//go p.UpdateToS3()
	Queue <- p
	w.Write([]byte(" Successful operation "))
}

// StartProcessor takes tasks off Queue and handles them one at a time
func StartProcessor() {
	for {
		select {
		case payload := <-Queue:
			payload.UpdateToS3()
		}
	}
}

func main() {
	http.HandleFunc("/payload", payloadHandler)
	// Start a single goroutine that receives and processes tasks
	go StartProcessor()
	log.Fatal(http.ListenAndServe(":8099", nil))
}

This version uses a buffered channel. That avoids creating an unlimited number of goroutines, but it still does not solve the problem.

Processing is a synchronous operation that handles only one task at a time, while under high concurrency requests arrive far faster than they can be processed. In that situation, once the channel is full, subsequent requests block. You will then see response times climb sharply, and eventually no responses at all.
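The blocking point can be shown in isolation. The sketch below is not from the original article: tryEnqueue is a hypothetical helper that uses a select with a default case to attempt a non-blocking send, which reveals the exact moment a plain `Queue <- p` in the handler would start to block.

```go
package main

import "fmt"

type Payload struct{}

// tryEnqueue (hypothetical helper) attempts a non-blocking send.
// It reports false when the buffer is full, which is the point at
// which an ordinary blocking send would stall the HTTP handler.
func tryEnqueue(queue chan Payload, p Payload) bool {
	select {
	case queue <- p:
		return true
	default:
		return false
	}
}

func main() {
	// A tiny buffer with no consumer running, so it fills immediately.
	queue := make(chan Payload, 2)
	for i := 1; i <= 3; i++ {
		fmt.Printf("send %d accepted: %v\n", i, tryEnqueue(queue, Payload{}))
	}
}
```

With a buffer of two and no consumer, the first two sends are accepted and the third is refused. In the intermediate version the third request would simply hang in the handler until the single processor frees a slot.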

Final version

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

const (
	MaxWorker = 100 // arbitrary value
	MaxQueue  = 200 // arbitrary value
)

// JobQueue is a buffered channel that work requests are sent on
var JobQueue chan Job

func init() {
	JobQueue = make(chan Job, MaxQueue)
}

type Payload struct{}

type Job struct {
	PayLoad Payload
}

type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	quit       chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool),
	}
}

// Start runs the worker loop, which listens on the worker's job channel and can be stopped on demand
func (w Worker) Start() {
	go func() {
		for {
			// Register this worker's job channel in the worker pool
			w.WorkerPool <- w.JobChannel
			select {
			case job := <-w.JobChannel:
				// The real work goes here;
				// the sleep simulates a slow upload
				time.Sleep(500 * time.Millisecond)
				fmt.Printf(" Upload successful :%v\n", job)
			case <-w.quit:
				return
			}
		}
	}()
}

func (w Worker) stop() {
	go func() {
		w.quit <- true
	}()
}

// Dispatcher setup

type Dispatcher struct {
	// Pool of worker job channels registered with the dispatcher
	WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
	// Start MaxWorker workers
	for i := 0; i < MaxWorker; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}
	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				// Try to obtain an available worker's job channel; block until a worker is free
				jobChannel := <-d.WorkerPool
				// Hand the job to that worker's job channel
				jobChannel <- job
			}(job)
		}
	}
}

// payloadHandler receives the request and puts the job on JobQueue.
func payloadHandler(w http.ResponseWriter, r *http.Request) {
	work := Job{PayLoad: Payload{}}
	JobQueue <- work
	_, _ = w.Write([]byte(" Successful operation "))
}

func main() {
	// Create the workers through the dispatcher and start listening for jobs on JobQueue
	d := NewDispatcher(MaxWorker)
	d.Run()
	http.HandleFunc("/payload", payloadHandler)
	log.Fatal(http.ListenAndServe(":8099", nil))
}

The end result is a two-level channel design. The first level puts user request data into a chan Job; this JobQueue channel serves as the queue of tasks waiting to be processed.

The second level holds the job channels of the workers that are ready to take a task; its type is chan chan Job. The dispatcher hands each pending task to the job channel of an idle worker, and each worker keeps processing whatever arrives on its own channel. Together, this implements a worker pool. The original post follows this with a diagram to aid understanding.

First, when a request arrives, we create a Job and put it on the task queue, where it waits for the worker pool to process it.

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	work := Job{PayLoad: Payload{}}
	JobQueue <- work
	_, _ = w.Write([]byte(" Successful operation "))
}

After the dispatcher initializes the worker pool, dispatch waits on JobQueue. As soon as a job arrives, it tries to obtain an available worker and sends the job to that worker's job channel. Note that this step is not synchronous: every job received gets its own goroutine to do the hand-off. This keeps the dispatcher reading from JobQueue without blocking, so in theory writes to JobQueue do not need to block either.

func (d *Dispatcher) Run() {
	// Start MaxWorker workers
	for i := 0; i < MaxWorker; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}
	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				// Try to obtain an available worker's job channel; block until a worker is free
				jobChannel := <-d.WorkerPool
				// Hand the job to that worker's job channel
				jobChannel <- job
			}(job)
		}
	}
}

The "uncontrolled" goroutines here are different from the ones in the first version. Each one only sits blocked on a channel for a short time; when a worker becomes free it is woken, hands over the job, and exits. Its whole lifecycle is much shorter than that of a goroutine that performs the upload itself.
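The whole flow can be exercised without an HTTP server. The following is a condensed sketch of the same two-level design, not the author's code: runPool, the Job.ID field, the quit channel, and the counters are assumptions added so that the example terminates and can report a result.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Job struct{ ID int }

// runPool (hypothetical helper) pushes `jobs` jobs through a two-level
// channel pool with `workers` workers and returns how many were processed.
func runPool(workers, jobs int) int64 {
	jobQueue := make(chan Job, jobs)           // level 1: pending jobs
	workerPool := make(chan chan Job, workers) // level 2: idle workers' channels
	quit := make(chan struct{})
	defer close(quit) // lets the workers exit once all jobs are done

	var processed int64
	var wg sync.WaitGroup
	wg.Add(jobs)

	for i := 0; i < workers; i++ {
		jobChannel := make(chan Job)
		go func() {
			for {
				workerPool <- jobChannel // register as idle
				select {
				case <-jobChannel: // real work would happen here
					atomic.AddInt64(&processed, 1)
					wg.Done()
				case <-quit:
					return
				}
			}
		}()
	}

	// Dispatcher: one short-lived goroutine per job, parked until a worker is idle.
	go func() {
		for job := range jobQueue {
			go func(job Job) {
				jobChannel := <-workerPool
				jobChannel <- job
			}(job)
		}
	}()

	for i := 0; i < jobs; i++ {
		jobQueue <- Job{ID: i}
	}
	close(jobQueue)
	wg.Wait()
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println(runPool(4, 100)) // prints 100
}
```

Only the fixed set of workers ever does real work. The per-job goroutines created by the dispatcher do nothing but wait for an idle worker and pass the job along.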

Finally, I strongly recommend reading the original article: http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
