当前位置:网站首页>Golang--channel and select

Golang--channel and select

2022-06-26 02:40:00 It artist rookie

channal principle 1
channal principle 2
channal principle 3

Why? channel Can be in different g Send a message in the , For users, there is no need to worry about concurrency ?

In fact, that is hchan Mutex lock is used internally to ensure concurrency security

ch := make(chan struct{
    })

Back to hchan Pointer to type ; Create a channel In essence, you get a runtime.hchan The pointer to , Follow up on this chan The operation of , Nothing more than to perform corresponding operations on structure fields .

chan The essence of hchan

type hchan struct {
    
	qcount   uint           //  The number of current elements in the queue 
	dataqsiz uint           //  The capacity of the queue  , It is immutable ( Never write after the channel is created ), It is therefore safe to read it at any time during channel operation .
	buf      unsafe.Pointer //  The pointing length is  dataqsiz  The underlying array of , Only if  channel  It makes sense to be buffered .
	elemsize uint16 // Size of each object in the queue 
	closed   uint32 //channal Whether to shut down ==>  be equal to 0 Is not closed 
	elemtype *_type //  Types of elements in the queue 
	sendx    uint   //  The index position of the sent element in the circular queue .
	recvx    uint   //  The index position of the received element in the circular queue .
	recvq    waitq  //  Of the recipient  sudog  Waiting in line ( Blocking waiting when buffer is insufficient  goroutine).
	sendq    waitq  //  The sender's  sudog  Waiting in line .
	lock mutex // The mutex 
}
type waitq struct {
    
	first *sudog
	last  *sudog
}

sudog yes Go Language is used to store the process whose status is blocked goroutine Two way linked list abstraction , You can directly understand it as a waiting goroutine That's all right. .
sudog It's a runtime structure , Its main function is to represent a... In the waiting list Goroutine, It stores information about this blocking and two points to the front and back respectively sudog The pointer .

type sudog struct {
    
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g // Point to the current  goroutine.

	next *sudog // Point to next  g.
	prev *sudog // Point to previous  g.
	elem unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool

	// success indicates whether communication over channel c
	// succeeded. It is true if the goroutine was awoken because a
	// value was delivered over channel c, and false if awoken
	// because c was closed.
	success bool

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel
}

makechan

func makechan(t *chantype, size int) *hchan {
    
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
    
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
    
		panic(plainError("makechan: size out of range"))
	}

	var c *hchan
	switch {
    
	case mem == 0: // The size of the queue or element is  0  Under the circumstances , Will call  mallocgc  Method to allocate a continuous memory space .
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:  // At present  channel  The stored element does not have a pointer reference , Together with  hchan  Allocate a continuous memory space with the underlying array at the same time .
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default: // Default current  channel  Stored elements have pointer references ------ The default allocation matches the contiguous memory space .
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
    
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}
// channel  The creation of is called  mallocgc  Method , That is to say  channel  It's all created on the heap . therefore  channel  Yes, it will be  GC  The recycling , Naturally, it is not always necessary  close  Method to display and close .

