当前位置:网站首页>Go medium high parallel communication mode: the underlying principle of channel pipeline
Go medium high parallel communication mode: the underlying principle of channel pipeline
2022-07-23 18:11:00 【Hello, everyone. I'm a good classmate】
List of articles
1. channel Basic use of
- How to declare a pipeline ?
We can first use make(chan The type of data stored in the pipeline , Buffer size ) To make a statement , There are two main categories , One is the pipeline with buffer , One is the pipeline without buffer ( The pipeline without buffer can be declared by omitting the parameter of buffer size ), The specific declaration method is shown in the following code :
make(chan int) // No buffer
make(chan bool, 0) // Without buffer
make(chan string, 2) // Buffer zone (2 individual )
- Basic usage
Pipeline is actually a way of communication , Essentially used to send and receive data , Then the main usage of it is of course these two , The specific use is shown in the following code :
ch <- x // send data x
x =<- ch // Assign the data received in the pipeline to x
<-ch // Accept the data in the pipeline and discard
It should be noted that if it is a pipeline without buffer , It is impossible to plug data into the pipeline , Unless there is a running process waiting to receive this data , To put data into the pipeline without buffer .( The pipeline with buffer zone is like a warehouse on a road , This warehouse can be used to store things that need to be transferred , When there is no warehouse and there is no receiver across the road , This road ( This pipe ) It is not allowed to send goods ( data )).
2. Memory and communication
First of all, we should remember a sentence : Don't communicate through shared memory , Second, the memory should be shared through communication . Let's take a code example to show what is the way of using shared memory to communicate , as follows :
func watch(p *int) {
for true {
if *p == 1 {
fmt.Println(a...:"hello")
break
}
}
}
func main() {
i := 0
go watch(&i)
time.Sleep(time.Second)
i = 1
time.Sleep(time.Second)
}
// output:
// hello
The above code communicates through shared memory , Principal function direction watch Sent the address of the variable ,watch By constantly checking whether the variable of this address is 1 Then output and jump out of the loop .
So if you use pipes to communicate , What should the code look like ?
func watch(c chan int) {
if <-c == 1 {
fmt.Println("hello")
}
}
func main() {
c := make(chan int)
go watch(c)
time.Sleep(time.Second)
c <- 1
time.Sleep(time.Second)
}
// output:
// hello
We found that we used a pipeline without buffer , And because pipeline is a communication model , When there is no data , The pipeline is blocked , Therefore, we will find that we do not use dead loops in the above code . Obviously, using pipes avoids watch Function loops many times , Save system resources .
We have a general understanding of what is communication through shared memory , What is shared through communication , Give a general reason why communication is used to share memory :
- It can avoid the problems of collaboration competition and data conflict ( After all, shared memory makes multiple coroutines read the same piece of data , Using shared memory communication must be locked , In a sense, pipes can be regarded as unlocked )
- A higher level of abstraction , Reduce the difficulty of development , Increase program readability ( As in our example , The way of communication avoids polling )
- It is easier to decouple between modules , Enhance scalability and maintainability ( Shared memory is obviously not suitable for distributed environments , Because shared memory requires at least the same hard disk )
3. channel Bottom design
chanel As a communication pipeline , We also have a basic understanding of its functions in the above , He needs to send and receive data , Especially if this is a pipeline with buffer , It first needs a buffer for storing data , in addition to , When channel When the buffer is full , If a sender still sends data like it , He needs a queue to store the sending process , The same applies to the receiver . so , As a whole , One channel Three large pieces are needed : Intermediate buffer 、 Send waiting queue and accept waiting queue .
These are just channel A rough form , stay go The bottom of the ,channel In essence, it is called hchan The structure of the body , The specific contents are as follows :


The first few data are actually a ring cache queue , This ring cache can greatly reduce the overhead of reclaiming memory . Let's take it out and understand , As shown in the figure below .

| Member variables | meaning |
|---|---|
| qcount | The number of data stored in the ring queue |
| dataqsiz | The capacity of the ring queue |
| buf | Point to a circular queue |
| elemsize | The size of each data |
| elemtype | Type of data |
In addition, there are several remaining structures , We find the relevant source code as follows , actually waitq type ( Of this type elem Is a pointer to a variable used to receive data ) Two variables of recvq and sendq They are all linked lists , Functionally, it is a queue for storing the processes ,sendx and recvx It refers to the current working to sending / Accept the number of the linked list , It's a cursor .
There is also a key member mutex, This mutex is not used for queued sending / Receive data , It's used to protect hchan All members in the structure , All processes want to operate hchan All of them need to be locked . therefore channel It's not essentially unlocked , In terms of its own composition . But because of channel The lock is only in the external process hchan It can only be used after inserting data or taking data , So even if you use channel It can also realize high concurrency communication , In this sense, we can also say channel It's unlocked .
in addition to , I don't know whether the readers have found it , There is also a member closed, He is channel A status value of ,0 It means open ,1 It means closing .
That's all channel All members of , Let's see a summary chart as follows :

