当前位置:网站首页>Go medium high parallel communication mode: the underlying principle of channel pipeline
Go medium high parallel communication mode: the underlying principle of channel pipeline
2022-07-23 18:11:00 【Hello, everyone. I'm a good classmate】
List of articles
1. channel Basic use of
- How to declare a pipeline ?
We can first use make(chan The type of data stored in the pipeline , Buffer size ) To make a statement , There are two main categories , One is the pipeline with buffer , One is the pipeline without buffer ( The pipeline without buffer can be declared by omitting the parameter of buffer size ), The specific declaration method is shown in the following code :
make(chan int) // No buffer
make(chan bool, 0) // Without buffer
make(chan string, 2) // Buffer zone (2 individual )
- Basic usage
Pipeline is actually a way of communication , Essentially used to send and receive data , Then the main usage of it is of course these two , The specific use is shown in the following code :
ch <- x // send data x
x =<- ch // Assign the data received in the pipeline to x
<-ch // Accept the data in the pipeline and discard
It should be noted that if it is a pipeline without buffer , It is impossible to plug data into the pipeline , Unless there is a running process waiting to receive this data , To put data into the pipeline without buffer .( The pipeline with buffer zone is like a warehouse on a road , This warehouse can be used to store things that need to be transferred , When there is no warehouse and there is no receiver across the road , This road ( This pipe ) It is not allowed to send goods ( data )).
2. Memory and communication
First of all, we should remember a sentence : Don't communicate through shared memory , Second, the memory should be shared through communication . Let's take a code example to show what is the way of using shared memory to communicate , as follows :
func watch(p *int) {
for true {
if *p == 1 {
fmt.Println(a...:"hello")
break
}
}
}
func main() {
i := 0
go watch(&i)
time.Sleep(time.Second)
i = 1
time.Sleep(time.Second)
}
// output:
// hello
The above code communicates through shared memory , Principal function direction watch Sent the address of the variable ,watch By constantly checking whether the variable of this address is 1 Then output and jump out of the loop .
So if you use pipes to communicate , What should the code look like ?
func watch(c chan int) {
if <-c == 1 {
fmt.Println("hello")
}
}
func main() {
c := make(chan int)
go watch(c)
time.Sleep(time.Second)
c <- 1
time.Sleep(time.Second)
}
// output:
// hello
We found that we used a pipeline without buffer , And because pipeline is a communication model , When there is no data , The pipeline is blocked , Therefore, we will find that we do not use dead loops in the above code . Obviously, using pipes avoids watch Function loops many times , Save system resources .
We have a general understanding of what is communication through shared memory , What is shared through communication , Give a general reason why communication is used to share memory :
- It can avoid the problems of collaboration competition and data conflict ( After all, shared memory makes multiple coroutines read the same piece of data , Using shared memory communication must be locked , In a sense, pipes can be regarded as unlocked )
- A higher level of abstraction , Reduce the difficulty of development , Increase program readability ( As in our example , The way of communication avoids polling )
- It is easier to decouple between modules , Enhance scalability and maintainability ( Shared memory is obviously not suitable for distributed environments , Because shared memory requires at least the same hard disk )
3. channel Bottom design
chanel As a communication pipeline , We also have a basic understanding of its functions in the above , He needs to send and receive data , Especially if this is a pipeline with buffer , It first needs a buffer for storing data , in addition to , When channel When the buffer is full , If a sender still sends data like it , He needs a queue to store the sending process , The same applies to the receiver . so , As a whole , One channel Three large pieces are needed : Intermediate buffer 、 Send waiting queue and accept waiting queue .
These are just channel A rough form , stay go The bottom of the ,channel In essence, it is called hchan The structure of the body , The specific contents are as follows :


The first few data are actually a ring cache queue , This ring cache can greatly reduce the overhead of reclaiming memory . Let's take it out and understand , As shown in the figure below .

| Member variables | meaning |
|---|---|
| qcount | The number of data stored in the ring queue |
| dataqsiz | The capacity of the ring queue |
| buf | Point to a circular queue |
| elemsize | The size of each data |
| elemtype | Type of data |
In addition, there are several remaining structures , We find the relevant source code as follows , actually waitq type ( Of this type elem Is a pointer to a variable used to receive data ) Two variables of recvq and sendq They are all linked lists , Functionally, it is a queue for storing the processes ,sendx and recvx It refers to the current working to sending / Accept the number of the linked list , It's a cursor .
There is also a key member mutex, This mutex is not used for queued sending / Receive data , It's used to protect hchan All members in the structure , All processes want to operate hchan All of them need to be locked . therefore channel It's not essentially unlocked , In terms of its own composition . But because of channel The lock is only in the external process hchan It can only be used after inserting data or taking data , So even if you use channel It can also realize high concurrency communication , In this sense, we can also say channel It's unlocked .
in addition to , I don't know whether the readers have found it , There is also a member closed, He is channel A status value of ,0 It means open ,1 It means closing .
That's all channel All members of , Let's see a summary chart as follows :

