
Using Go goroutines and channels to consume scheduled tasks asynchronously in batches

2022-06-24 17:17:00 pooky

It's the weekend. I ran into a problem this week that I didn't figure out, so I'm working through it now.

The title is a bit of a mouthful. This is not something you do often in real projects, but it touches on a lot of concepts, so I'm writing it up as a learning exercise.

Program requirements:

1: Receive tasks. Listen on an asynchronous message queue and receive the latest task messages.

2: Process tasks. Each task may take a different amount of time.

Our usual approach is to start one long-running listener that executes tasks serially, one at a time. If it can't keep up, we start more instances. That takes a lot of manual intervention, and sometimes nobody is watching closely enough.

Program plan:

1: Receive messages asynchronously. Start one goroutine to receive messages. Receiving is not the bottleneck, so there is no need for more than one.

2: Process messages asynchronously. Start goroutines to process the received messages. Whether to start one goroutine per message or one for several messages is worth thinking about.

3: Batch processing. Start one processing goroutine for several messages, to avoid starting too many goroutines.

4: Timeout handling. If the batch size is not reached for a long time, process whatever has accumulated instead of waiting.

5: Limit the number of goroutines. This is not strictly necessary, because Go is known for running goroutines by the million, but since we have this example we might as well make it complete.

Code implementation:

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"strconv"
	"time"
)

var msgChanLimit = 20 // buffer size of the message channel
var handleId int      // batch ID, incremented for every batch so the output shows which batch a message belongs to
/*
periodType controls how often getMsg produces a message:
	1: one message per second, which exercises the timeout path;
	2: one message per millisecond, which exercises the normal (full batch) path;
	3: a random interval between 0 and 2 seconds, which exercises both paths
*/
var periodType = 3

var batchNum = 6  // number of pending messages that triggers a batch
var batchTime = 3 // idle seconds after which a partial batch is processed anyway

var maxGoroutines = 10 // upper bound on concurrent handler goroutines; rarely needed, but kept as a safety limit

// errNoMsg is returned by unBlockRead when no message is waiting.
var errNoMsg = errors.New("no message available")

// getMsg simulates a message source: it produces messages forever and sends them to msgChan.
func getMsg(msgChan chan string) {
	msgId := 0
	for { // produce messages in an endless loop
		msgId++
		msg := "Msg id:" + strconv.Itoa(msgId) + "; AddTime:" + time.Now().Format("2006-01-02 15:04:05")
		msgChan <- msg
		switch periodType {
		case 1:
			time.Sleep(1 * time.Second) // one second
		case 2:
			time.Sleep(1 * time.Millisecond) // one millisecond
		case 3:
			time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) // random, 0 to 2 seconds
		}
	}
}

// handleMsg processes one batch. model is 1 for a full batch and 2 for a timeout batch.
func handleMsg(model int, id int, msgSet []string, guard chan struct{}) {
	defer func() { <-guard }() // release the slot when the batch is done
	for i, v := range msgSet {
		fmt.Println("HandleId:", id, ";Model", model, ">>> Idx:"+strconv.Itoa(i)+";Content:"+v)
		time.Sleep(1500 * time.Millisecond) // simulate slow processing; the slower it is, the more batches pile up in the background
	}
}

// unBlockRead receives one message without blocking; it returns errNoMsg if the channel is empty.
func unBlockRead(ch chan string) (msg string, err error) {
	select {
	case msg = <-ch:
		return msg, nil
	default:
		return "", errNoMsg
	}
}

func main() {
	guard := make(chan struct{}, maxGoroutines) // semaphore limiting the number of handler goroutines
	msgChan := make(chan string, msgChanLimit)  // buffered channel of incoming messages
	go getMsg(msgChan)                          // start receiving messages
	msgSet := make([]string, 0)                 // messages collected for the current batch
	step := 0                                   // idle seconds counted since the last batch

	for { // main loop; it never returns, which also keeps the program alive
		if msg, err := unBlockRead(msgChan); err == nil { // message received
			msgSet = append(msgSet, msg)
			if len(msgSet) == batchNum { // the batch is full
				handleId++
				guard <- struct{}{} // acquire a slot; blocks while maxGoroutines handlers are running
				go handleMsg(1, handleId, msgSet, guard)
				msgSet = nil // start a new batch; the handler keeps the old slice
				step = 0
			}
		} else {
			if step > batchTime && len(msgSet) > 0 { // timed out with a non-empty partial batch
				handleId++
				guard <- struct{}{}
				go handleMsg(2, handleId, msgSet, guard)
				msgSet = nil // start a new batch
				step = 0
			} else {
				step++
				time.Sleep(1 * time.Second) // wait one second before polling again
			}
		}
	}
}

That is the whole program. See the comments for details (with a hobby like this, the more the better).

Running result

That's it. There are a lot of small details involved, which makes it interesting.


Copyright notice
This article was written by [pooky]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2021/03/20210328173037159a.html
