当前位置:网站首页>Go collaboration and pipeline to realize asynchronous batch consumption scheduling task
Go collaboration and pipeline to realize asynchronous batch consumption scheduling task
2022-06-24 17:17:00 【pooky】
Weekend. , I had a problem this week and didn't figure it out , Do it over the weekend
The topic is a bit tongue twisty It is not common to do this in real projects , There are many knowledge points involved , After finishing, I will learn .
Program requirements :
1: Receive tasks , Monitor and receive the latest task messages from the asynchronous message queue
2: Processing tasks , Each task may take varying amounts of time
Our routine is to start a long monitor serialization to execute one by one , If you can't, just drive more , This kind of manual intervention is relatively heavy , Sometimes it's not good to stare .
Program plan :
1: Accept messages asynchronously , Start a process to receive messages This does not need to open more to receive messages and will not become a bottleneck
2: Processing messages asynchronously , Enable the coroutine to process the corresponding message asynchronously , It should be noted here that a processing process is started for a message Or multiple messages open , It's worth thinking about .
3: Batch processing , Multiple messages start a processing process , Prevent opening too many processes ,
4: timeout handler , If the quantity limit of batch processing is not reached for a long time , Then we should deal with it in time
5: Limit too many coroutines , This is not really necessary , because go The so-called million process performance, but now that we have this example Then perfect it
Code implementation :
package main import ( "errors" "fmt" "math/rand" "runtime" "strconv" "time" ) var msgChanLimit = 20 // Message pipeline size limit var handleId int // Collaboration task customization id Used to distinguish View the corresponding task batch Every time I deal with ++ /* periodType Message generation time 1 1 One message per second will go to the timeout processing module ; 2 1 One message per millisecond It will go through the normal processing module ; 3 Random 0 Seconds to 1 second Simulate the situation that will be triggered */ var periodType = 3 var batchNum = 6 // Up to how much unprocessed information is stacked Perform a batch process var batchTime = 3 // In the under full message How long is it Perform a batch process var maxGoroutines = 10 // The maximum number of processes This actually doesn't make much sense But we still have one Boundary prevention // Receive a message func getMsg(msgChan chan string) { msgId := 0 for { // Loop to receive messages msgId++ msg := "Msg id:" + strconv.Itoa(msgId) + "; AddTime:" + time.Now().Format("2006-01-02 15:04:05") msgChan <- msg switch periodType { case 1: time.Sleep(1 * time.Second) // second case 2: time.Sleep(1 * time.Millisecond) // millisecond case 3: time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) // Random } } } // Process the message func handleMsg(model int, handleId int, msgSet []string, guard chan struct{}) { for i, v := range msgSet { fmt.Println("HandleId:", handleId, ";Model", model, ">>> Idx:"+strconv.Itoa(i)+";Content:"+v) time.Sleep(1500 * time.Millisecond) // Simulating the actual processing of messages is time-consuming The bigger the gap There will be more pending processing in so many backgrounds } <-guard // Release a limit } // Receive messages without blocking func unBlockRead(ch chan string) (msg string, err error) { select { case msg = <-ch: return msg, nil case <-time.After(time.Microsecond): return "", errors.New("nil") } } func main() { guard := make(chan struct{}, maxGoroutines) // The number of guard processes is limited msgChan := make(chan string, msgChanLimit) // Receive a message channel size go getMsg(msgChan) // Start receiving messages msgSet := make([]string, 0) // Temporarily store the received message set step := 0 // Second counter Corresponding second for { // Main logic processing Start to deal with if msg, err := unBlockRead(msgChan); err == nil { // Message received msgSet = append(msgSet, msg) if len(msgSet) == batchNum { // The processing quantity is reached handleId++ guard <- struct{}{} go handleMsg(1, handleId, msgSet, guard) // Handle the current msgSet msgSet = nil // Reset step = 0 } } else { if step > batchTime && len(msgSet) > 0 { // Timeout and not empty handleId++ guard <- struct{}{} go handleMsg(2, handleId, msgSet, guard) msgSet = nil // Reset step = 0 } else { step++ time.Sleep(1 * time.Second) // Take a second off step++ } } } // Suspend main process Prevent exit for { runtime.GC() } }
The whole code is like this , See the notes for details ( Hobby this thing the more the more)
Running effect
That's it , There are many small points involved , It's interesting
边栏推荐
- As for IOT safety, 20 CSOs from major manufacturers say
- Cloud native monitoring configuration self built alertmanager to realize alarm
- Prometheus deployment
- How important is it to document the project? I was chosen by the top 100 up leaders and stood up again
- Robot toolbox matlab robotics toolbox
- Five steps to effectively monitor network traffic
- Solution to the problem that qlineedit setting qdoublevalidator setting range is invalid
- Analysis and introduction of NFT meta universe source code construction
- [kotlin] constructor summary
- FPGA systematic learning notes serialization_ Day10 [sequential logic, competitive adventure, synchronous reset, asynchronous reset]
猜你喜欢
[leetcode108] convert an ordered array into a binary search tree (medium order traversal)
MySQL learning -- table structure of SQL test questions
Why do you develop middleware when you are young? "You can choose your own way"
Daily algorithm & interview questions, 28 days of special training in large factories - the 15th day (string)
随机推荐
[2021 taac & Ti-One] frequently asked questions related to Ti-One products
test
H265/webvr video web page without plug-in player easyplayer Solution to the problem of cumulative delay of FLV video played by JS
Development analysis of main chain system
Elastic searchable snapshot function (frozen Tier 3)
[playing with Tencent cloud] a solution to the impassability of cross-border access to foreign websites using Tencent cloud CVM
How to perform concurrent stress testing on RTSP video streams distributed by audio and video streaming servers?
liver failure! My friend made a programming navigation website!
Cloud native monitoring practice (2) monitoring and collection of components outside the TKE cluster
H265 video streaming web page without plug-in player easywasmlayer Troubleshooting and solution of JS unable to set cover photo
How does the easynvr/easygbs live video platform use Wireshark to capture and analyze data locally?
[version upgrade] Tencent cloud firewall version 2.1.0 was officially released!
Cloud native monitoring via blackbox_ Exporter monitoring website
跟着Vam一起学习Typescript(第一期)
How to build RTSP test URL in Intranet Environment
[play with Tencent cloud] TSF User Guide
Can yangjianyun's new media operation in 2021 bear all the expectations of the enterprise's private domain traffic demand?
Edit distance (linear dp+ violence matching)
[go language development] start to develop Meitu station from 0 - Lesson 5 [receive pictures and upload]
[tke] nodelocaldnschache is used in IPVS forwarding mode