4. channel How to send data
At the beginning of the article, we use channel Demonstrate transmitting and sending data , In this part, let's know channel How to send data , When using the code, we directly use c<- Keywords to send data , It's in go Chinese is actually a grammatical sugar , In the compilation phase , The compiler will put c<- Turn into runtime.chansend1(), If you go to see go The source code , You'll find this chansend1 Actually, go back and call chansend Method .
We can channel The situations of sending are divided into the following :
- Direct transmission
- Put into cache
- Sleep and wait
Let's discuss these situations at different points .
- Direct transmission
Before sending the data , At this time, there is a collaboration process waiting for introduction . In fact, at this time, the process waiting to receive data in the pipeline has not detected the arrival of data , At this time, the process will sleep and wait . At this time, the ring cache in the structure has no data , At this time, the collaboration waiting for data will be put into the corresponding hchan Go to the receiving queue in the structure , If there is data coming at this time, take out a waiting process from the receiving queue of the pipeline structure , Directly copy it to the receiving variable of the coroutine ( No need to put data into ring buffer ), Wake up the process .
- Put into cache
At this time, there is no sleep waiting process , There is also cache space in the ring cache in the pipe structure , If there is data reception at this time, put the data directly into the cache . When implementing, first get the cache address that can be stored in the ring queue , Then store the data in this address , Maintain relevant indexes at the same time . That's why we say channel Is the reason why there is no lock , As long as there is space in the cache , There will be no blocking .
- Sleep and wait
At this time, there is no process sleep waiting , And the ring queue cache is full , At this time, the sending process enters the sending queue , Sleep and wait . In terms of implementation , Send the process to package yourself as a sudog, Then put it into the sending queue , Finally, sleep and hchan Unlock the mutex in .
5. channel How to receive data
and channal Sending data is very similar , For receiving data <-c In fact, it is also a grammar sugar , In the compilation phase i<-c Will translate into runtime.chanrecv1(), in addition to , One more ok Parameters can be used to receive data in the pipeline , It can be used to judge whether data is received from the pipeline , such i,ok -< c It will be converted to runtime.chanrecv2(). Eventually both will call chanrecv().
We can channel There are four ways to receive data :
- There is a waiting process , from coroutines receive
- There is a waiting process , from cache receive
- Receive cache
- Blocking reception
Let's take a look at the following points :
- There is a waiting process , Receive from the process
Before data acceptance , There is already a collaboration waiting to be sent in sleep , And this channe There is no cache , At this time, the receiving process copies the data directly from the process and wakes up the process . In terms of implementation , The receiving process needs to determine whether there is a process waiting in the sending queue ( Get into recv Method ) And whether there is data in the ring cache , If both are satisfied, the data in the process in the transmission queue will be taken out and awakened .
- There is a waiting process , Receive from cache
Before accepting data , There is already a collaboration waiting to be sent in sleep , And now channel There is a cache in the ring queue of , Then the receiving process fetches a data from the ring queue buffer , Put the data of the sending process of the dormant process into the cache and wake up the sending process . In terms of implementation , The receiving process first determines whether there is a process waiting in the sending queue , If there is, enter recv In the method , Then judge Channel Is there a cache , If so, take a data from the cache , And put the data of the sending process into the cache and wake up the process .
- Receive cache
When there is no coordination process waiting in sleep , But there is content in the ring queue buffer , Just take the data from the cache . In terms of implementation, it is to determine whether there is no collaboration waiting in the send queue and at this time channel Cache has content , Just get a data directly from the cache .
- Blocking reception
At this time, there is no coordination process waiting in sleep , And there's no cache , The receiving process enters the receiving queue and waits for receiving data . In terms of implementation, it is to determine whether there is no process waiting in the send queue and there is no data in the cache at this time , If you receive the process, package yourself as sudog, Put yourself in the waiting queue for acceptance , And sleep .
边栏推荐
- Use of computed in projects
- CSDN custom T-shirts are waiting for you to get, and the benefits of new programmer are coming!
- An online frequent fullgc troubleshooting
- 分析一个 .NET 写的 某 RFID标签系统 CPU暴涨
- 网页基础模版
- Do you still have certificates to participate in the open source community?
- ctf MISC 学习总结「建议收藏」
- TP & smart use diary
- MySQL六十六问,两万字+五十图详解含(答案解析)
- 封玩家IP和机器码以及解开被封的教程
猜你喜欢

(十一)STM32——IO引脚复用与映射

leetcode:剑指 Offer II 115. 重建序列【图论思维 + 入度考虑 + 拓扑排序】

rust求数组中最大值

Seata

MYSQL基础及性能优化

Seal player IP and machine code and unlock the blocked tutorial

Type-C to OTG (USB2.0 data transmission) + PD charging protocol chip ledrui ldr6028/ldr6023ss

MySQL 7 kinds of join (Figure)

CSDN custom T-shirts are waiting for you to get, and the benefits of new programmer are coming!

Multithreaded programming
随机推荐
为什么香港服务器可以免备案
MYSQL基础及性能优化
微服务雪崩问题及解决方案
参与开源社区还有证书拿?
Start multiple redis instances on a Linux machine
MySQL operation
能与PowerDesigner媲美的数据库建模工具PDMan[通俗易懂]
Solution to connection rejected error in idea download sources
Why can Hong Kong servers be exempted from filing
CSDN custom T-shirts are waiting for you to get, and the benefits of new programmer are coming!
(十一)STM32——IO引脚复用与映射
Keras II classification problem
传奇架设 GEE引擎教程 配置微端
leetcode:剑指 Offer II 115. 重建序列【图论思维 + 入度考虑 + 拓扑排序】
MySQL uses commands to export and import in Windows
LeetCode_ Dynamic programming_ Medium_ 120. Triangle minimum path sum
Detailed explanation of curl command [easy to understand]
rust求两数之和
Type-C to OTG (USB2.0 data transmission) + PD charging protocol chip ledrui ldr6028/ldr6023ss
前置放大器和功率放大器有什么区别?