High-concurrency communication in Go: the underlying principles of the channel

2022-07-23 18:11:00, by "Hello, everyone. I'm a good classmate"


1. Basic usage of channel

  1. How do we declare a channel?

We declare one with make(chan elementType, bufferSize). There are two main kinds: buffered channels and unbuffered channels (an unbuffered channel can be declared by simply omitting the buffer-size argument). The concrete declaration forms are shown below:

make(chan int)       // unbuffered
make(chan bool, 0)   // also unbuffered (capacity 0)
make(chan string, 2) // buffered (capacity 2)
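The difference between these declarations can be observed with the built-in len and cap functions, which report the number of currently buffered values and the buffer capacity respectively. A minimal runnable sketch:

```go
package main

import "fmt"

func main() {
	unbuffered := make(chan int)     // no buffer
	buffered := make(chan string, 2) // buffer of 2

	// cap reports the buffer size; len reports how many values are queued.
	fmt.Println(cap(unbuffered), len(unbuffered)) // 0 0
	buffered <- "a"
	fmt.Println(cap(buffered), len(buffered)) // 2 1
}
```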
  2. Basic usage

A channel is essentially a means of communication, used to send and receive data, so its two main operations are exactly those. The concrete usage is shown below:

ch <- x   // send x into the channel
x = <-ch  // receive a value from the channel and assign it to x
<-ch      // receive a value from the channel and discard it

Note that with an unbuffered channel, you cannot push data into the channel unless a running goroutine is already waiting to receive it. (A buffered channel is like a warehouse along a road: the warehouse can hold goods in transit. With no warehouse and no receiver at the other end of the road, the road (the channel) cannot carry any goods (data).)
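This blocking behavior is easy to see in a short sketch: the send on the unbuffered channel only completes because another goroutine is ready to receive it:

```go
package main

import "fmt"

func main() {
	ch := make(chan int) // unbuffered: a send blocks until a receiver is ready

	go func() {
		ch <- 42 // parks here until main reaches <-ch below
	}()

	v := <-ch // the receive releases the blocked sender
	fmt.Println(v) // 42
}
```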

2. Shared memory and communication

First, remember this saying: don't communicate by sharing memory; share memory by communicating. Let's look at a code example of communicating through shared memory:

package main

import (
	"fmt"
	"time"
)

func watch(p *int) {
	// Busy-wait: poll the shared variable until it becomes 1.
	for {
		if *p == 1 {
			fmt.Println("hello")
			break
		}
	}
}

func main() {
	i := 0
	go watch(&i)
	time.Sleep(time.Second)

	i = 1
	time.Sleep(time.Second)
}

// output:
// hello

The code above communicates through shared memory: main passes watch the address of a variable, and watch keeps polling that address until the value becomes 1, then prints and breaks out of the loop.

So if we use a channel to communicate instead, what does the code look like?

package main

import (
	"fmt"
	"time"
)

func watch(c chan int) {
	if <-c == 1 {
		fmt.Println("hello")
	}
}

func main() {
	c := make(chan int)
	go watch(c)
	time.Sleep(time.Second)
	c <- 1
	time.Sleep(time.Second)
}

// output:
// hello

Notice that we used an unbuffered channel. Because a channel is a communication primitive, a receive blocks while no data is available, so the code above needs no busy loop. Using a channel clearly spares watch from looping many times, saving system resources.

Now that we have a general sense of what it means to communicate through shared memory versus to share memory through communication, here are the main reasons to prefer sharing memory by communicating:

  • It avoids goroutine races and data conflicts (shared memory lets multiple goroutines read the same data, so shared-memory communication must be locked; in a sense, channels can be treated as lock-free)
  • It is a higher level of abstraction, which lowers development difficulty and improves readability (as in our example, communicating avoids polling)
  • It decouples modules more cleanly, improving scalability and maintainability (shared memory is clearly a poor fit for distributed environments, since it requires the goroutines to run on at least the same machine)

3. The underlying design of channel

As a communication pipe, a channel must send and receive data, as we saw above. If it is buffered, it first needs a buffer to store the data. Beyond that, when the buffer is full and a sender still tries to send into it, the channel needs a queue to hold the waiting sender goroutines; the same goes for receivers. So, as a whole, a channel needs three big pieces: a buffer in the middle, a send wait queue, and a receive wait queue.

This is only a rough sketch of a channel. At the bottom of the Go runtime, a channel is essentially a struct named hchan; its fields are described below.


The first few fields together form a ring buffer queue; the ring greatly reduces the overhead of repeatedly allocating and reclaiming memory. Let's pull these fields out and look at them one by one:


Member variable   Meaning
qcount            the number of elements currently stored in the ring queue
dataqsiz          the capacity of the ring queue
buf               a pointer to the ring queue
elemsize          the size of each element
elemtype          the type of the elements

As for the remaining fields, the relevant source shows that recvq and sendq are both of type waitq (whose elem field points at the variable that will receive the data); they are linked lists that functionally serve as queues of waiting goroutines. sendx and recvx are indices into the ring buffer, marking the next slot to send into and to receive from respectively; they are cursors.
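To make the ring concrete, here is a toy model of the buffer-related fields, written in ordinary Go. The field names mirror the table above, but the real runtime.hchan also carries the wait queues and the mutex and stores elements through unsafe pointers, so treat this purely as an illustrative sketch:

```go
package main

import "fmt"

// ringChan is a toy model of the buffer-related fields of runtime.hchan.
// Illustrative only: the real struct also holds recvq/sendq and a lock.
type ringChan struct {
	qcount   uint  // number of elements currently queued
	dataqsiz uint  // capacity of the ring buffer
	buf      []int // the ring buffer itself (real hchan: unsafe.Pointer)
	sendx    uint  // next slot to write; wraps around at dataqsiz
	recvx    uint  // next slot to read; wraps around at dataqsiz
}

func (c *ringChan) send(v int) bool {
	if c.qcount == c.dataqsiz {
		return false // buffer full: the real runtime would park the sender
	}
	c.buf[c.sendx] = v
	c.sendx = (c.sendx + 1) % c.dataqsiz // the cursor wraps: that is what makes it a ring
	c.qcount++
	return true
}

func (c *ringChan) recv() (int, bool) {
	if c.qcount == 0 {
		return 0, false // buffer empty: the real runtime would park the receiver
	}
	v := c.buf[c.recvx]
	c.recvx = (c.recvx + 1) % c.dataqsiz
	c.qcount--
	return v, true
}

func main() {
	c := &ringChan{dataqsiz: 2, buf: make([]int, 2)}
	c.send(1)
	c.send(2)
	v, _ := c.recv()
	c.send(3) // reuses slot 0: sendx has wrapped around
	fmt.Println(v, c.sendx, c.recvx, c.qcount) // 1 1 1 2
}
```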

There is one more key member: mutex. This lock is not used to queue up sends and receives; it protects all the members of the hchan struct: any goroutine that wants to operate on hchan must acquire it first. So a channel is not truly lock-free in terms of its own composition. But because the lock is held only while a goroutine is inserting data into or taking data out of hchan, channels can still support highly concurrent communication, and in that sense we can call them lock-free.

In addition, you may have noticed the member closed. It is the channel's status flag: 0 means open, 1 means closed.
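The effect of the closed flag can be observed from outside: after close flips it, receives first drain any buffered values and then return the element type's zero value. A small sketch:

```go
package main

import "fmt"

func main() {
	c := make(chan int, 1)
	c <- 7
	close(c) // flips the channel's closed status from 0 to 1

	v, ok := <-c // a buffered value is still delivered after close
	fmt.Println(v, ok) // 7 true

	v, ok = <-c // drained and closed: zero value, ok == false
	fmt.Println(v, ok) // 0 false
}
```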

Those are all the members of hchan.


4. How a channel sends data

At the beginning of the article we used a channel to send data; in this part let's see how a channel sends data under the hood. In code we write c <- directly, but in Go this is actually syntactic sugar: at compile time, the compiler turns c <- into runtime.chansend1(), and if you read the Go source you will find that chansend1 in turn calls chansend.

We can divide channel sends into the following cases:

  • direct send
  • put into the buffer
  • sleep and wait

Let's discuss each of these in turn.

  1. Direct send

Before the data is sent, a goroutine is already waiting to receive. That goroutine arrived when the channel had no data to deliver, so it was put to sleep: it was placed into the receive queue of the hchan struct, and the ring buffer is empty. When data now arrives, the runtime takes one waiting goroutine out of the receive queue, copies the data directly into that goroutine's receiving variable (no need to place it into the ring buffer first), and wakes the goroutine.

  2. Put into the buffer

Here no goroutine is sleeping in the receive queue, and the ring buffer still has free space, so the data is placed directly into the buffer. In implementation, the runtime first obtains the next free slot address in the ring queue, stores the data at that address, and updates the related indices. This is also why we say a channel behaves as if it were lock-free: as long as the buffer has space, a send does not block.
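This case is easy to observe from the outside: with no receiver anywhere, sends still return immediately as long as the ring buffer has room. A sketch:

```go
package main

import "fmt"

func main() {
	c := make(chan string, 2) // ring buffer with two slots

	// No receiver exists yet, but both sends return immediately:
	// the values go straight into the channel's buffer.
	c <- "a"
	c <- "b"
	fmt.Println(len(c)) // 2
}
```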

  3. Sleep and wait

Here no receiving goroutine is asleep waiting, and the ring queue buffer is full, so the sending goroutine enters the send queue and sleeps. In implementation, the sending goroutine wraps itself in a sudog, puts it into the send queue, and finally goes to sleep and releases hchan's mutex.
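We can observe the would-block condition without actually parking a goroutine by using select with a default branch. A sketch (the comment about the send queue reflects the description above):

```go
package main

import "fmt"

func main() {
	c := make(chan int, 1)
	c <- 1 // the buffer is now full

	// A plain `c <- 2` here would park this goroutine in the send queue.
	// select with a default branch lets us see that without blocking.
	select {
	case c <- 2:
		fmt.Println("sent")
	default:
		fmt.Println("would block: buffer full, no waiting receiver")
	}
}
```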

5. How a channel receives data

Receiving is very similar to sending. <-c is also syntactic sugar: at compile time, i = <-c is translated into runtime.chanrecv1(). In addition, the receive can take an extra ok value that reports whether a value was actually received from the channel, and i, ok = <-c is converted into runtime.chanrecv2(). Both ultimately call chanrecv().

We can divide channel receives into four cases:

  • a sender is waiting: receive directly from that goroutine
  • a sender is waiting: receive from the buffer
  • receive from the buffer
  • blocking receive

Let's look at each of these in turn:

  1. A sender is waiting: receive directly from that goroutine

Before the receive, a sending goroutine is already asleep waiting, and the channel has no buffer. The receiving goroutine copies the data directly from that sender and wakes it. In implementation, the receiver checks whether a goroutine is waiting in the send queue (entering the recv method) and whether the channel is unbuffered; if so, it takes the data out of the queued sender and wakes it.

  2. A sender is waiting: receive from the buffer

Before the receive, a sending goroutine is already asleep waiting, and the channel's ring queue holds buffered data. The receiver takes one value out of the ring buffer, moves the sleeping sender's data into the buffer, and wakes the sender. In implementation, the receiver first checks whether a goroutine is waiting in the send queue; if so, it enters the recv method, then checks whether the channel has a buffer; if it does, the receiver takes one value from the buffer, places the sender's data into the buffer, and wakes the sender.

  3. Receive from the buffer

When no goroutine is asleep waiting but the ring queue buffer holds data, the receiver simply takes a value from the buffer. In implementation: if the send queue has no waiter and the channel's buffer has content, pop one value directly from the buffer.

  4. Blocking receive

When no goroutine is asleep waiting and the buffer is empty, the receiving goroutine enters the receive queue and waits for data to arrive. In implementation: if the send queue has no waiter and the buffer holds no data, the receiver wraps itself in a sudog, puts itself into the receive wait queue, and sleeps.
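Symmetrically to the send side, select with a default branch lets us observe this would-block condition without parking the goroutine. A sketch:

```go
package main

import "fmt"

func main() {
	c := make(chan int) // no buffer, and no sender yet

	// A plain `<-c` here would park this goroutine in the receive queue.
	select {
	case v := <-c:
		fmt.Println("received", v)
	default:
		fmt.Println("would block: empty buffer, no waiting sender")
	}
}
```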

Original site

Copyright notice
This article was created by [Hello, everyone. I'm a good classmate]; when reposting, please include the original link. Thanks.
https://yzsam.com/2022/204/202207231539284241.html