当前位置:网站首页>Go collaboration and pipeline to realize asynchronous batch consumption scheduling task
Go collaboration and pipeline to realize asynchronous batch consumption scheduling task
2022-06-24 17:17:00 【pooky】
Weekend. , I had a problem this week and didn't figure it out , Do it over the weekend
The topic is a bit tongue twisty It is not common to do this in real projects , There are many knowledge points involved , After finishing, I will learn .
Program requirements :
1: Receive tasks , Monitor and receive the latest task messages from the asynchronous message queue
2: Processing tasks , Each task may take varying amounts of time
Our routine is to start a long monitor serialization to execute one by one , If you can't, just drive more , This kind of manual intervention is relatively heavy , Sometimes it's not good to stare .
Program plan :
1: Accept messages asynchronously , Start a process to receive messages This does not need to open more to receive messages and will not become a bottleneck
2: Processing messages asynchronously , Enable the coroutine to process the corresponding message asynchronously , It should be noted here that a processing process is started for a message Or multiple messages open , It's worth thinking about .
3: Batch processing , Multiple messages start a processing process , Prevent opening too many processes ,
4: timeout handler , If the quantity limit of batch processing is not reached for a long time , Then we should deal with it in time
5: Limit too many coroutines , This is not really necessary , because go The so-called million process performance, but now that we have this example Then perfect it
Code implementation :
package main
import (
"errors"
"fmt"
"math/rand"
"runtime"
"strconv"
"time"
)
var msgChanLimit = 20 // Message pipeline size limit
var handleId int // Collaboration task customization id Used to distinguish View the corresponding task batch Every time I deal with ++
/*
periodType Message generation time
1 1 One message per second will go to the timeout processing module ;
2 1 One message per millisecond It will go through the normal processing module ;
3 Random 0 Seconds to 1 second Simulate the situation that will be triggered
*/
var periodType = 3
var batchNum = 6 // Up to how much unprocessed information is stacked Perform a batch process
var batchTime = 3 // In the under full message How long is it Perform a batch process
var maxGoroutines = 10 // The maximum number of processes This actually doesn't make much sense But we still have one Boundary prevention
// Receive a message
func getMsg(msgChan chan string) {
msgId := 0
for { // Loop to receive messages
msgId++
msg := "Msg id:" + strconv.Itoa(msgId) + "; AddTime:" + time.Now().Format("2006-01-02 15:04:05")
msgChan <- msg
switch periodType {
case 1:
time.Sleep(1 * time.Second) // second
case 2:
time.Sleep(1 * time.Millisecond) // millisecond
case 3:
time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) // Random
}
}
}
// Process the message
func handleMsg(model int, handleId int, msgSet []string, guard chan struct{}) {
for i, v := range msgSet {
fmt.Println("HandleId:", handleId, ";Model", model, ">>> Idx:"+strconv.Itoa(i)+";Content:"+v)
time.Sleep(1500 * time.Millisecond) // Simulating the actual processing of messages is time-consuming The bigger the gap There will be more pending processing in so many backgrounds
}
<-guard // Release a limit
}
// Receive messages without blocking
func unBlockRead(ch chan string) (msg string, err error) {
select {
case msg = <-ch:
return msg, nil
case <-time.After(time.Microsecond):
return "", errors.New("nil")
}
}
func main() {
guard := make(chan struct{}, maxGoroutines) // The number of guard processes is limited
msgChan := make(chan string, msgChanLimit) // Receive a message channel size
go getMsg(msgChan) // Start receiving messages
msgSet := make([]string, 0) // Temporarily store the received message set
step := 0 // Second counter Corresponding second
for { // Main logic processing Start to deal with
if msg, err := unBlockRead(msgChan); err == nil { // Message received
msgSet = append(msgSet, msg)
if len(msgSet) == batchNum { // The processing quantity is reached
handleId++
guard <- struct{}{}
go handleMsg(1, handleId, msgSet, guard) // Handle the current msgSet
msgSet = nil // Reset
step = 0
}
} else {
if step > batchTime && len(msgSet) > 0 { // Timeout and not empty
handleId++
guard <- struct{}{}
go handleMsg(2, handleId, msgSet, guard)
msgSet = nil // Reset
step = 0
} else {
step++
time.Sleep(1 * time.Second) // Take a second off step++
}
}
}
// Suspend main process Prevent exit
for {
runtime.GC()
}
}
The whole code is like this , See the notes for details ( Hobby this thing the more the more)
Running effect
That's it , There are many small points involved , It's interesting
边栏推荐
- Zblog determines whether a plug-in installs the enabled built-in function code
- Daily algorithm & interview questions, 28 days of special training in large factories - the 15th day (string)
- 实现TypeScript运行时类型检查
- Building a cross public chain platform to solve DAPP development problems
- 5g brings opportunities and challenges. Are you ready to defend against DDoS?
- Activeindex selection and redirection in the menu bar on the right of easycvs
- Devops in digital transformation digital risk
- zblog判断某个插件是否安装启用的内置函数代码
- API documents are simple and beautiful. It only needs three steps to open
- After the collective breakthrough, where is the next step of China's public cloud?
猜你喜欢

Why do you develop middleware when you are young? "You can choose your own way"

Daily algorithm & interview questions, 28 days of special training in large factories - the 15th day (string)

MySQL learning -- table structure of SQL test questions
![[leetcode108] convert an ordered array into a binary search tree (medium order traversal)](/img/e1/0fac59a531040d74fd7531e2840eb5.jpg)
[leetcode108] convert an ordered array into a binary search tree (medium order traversal)
随机推荐
Example description and case of ansible playbook automated cluster server management
Cloud native monitoring practice (2) monitoring and collection of components outside the TKE cluster
TRCT test cloud + article online speed
Research on clock synchronization performance monitoring system based on 1588v2 Technology
A tutorial on how the zblog system obtains user related information based on user ID
Tencent cloud database mysql:sql flow restriction
Low education without food? As an old Android rookie in the past six years, I was the most difficult one
After the collective breakthrough, where is the next step of China's public cloud?
Issue 003 how to detect whether a sticky positioned element is in a pinned state
NFT元宇宙源码搭建解析与介绍
"Competition" and "opportunity" hidden in security operation in the cloud Era
FPGA project development: experience sharing of lmk04821 chip project development based on jesd204b (I)
Future banks need to think about today's structure with tomorrow's thinking
Tencent security officially released the IOT security capability map
How to troubleshoot and solve the problem that the ultra-low delay security live broadcast system webrtc client plays no audio in the browser?
Customizing security groups using BPF
CentOS 7 installing SQL server2017 (Linux)
In those years, I insisted on learning the motivation of programming
TCB series learning articles - using redis extension in cloud functions
Edit distance (linear dp+ violence matching)