chansend send data

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    
	if c == nil {
     // In limine  chansend  Method will first judge the current  channel  Is it  nil. if  nil, Logically speaking, it means to  nil channel  send data , Will call  gopark  Method makes the current  Goroutine  Sleep , A deadlock crash occurs , Appearance is appearance  panic  Event to quickly fail .
		if !block {
    
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
    
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
    
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

// And then there's the non blocking  channel  Make an upper limit judgment , See if it fails quickly .
// The failure scenario is as follows :
// If not blocked and not closed , At the same time, the underlying data  dataqsiz  The size is  0( Buffer has no element ), Will return a failure ..
// if  qcount  And  dataqsiz  Same size ( Buffer full ) when , Will return a failure .
	if !block && c.closed == 0 && full(c) {
    
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
    
		t0 = cputicks()
	}
// When above channel  The prepositional judgment of . At the completion of the  channel  After the pre judgment of , Before entering the processing of sending data ,channel  Will be locked 
	lock(&c.lock)
// Before the official start of sending , After lock up , Would be right  channel  Make a state judgment ( Whether to shut down ):
	if c.closed != 0 {
    
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
// Direct transmission ===> At present  channel  There are blocking waiting recipients , Then you just need to send it directly .
	if sg := c.recvq.dequeue(); sg != nil {
     // adopt  dequeue  from  recvq  The first to fall into wait  Goroutine  And send it data directly 
		send(c, sg, ep, func() {
     unlock(&c.lock) }, 3)
		return true
	}
// Buffered transmission ===> Indirect transmission , Judge  channel  Whether there is still space in the buffer :
	if c.qcount < c.dataqsiz {
    
		// Space is available in the channel buffer. Enqueue the element to send.
		// call  chanbuf  Method , In this way, the data in the underlying buffer is obtained  sendx  The element pointer value of the index .
		qp := chanbuf(c, c.sendx)
		if raceenabled {
    
			racenotify(c, c.sendx, nil)
		}
		// call  typedmemmove  Method , Copy the data to be sent to the buffer .
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++ // After copying the data , Yes  sendx  The index increases by itself  1. At the same time, if  sendx  And  dataqsiz  Same size , Then return  0( The circular queue ).
		if c.sendx == c.dataqsiz {
    
			c.sendx = 0
		}
		c.qcount++ // After the self increment is completed , The total number of queues increases at the same time  1. Unlock the mutex , Return results .
		unlock(&c.lock)
		return true
	}

	if !block {
     // If there is no logic to enter the buffer for processing , Then it will judge whether it is currently blocked  channel, If non blocking , Will unlock and directly return to failure .
		unlock(&c.lock)
		return false
	}

// Blocking transmission 
	gp := getg()// call  getg  Method to get the current  goroutine  The pointer to , Used for subsequent data transmission .
	mysg := acquireSudog()// call  acquireSudog  Method to get  sudog  Structure , And set the current  sudog  Specific data information and status to be sent .
	mysg.releasetime = 0
	if t0 != 0 {
    
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)// call  c.sendq.enqueue  Method will just get  sudog  Join the waiting queue to be sent .
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	// call  gopark  Method to suspend the current  goroutine( The execution location will be recorded ), Status as  waitReasonChanSend, Block waiting  channel.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	// call  KeepAlive  Method to ensure that the data value to be sent is active , Until the receiver copies it . That is, allocated on the heap , To avoid being  GC  Recycling .
	KeepAlive(ep)

	//  Wake up from here , And resume the blocked sending operation 
	if mysg != gp.waiting {
    
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
    
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
    
		if c.closed == 0 {
    
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
// This completes... For all categories  channel  Data transmission management .
func full(c *hchan) bool {
    
	// c.dataqsiz is immutable (never written after the channel is created)
	// so it is safe to read at any time during channel operation.
	if c.dataqsiz == 0 {
    
		// Assumes that a pointer read is relaxed-atomic.
		return c.recvq.first == nil
	}
	// Assumes that a uint read is relaxed-atomic.
	return c.qcount == c.dataqsiz
}

send

send The method undertakes to channel The function of sending specific data :

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    
	if raceenabled {
    
		if c.dataqsiz == 0 {
    
			racesync(c, sg)
		} else {
    
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
    
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
    
		sendDirect(c.elemtype, sg, ep)// call  sendDirect  Method copies the data to be sent directly to the memory address of the variable to be received ( Execution stack ).
		// for example :msg := <-ch  sentence , That is to remove data from  ch  Directly copied to  msg  Memory address of .
		sg.elem = nil
	}
	gp := sg.g// call  sg.g  attribute ,  from  sudog  To get the data waiting to be received  goroutine, And pass the parameters required for subsequent wake-up .
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
    
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)// call  goready  Method wakes up the to receive data  goroutine, Expect from  _Gwaiting  The status schedule is  _Grunnable.
}

receive data runtime.chanrecv

Send and receive channel Is relative , That is, its core implementation is also relative

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    

	if debugChan {
    
		print("chanrecv: chan=", c, "\n")
	}
// if  channel  yes  nil channel, And for blocking reception, call  gopark  Method to suspend the current  goroutine.
// if  channel  Non blocking mode , Then return directly .
	if c == nil {
    
		if !block {
    
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
// For non blocking mode  channel  A quick failure check will be performed , testing  channel  Are you ready to receive .
	// Empty check 
	if !block && empty(c) {
     
// Turn off the check 
		if atomic.Load(&c.closed) == 0 {
    
			return
		}
		// The channel is irreversibly closed . Recheck whether the channel has any data to be received , This data may arrive between the empty check and the close check above . So check again 
		//channel  It has been closed and there is no cached data , It will be cleaned up  ep  Pointer and returns .
		if empty(c) {
    
			
			if raceenabled {
    
				raceacquire(c.raceaddr())
			}
			if ep != nil {
    
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
    
		t0 = cputicks()
	}
// Direct reception ==》 If I found  channel  When there is a waiting sender blocking on , Then receive it directly :
	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {
    
		if raceenabled {
    
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
    
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	if sg := c.sendq.dequeue(); sg != nil {
    
		recv(c, sg, ep, func() {
     unlock(&c.lock) }, 3)
		return true, true
	}
// Buffered reception ==》 If I found  channel  When there are elements in the buffer of :
	if c.qcount > 0 {
    
		// call  chanbuf  Methods according to the  recvx  Get the data at the index position of , Find the element to receive for processing .
		qp := chanbuf(c, c.recvx)
		if raceenabled {
    
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
    
			typedmemmove(c.elemtype, ep, qp)// If the received data and the passed in variable are not empty , It will call  typedmemmove  Method copies the data in the buffer to the passed in variable .
		}
		typedmemclr(c.elemtype, qp)// Finally, after the data is copied , Increase or decrease the total number of index entries and queues , And call  typedmemclr  Method to clean the memory data .
		c.recvx++
		if c.recvx == c.dataqsiz {
    
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
    
		unlock(&c.lock)
		return false, false
	}
// Blocking reception ==》 If I found  channel  There is no to send on  goroutine, When there is no data in the buffer . It will enter the last stage of blocking reception :
	gp := getg() // The subject is to get the current  goroutine
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
    
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)// structure  sudog  Structure saves the current data to be received ( The sender ) Address information for , And will  sudog  Join the waiting to receive queue .

	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)// Last call  gopark  Method to suspend the current  goroutine, Waiting to wake up .

	 //  Wake up and start here 
	if mysg != gp.waiting {
    
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
    
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
	// After being awakened , The site will be restored , Return to the corresponding execution point , Finish the final work .
}

recv

recv The method bears on channel The function of receiving specific data in :

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    
	if c.dataqsiz == 0 {
    
		if raceenabled {
    
			racesync(c, sg)
		}
		if ep != nil {
    
			// Direct reception ( There is no buffer ): call  recvDirect  Method , Its function and  sendDirect  Method relative , Directly from the sender  goroutine  Copy the data from the call stack to the receiver  goroutine.
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
    
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		// Buffered reception ( There is a buffer ): call  chanbuf  Method , according to  recvx  The location of the index reads the buffer element , After copying 
		qp := chanbuf(c, c.recvx)
		if raceenabled {
    
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// And copy it to the memory address of the receiver .
		if ep != nil {
    
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		// Yes  sendx  and  recvx  Adjust the index position .
		c.recvx++
		if c.recvx == c.dataqsiz {
    
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	// Finally, it's routine  goroutine  Scheduling action , Would call  goready  Method to wake up the currently processed  sudog  Corresponding  goroutine. So in the next round of scheduling , Now that the data has been received , Naturally, the sender will be awakened .
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
    
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

close closechan

func closechan(c *hchan) {
    
// Preprocessing ==》 Basic check and close flag setting , Guarantee  channel  Not for  nil  And not closed , Ensure the boundary .
	if c == nil {
    
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
    
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
    
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1
// Release receiver ==》 After completing the abnormal boundary judgment and flag setting , Will the recipient's  sudog  Waiting in line (recvq) Join the queue to be cleared  glist  in :
	var glist gList

	// Removed and added  goroutine  The status needs to be  _Gwaiting, To ensure a new round of scheduling .
	for {
    
		sg := c.recvq.dequeue()
		if sg == nil {
    
			break
		}
		if sg.elem != nil {
    
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
    
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
    
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// Release sender ==》 Again , Just like releasing the receiver . The sender will also be added to the queue to be cleared  glist  in :
	for {
    
		sg := c.sendq.dequeue()
		if sg == nil {
    
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
    
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
    
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Scheduling ==》 Will all  glist  Medium  goroutine  Status from  _Gwaiting  Set to  _Grunnable  state , Waiting for scheduler to schedule :
//  All the following  goroutine  Allow to be rescheduled . If the sender or receiver is still passively blocked , Will be free again , Do what you should do in the future , Then run back to its application process .
	for !glist.empty() {
    
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

Its data structure is a ring queue with cache , Plus symmetrical sendq、recvq And other auxiliary properties of the two-way linked list , Can outline channel The basic logic flow model .

In the specific data transmission , It's all about “ Boundary upper and lower bounds processing , Top mutex , Blocking / Non blocking , buffer / Non buffering , Cache out of queue , Copy the data , Unlock the mutex , Scheduling ” Deal with continuously . It is also relatively coincident in basic logic , Because sending and receiving , Always create and close .
Reference material
debugging + The illustration channel Internal implementation
Reference material
channal And csp
Cao Da

原网站

版权声明
本文为[It artist rookie]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/177/202206260138342714.html