4. channel How to send data
At the beginning of the article, we use channel Demonstrate transmitting and sending data , In this part, let's know channel How to send data , When using the code, we directly use c<- Keywords to send data , It's in go Chinese is actually a grammatical sugar , In the compilation phase , The compiler will put c<- Turn into runtime.chansend1(), If you go to see go The source code , You'll find this chansend1 Actually, go back and call chansend Method .
We can channel The situations of sending are divided into the following :
- Direct transmission
- Put into cache
- Sleep and wait
Let's discuss these situations at different points .
- Direct transmission
Before sending the data , At this time, there is a collaboration process waiting for introduction . In fact, at this time, the process waiting to receive data in the pipeline has not detected the arrival of data , At this time, the process will sleep and wait . At this time, the ring cache in the structure has no data , At this time, the collaboration waiting for data will be put into the corresponding hchan Go to the receiving queue in the structure , If there is data coming at this time, take out a waiting process from the receiving queue of the pipeline structure , Directly copy it to the receiving variable of the coroutine ( No need to put data into ring buffer ), Wake up the process .
- Put into cache
At this time, there is no sleep waiting process , There is also cache space in the ring cache in the pipe structure , If there is data reception at this time, put the data directly into the cache . When implementing, first get the cache address that can be stored in the ring queue , Then store the data in this address , Maintain relevant indexes at the same time . That's why we say channel Is the reason why there is no lock , As long as there is space in the cache , There will be no blocking .
- Sleep and wait
At this time, there is no process sleep waiting , And the ring queue cache is full , At this time, the sending process enters the sending queue , Sleep and wait . In terms of implementation , Send the process to package yourself as a sudog, Then put it into the sending queue , Finally, sleep and hchan Unlock the mutex in .
5. channel How to receive data
and channal Sending data is very similar , For receiving data <-c In fact, it is also a grammar sugar , In the compilation phase i<-c Will translate into runtime.chanrecv1(), in addition to , One more ok Parameters can be used to receive data in the pipeline , It can be used to judge whether data is received from the pipeline , such i,ok -< c It will be converted to runtime.chanrecv2(). Eventually both will call chanrecv().
We can channel There are four ways to receive data :
- There is a waiting process , from coroutines receive
- There is a waiting process , from cache receive
- Receive cache
- Blocking reception
Let's take a look at the following points :
- There is a waiting process , Receive from the process
Before data acceptance , There is already a collaboration waiting to be sent in sleep , And this channe There is no cache , At this time, the receiving process copies the data directly from the process and wakes up the process . In terms of implementation , The receiving process needs to determine whether there is a process waiting in the sending queue ( Get into recv Method ) And whether there is data in the ring cache , If both are satisfied, the data in the process in the transmission queue will be taken out and awakened .
- There is a waiting process , Receive from cache
Before accepting data , There is already a collaboration waiting to be sent in sleep , And now channel There is a cache in the ring queue of , Then the receiving process fetches a data from the ring queue buffer , Put the data of the sending process of the dormant process into the cache and wake up the sending process . In terms of implementation , The receiving process first determines whether there is a process waiting in the sending queue , If there is, enter recv In the method , Then judge Channel Is there a cache , If so, take a data from the cache , And put the data of the sending process into the cache and wake up the process .
- Receive cache
When there is no coordination process waiting in sleep , But there is content in the ring queue buffer , Just take the data from the cache . In terms of implementation, it is to determine whether there is no collaboration waiting in the send queue and at this time channel Cache has content , Just get a data directly from the cache .
- Blocking reception
At this time, there is no coordination process waiting in sleep , And there's no cache , The receiving process enters the receiving queue and waits for receiving data . In terms of implementation, it is to determine whether there is no process waiting in the send queue and there is no data in the cache at this time , If you receive the process, package yourself as sudog, Put yourself in the waiting queue for acceptance , And sleep .
边栏推荐
- Phpstrom shortcut key
- docker安装redis并以配置文件方式启动
- IDEA这些既好用又好玩的三十多个宝贝插件你还不知道吗?「建议收藏」
- Calculus of variations
- “如今,99.9% 以上的代码都是垃圾!”
- 配置Gom引擎登录器出现错误提示:没有发现必备补丁文件!
- MongoDB分组取每组中一条数据
- MinGW-w64的安装及配置教程
- WARNING: Your password has expired. Password change required but no TTY available.
- Activity Registration: how to quickly start the open source tapdata live data platform on a zero basis?
猜你喜欢

Mutual certification of product compatibility between tapdata and Youxuan database

MySQL 7 kinds of join (Figure)

Salary high voltage line

网分测花岗岩介电常数测试方案

Trust counts the number of occurrences of words in the file

leetcode:剑指 Offer II 115. 重建序列【图论思维 + 入度考虑 + 拓扑排序】

Solutions to sap Hana database backup failure

New opportunities for cultural tourism in the era of digital intelligence? China Mobile Migu creates "the first island in the yuan universe"

Data crawling and display of e-commerce platform based on scratch

配置Gom引擎登錄器出現錯誤提示:沒有發現必備補丁文件!
随机推荐
MySQL massive write problem optimization scheme MySQL parameter tuning
Did not find the necessary patch file 'newopui.pak'?
几种运维工具的对比
c语言--通讯录的实现与ScreenToGif
Qt多线程实例与connect第五个参数[通俗易懂]
Use moment to get the date of the current day and the next day
一次线上频繁FullGC的排查
Sentinel 介绍与微服务整合
Keras之二分类问题
New opportunities for cultural tourism in the era of digital intelligence? China Mobile Migu creates "the first island in the yuan universe"
MySQL uses commands to export and import in Windows
Seata
Seal player IP and machine code and unlock the blocked tutorial
数字安全巨头Entrust披露六月遭到勒索软件团伙攻击
Data crawling and display of e-commerce platform based on scratch
能与PowerDesigner媲美的数据库建模工具PDMan[通俗易懂]
Distributed transaction solution
Microservice avalanche problems and Solutions
Keras II classification problem
Dyn keyword in rust