Use Go to process millions of requests per minute
2022-06-24 12:15:00 【RememberGo】
WeChat official account: Wu Qinqiang's Late Night Canteen
Introduction
I came across an article written in 2015. To be honest, it was the title that drew me in, but after reading it several times I found that it really is excellent. I will not translate the article directly. The project's requirement is simple: a client sends a request, and the server receives it and processes the data (in the original, the data is uploaded to Amazon S3). That is the essence of it.
I changed the original business code slightly, but the core modules are unaffected. In the first version, every incoming request starts a goroutine to handle it so that the server can respond quickly. This is a very routine approach.
The code is as follows.
First version
package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

type Payload struct {
	// The fields do not matter for this example
}

func (p *Payload) UpdateToS3() error {
	// Storage logic; sleep to simulate a slow upload
	time.Sleep(500 * time.Millisecond)
	fmt.Println("upload successful")
	return nil
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	// Business filtering
	// Request body parsing ......
	var p Payload
	go p.UpdateToS3()
	w.Write([]byte("operation successful"))
}

func main() {
	http.HandleFunc("/payload", payloadHandler)
	log.Fatal(http.ListenAndServe(":8099", nil))
}
What is wrong with this approach? Under ordinary load, nothing. Under high concurrency, however, the number of goroutines is not controlled at all: CPU usage and memory usage climb until the program crashes.
If the operation ends up in a database such as MySQL, the database server's disk I/O, network bandwidth, CPU load, and memory consumption all climb along with it, and everything goes down together. Whenever something in a program becomes uncontrollable, it is usually a sign of danger.
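The growth in goroutine count can be observed directly. The following is a minimal sketch, not taken from the original article: it starts one goroutine per simulated request, as the first version does, and reports how many are alive at the same time. The names spawnUnbounded and simulatedWork are illustrative assumptions, and the sleep stands in for UpdateToS3.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// simulatedWork stands in for the slow upload (an assumed value).
const simulatedWork = 200 * time.Millisecond

// spawnUnbounded starts one goroutine per simulated request and returns how
// many extra goroutines were alive while the simulated work was in progress.
func spawnUnbounded(requests int) int {
	before := runtime.NumGoroutine()

	var started, finished sync.WaitGroup
	started.Add(requests)
	finished.Add(requests)
	for i := 0; i < requests; i++ {
		go func() {
			defer finished.Done()
			started.Done()
			time.Sleep(simulatedWork)
		}()
	}

	started.Wait() // every goroutine is now running or sleeping
	alive := runtime.NumGoroutine() - before
	finished.Wait()
	return alive
}

func main() {
	fmt.Println("goroutines alive at once:", spawnUnbounded(10000))
}
```

The count tracks the number of in-flight requests, so nothing in the program caps it; the cap has to come from the design, which is what the next two versions try to add.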
Intermediate version
package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

const MaxQueue = 400

var Queue chan Payload

func init() {
	Queue = make(chan Payload, MaxQueue)
}

type Payload struct {
	// The fields do not matter for this example
}

func (p *Payload) UpdateToS3() error {
	// Storage logic; sleep to simulate a slow upload
	time.Sleep(500 * time.Millisecond)
	fmt.Println("upload successful")
	return nil
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	// Business filtering
	// Request body parsing ......
	var p Payload
	// go p.UpdateToS3()
	Queue <- p
	w.Write([]byte("operation successful"))
}

// StartProcessor receives tasks from Queue and handles them one at a time.
func StartProcessor() {
	for {
		select {
		case payload := <-Queue:
			payload.UpdateToS3()
		}
	}
}

func main() {
	http.HandleFunc("/payload", payloadHandler)
	// Start a single goroutine to receive and process tasks
	go StartProcessor()
	log.Fatal(http.ListenAndServe(":8099", nil))
}
This version uses a buffered channel, which avoids creating an unlimited number of goroutines, but it still does not solve the problem.
Processing is synchronous and handles only one task at a time, while under high concurrency requests arrive far faster than they can be processed. Once the channel is full, subsequent requests block. You will then see response times climb sharply, until the server stops responding altogether.
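The blocking behaviour comes from the plain send on a full buffered channel. As a small illustration (a sketch that is not part of the original article; tryEnqueue is a hypothetical helper), a select with a default branch makes the "queue is full" condition visible instead of blocking:

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send on the queue.
// It reports false when the buffer is already full.
func tryEnqueue(queue chan<- int, job int) bool {
	select {
	case queue <- job:
		return true
	default:
		return false
	}
}

func main() {
	// A small buffer with no consumer running, so it fills up immediately.
	queue := make(chan int, 3)
	for job := 1; job <= 5; job++ {
		fmt.Printf("job %d accepted: %v\n", job, tryEnqueue(queue, job))
	}
}
```

The first three jobs are accepted and the last two are rejected. In a handler, the rejected case could be turned into an error response rather than a stalled request; that is a design choice the article itself does not make.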
Final version
package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

const (
	MaxWorker = 100 // arbitrary value
	MaxQueue  = 200 // arbitrary value
)

// JobQueue is a buffered channel that work requests are sent on.
var JobQueue chan Job

func init() {
	JobQueue = make(chan Job, MaxQueue)
}

type Payload struct{}

type Job struct {
	PayLoad Payload
}

type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	quit       chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool),
	}
}

// Start runs the worker loop. The loop also listens on the quit channel,
// so it can be stopped on demand.
func (w Worker) Start() {
	go func() {
		for {
			// Register this worker's job channel in the worker pool
			w.WorkerPool <- w.JobChannel
			select {
			case job := <-w.JobChannel:
				// The real business logic goes here;
				// sleep to simulate a slow upload
				time.Sleep(500 * time.Millisecond)
				fmt.Printf("upload successful: %v\n", job)
			case <-w.quit:
				return
			}
		}
	}()
}

func (w Worker) stop() {
	go func() {
		w.quit <- true
	}()
}

// Dispatcher hands queued jobs to idle workers.
type Dispatcher struct {
	// Pool of worker job channels registered with the dispatcher
	WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
	// Start MaxWorker workers
	for i := 0; i < MaxWorker; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}
	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				// Get an idle worker's job channel;
				// this blocks until a worker is available
				jobChannel := <-d.WorkerPool
				// Hand the job to that worker's job channel
				jobChannel <- job
			}(job)
		}
	}
}

// payloadHandler receives a request and puts the task into JobQueue.
func payloadHandler(w http.ResponseWriter, r *http.Request) {
	work := Job{PayLoad: Payload{}}
	JobQueue <- work
	_, _ = w.Write([]byte("operation successful"))
}

func main() {
	// The dispatcher creates the workers and listens for tasks from JobQueue
	d := NewDispatcher(MaxWorker)
	d.Run()
	http.HandleFunc("/payload", payloadHandler)
	log.Fatal(http.ListenAndServe(":8099", nil))
}
The end result is two levels of channels. The first level puts user request data into a chan Job, which acts as the queue of tasks waiting to be processed.
The second level holds the job channels of workers that are ready to take a task; its type is chan chan Job. The dispatcher puts each pending task into an idle worker's channel, and each worker keeps processing whatever arrives on its own channel. Together, this implements a worker pool.
[Figure: diagram of the worker pool]
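To make the two levels easier to follow in isolation, here is a stripped-down sketch of the same layout without the HTTP server. It is an illustration under assumptions, not the article's code: runPool, the ID field, and the done channel are hypothetical additions so that the example terminates and can be checked, and it assumes at least one worker.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Job struct{ ID int }

// runPool pushes jobs through the two-level channel layout and returns
// the number of jobs that the workers processed.
func runPool(workers, jobs int) int {
	jobQueue := make(chan Job, jobs)           // level 1: pending jobs
	workerPool := make(chan chan Job, workers) // level 2: idle workers' job channels
	done := make(chan struct{})                // closed to stop the workers

	var processed int64
	var wg sync.WaitGroup
	wg.Add(jobs)

	for i := 0; i < workers; i++ {
		jobChannel := make(chan Job)
		go func() {
			for {
				workerPool <- jobChannel // register as idle
				select {
				case job := <-jobChannel:
					_ = job.ID // the real work would happen here
					atomic.AddInt64(&processed, 1)
					wg.Done()
				case <-done:
					return
				}
			}
		}()
	}

	// Dispatcher: one short-lived goroutine per job, as in the article.
	go func() {
		for job := range jobQueue {
			go func(job Job) {
				jobChannel := <-workerPool // wait for an idle worker
				jobChannel <- job
			}(job)
		}
	}()

	for i := 0; i < jobs; i++ {
		jobQueue <- Job{ID: i}
	}
	wg.Wait()
	close(jobQueue)
	close(done)
	return int(atomic.LoadInt64(&processed))
}

func main() {
	fmt.Println("processed:", runPool(3, 10))
}
```

Each worker has at most one entry in workerPool at a time, so a pool buffer equal to the worker count means the registration send never blocks.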
First, when a request arrives, we create a Job and put it into the job queue, where it waits for the worker pool to handle it.
func payloadHandler(w http.ResponseWriter, r *http.Request) {
	work := Job{PayLoad: Payload{}}
	JobQueue <- work
	_, _ = w.Write([]byte("operation successful"))
}
After the dispatcher has initialized the worker pool, dispatch tries to obtain an available worker each time it receives a task from JobQueue, and hands the task to that worker's job channel. Note that this step is not synchronous: every time a job is received, a new goroutine is started to deliver it. This keeps the receive from JobQueue from waiting on worker availability, so in theory writing tasks into JobQueue does not need to block either.
func (d *Dispatcher) Run() {
	// Start MaxWorker workers
	for i := 0; i < MaxWorker; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}
	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				// Get an idle worker's job channel;
				// this blocks until a worker is available
				jobChannel := <-d.WorkerPool
				// Hand the job to that worker's job channel
				jobChannel <- job
			}(job)
		}
	}
}
The "uncontrolled" goroutines here are different from the ones in the first version. Each one is only briefly blocked on a channel receive; as soon as a worker becomes free, it is woken up and hands over the task. Its entire lifetime is much shorter than that of the goroutines in the first version.
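Whether those dispatch goroutines really stay short-lived depends on requests arriving no faster than the workers can drain them. A rough back-of-the-envelope estimate, assuming every job takes exactly the simulated 500 ms and ignoring scheduling overhead (capacityPerMinute is a hypothetical helper, not part of the article's code):

```go
package main

import "fmt"

// capacityPerMinute estimates how many jobs a pool can finish in one minute
// when every job takes perJobMillis milliseconds and all workers stay busy.
func capacityPerMinute(workers, perJobMillis int) int {
	const millisPerMinute = 60 * 1000
	return workers * millisPerMinute / perJobMillis
}

func main() {
	// Single consumer, as in the intermediate version.
	fmt.Println(capacityPerMinute(1, 500)) // 120
	// MaxWorker = 100, as in the final version.
	fmt.Println(capacityPerMinute(100, 500)) // 12000
}
```

With these demo settings, a sustained load above roughly 12,000 requests per minute would leave dispatch goroutines waiting longer and piling up. Real figures depend on the actual upload latency and on how MaxWorker is tuned.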
Finally, I strongly recommend reading the original article: http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/