当前位置:网站首页>Golang--channel and select
Golang--channel and select
2022-06-26 02:40:00 【It artist rookie】
channal principle 1
channal principle 2
channal principle 3
Why? channel Can be in different g Send a message in the , For users, there is no need to worry about concurrency ?
In fact, that is hchan Mutex lock is used internally to ensure concurrency security
ch := make(chan struct{
})
Back to hchan Pointer to type ; Create a channel In essence, you get a runtime.hchan The pointer to , Follow up on this chan The operation of , Nothing more than to perform corresponding operations on structure fields .
chan The essence of hchan
type hchan struct {
qcount uint // The number of current elements in the queue
dataqsiz uint // The capacity of the queue , It is immutable ( Never write after the channel is created ), It is therefore safe to read it at any time during channel operation .
buf unsafe.Pointer // The pointing length is dataqsiz The underlying array of , Only if channel It makes sense to be buffered .
elemsize uint16 // Size of each object in the queue
closed uint32 //channal Whether to shut down ==> be equal to 0 Is not closed
elemtype *_type // Types of elements in the queue
sendx uint // The index position of the sent element in the circular queue .
recvx uint // The index position of the received element in the circular queue .
recvq waitq // Of the recipient sudog Waiting in line ( Blocking waiting when buffer is insufficient goroutine).
sendq waitq // The sender's sudog Waiting in line .
lock mutex // The mutex
}
type waitq struct {
first *sudog
last *sudog
}
sudog yes Go Language is used to store the process whose status is blocked goroutine Two way linked list abstraction , You can directly understand it as a waiting goroutine That's all right. .
sudog It's a runtime structure , Its main function is to represent a... In the waiting list Goroutine, It stores information about this blocking and two points to the front and back respectively sudog The pointer .
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g // Point to the current goroutine.
next *sudog // Point to next g.
prev *sudog // Point to previous g.
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
makechan
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0: // The size of the queue or element is 0 Under the circumstances , Will call mallocgc Method to allocate a continuous memory space .
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0: // At present channel The stored element does not have a pointer reference , Together with hchan Allocate a continuous memory space with the underlying array at the same time .
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: // Default current channel Stored elements have pointer references ------ The default allocation matches the contiguous memory space .
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
// channel The creation of is called mallocgc Method , That is to say channel It's all created on the heap . therefore channel Yes, it will be GC The recycling , Naturally, it is not always necessary close Method to display and close .
chansend send data
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
// In limine chansend Method will first judge the current channel Is it nil. if nil, Logically speaking, it means to nil channel send data , Will call gopark Method makes the current Goroutine Sleep , A deadlock crash occurs , Appearance is appearance panic Event to quickly fail .
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
}
// And then there's the non blocking channel Make an upper limit judgment , See if it fails quickly .
// The failure scenario is as follows :
// If not blocked and not closed , At the same time, the underlying data dataqsiz The size is 0( Buffer has no element ), Will return a failure ..
// if qcount And dataqsiz Same size ( Buffer full ) when , Will return a failure .
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// When above channel The prepositional judgment of . At the completion of the channel After the pre judgment of , Before entering the processing of sending data ,channel Will be locked
lock(&c.lock)
// Before the official start of sending , After lock up , Would be right channel Make a state judgment ( Whether to shut down ):
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// Direct transmission ===> At present channel There are blocking waiting recipients , Then you just need to send it directly .
if sg := c.recvq.dequeue(); sg != nil {
// adopt dequeue from recvq The first to fall into wait Goroutine And send it data directly
send(c, sg, ep, func() {
unlock(&c.lock) }, 3)
return true
}
// Buffered transmission ===> Indirect transmission , Judge channel Whether there is still space in the buffer :
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// call chanbuf Method , In this way, the data in the underlying buffer is obtained sendx The element pointer value of the index .
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
// call typedmemmove Method , Copy the data to be sent to the buffer .
typedmemmove(c.elemtype, qp, ep)
c.sendx++ // After copying the data , Yes sendx The index increases by itself 1. At the same time, if sendx And dataqsiz Same size , Then return 0( The circular queue ).
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++ // After the self increment is completed , The total number of queues increases at the same time 1. Unlock the mutex , Return results .
unlock(&c.lock)
return true
}
if !block {
// If there is no logic to enter the buffer for processing , Then it will judge whether it is currently blocked channel, If non blocking , Will unlock and directly return to failure .
unlock(&c.lock)
return false
}
// Blocking transmission
gp := getg()// call getg Method to get the current goroutine The pointer to , Used for subsequent data transmission .
mysg := acquireSudog()// call acquireSudog Method to get sudog Structure , And set the current sudog Specific data information and status to be sent .
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)// call c.sendq.enqueue Method will just get sudog Join the waiting queue to be sent .
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
// call gopark Method to suspend the current goroutine( The execution location will be recorded ), Status as waitReasonChanSend, Block waiting channel.
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
// call KeepAlive Method to ensure that the data value to be sent is active , Until the receiver copies it . That is, allocated on the heap , To avoid being GC Recycling .
KeepAlive(ep)
// Wake up from here , And resume the blocked sending operation
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
// This completes... For all categories channel Data transmission management .
func full(c *hchan) bool {
// c.dataqsiz is immutable (never written after the channel is created)
// so it is safe to read at any time during channel operation.
if c.dataqsiz == 0 {
// Assumes that a pointer read is relaxed-atomic.
return c.recvq.first == nil
}
// Assumes that a uint read is relaxed-atomic.
return c.qcount == c.dataqsiz
}
send
send The method undertakes to channel The function of sending specific data :
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)// call sendDirect Method copies the data to be sent directly to the memory address of the variable to be received ( Execution stack ).
// for example :msg := <-ch sentence , That is to remove data from ch Directly copied to msg Memory address of .
sg.elem = nil
}
gp := sg.g// call sg.g attribute , from sudog To get the data waiting to be received goroutine, And pass the parameters required for subsequent wake-up .
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)// call goready Method wakes up the to receive data goroutine, Expect from _Gwaiting The status schedule is _Grunnable.
}
receive data runtime.chanrecv
Send and receive channel Is relative , That is, its core implementation is also relative
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// if channel yes nil channel, And for blocking reception, call gopark Method to suspend the current goroutine.
// if channel Non blocking mode , Then return directly .
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// For non blocking mode channel A quick failure check will be performed , testing channel Are you ready to receive .
// Empty check
if !block && empty(c) {
// Turn off the check
if atomic.Load(&c.closed) == 0 {
return
}
// The channel is irreversibly closed . Recheck whether the channel has any data to be received , This data may arrive between the empty check and the close check above . So check again
//channel It has been closed and there is no cached data , It will be cleaned up ep Pointer and returns .
if empty(c) {
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// Direct reception ==》 If I found channel When there is a waiting sender blocking on , Then receive it directly :
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() {
unlock(&c.lock) }, 3)
return true, true
}
// Buffered reception ==》 If I found channel When there are elements in the buffer of :
if c.qcount > 0 {
// call chanbuf Methods according to the recvx Get the data at the index position of , Find the element to receive for processing .
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)// If the received data and the passed in variable are not empty , It will call typedmemmove Method copies the data in the buffer to the passed in variable .
}
typedmemclr(c.elemtype, qp)// Finally, after the data is copied , Increase or decrease the total number of index entries and queues , And call typedmemclr Method to clean the memory data .
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// Blocking reception ==》 If I found channel There is no to send on goroutine, When there is no data in the buffer . It will enter the last stage of blocking reception :
gp := getg() // The subject is to get the current goroutine
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)// structure sudog Structure saves the current data to be received ( The sender ) Address information for , And will sudog Join the waiting to receive queue .
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)// Last call gopark Method to suspend the current goroutine, Waiting to wake up .
// Wake up and start here
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
// After being awakened , The site will be restored , Return to the corresponding execution point , Finish the final work .
}
recv
recv The method bears on channel The function of receiving specific data in :
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// Direct reception ( There is no buffer ): call recvDirect Method , Its function and sendDirect Method relative , Directly from the sender goroutine Copy the data from the call stack to the receiver goroutine.
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
// Buffered reception ( There is a buffer ): call chanbuf Method , according to recvx The location of the index reads the buffer element , After copying
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// And copy it to the memory address of the receiver .
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
// Yes sendx and recvx Adjust the index position .
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
// Finally, it's routine goroutine Scheduling action , Would call goready Method to wake up the currently processed sudog Corresponding goroutine. So in the next round of scheduling , Now that the data has been received , Naturally, the sender will be awakened .
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
close closechan
func closechan(c *hchan) {
// Preprocessing ==》 Basic check and close flag setting , Guarantee channel Not for nil And not closed , Ensure the boundary .
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
// Release receiver ==》 After completing the abnormal boundary judgment and flag setting , Will the recipient's sudog Waiting in line (recvq) Join the queue to be cleared glist in :
var glist gList
// Removed and added goroutine The status needs to be _Gwaiting, To ensure a new round of scheduling .
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// Release sender ==》 Again , Just like releasing the receiver . The sender will also be added to the queue to be cleared glist in :
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Scheduling ==》 Will all glist Medium goroutine Status from _Gwaiting Set to _Grunnable state , Waiting for scheduler to schedule :
// All the following goroutine Allow to be rescheduled . If the sender or receiver is still passively blocked , Will be free again , Do what you should do in the future , Then run back to its application process .
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
Its data structure is a ring queue with cache , Plus symmetrical sendq、recvq And other auxiliary properties of the two-way linked list , Can outline channel The basic logic flow model .
In the specific data transmission , It's all about “ Boundary upper and lower bounds processing , Top mutex , Blocking / Non blocking , buffer / Non buffering , Cache out of queue , Copy the data , Unlock the mutex , Scheduling ” Deal with continuously . It is also relatively coincident in basic logic , Because sending and receiving , Always create and close .
Reference material
debugging + The illustration channel Internal implementation
Reference material
channal And csp
Cao Da
边栏推荐
- How did the thief unlock the password after the iPhone was stolen? After reading the long knowledge
- MySQL must master 4 languages!
- leetcode 300. Longest increasing subsequence (medium)
- 网上股票开户安全吗?
- 基于 ovarian 数据集 进行生存分析
- The programmer's eight-year salary change has made netizens envious: you pay me one year's salary per month
- Deep understanding of distributed cache design
- 在 R 中创建非线性最小二乘检验
- Implementation of depth first traversal based on adjacency matrix
- How do I fix the iPhone green screen problem? Try these solutions
猜你喜欢
为 ServiceCollection 实现装饰器模式
【flask入门系列】flask处理请求和处理响应
MySQL doit maîtriser 4 langues!
One year's work
2022-06-25:给定一个正数n, 表示有0~n-1号任务, 给定一个长度为n的数组time,time[i]表示i号任务做完的时间, 给定一个二维数组matrix, matrix[j] = {a,
Markov decision process (MDP): gambler problem
Redis Lua sandbox bypass command execution (cve-2022-0543)
图的广度优先遍历
Multi surveyor Gongshu Xiao sir_ The solution of using websocket error reporting under working directory
音视频与CPU架构
随机推荐
Multi surveyor Gongshu Xiao sir_ The solution of using websocket error reporting under working directory
Bloc入门之Cubit详解
OpenAPI 3.0 specification - Food Guide
Fastadmin applet assistant is purchased, but the work order cannot be published in the problem work order
Depth first traversal based on adjacency table
Possible values for @supply in kotlin
使用 AnnotationDbi 转换 R 中的基因名称
Digital commodity DGE -- the dark horse of wealth in digital economy
音视频与CPU架构
Redis classic 20 questions
Share some remote office experience in Intranet operation | community essay solicitation
MySQL必须掌握4种语言!
How did the thief unlock the password after the iPhone was stolen? After reading the long knowledge
为 ServiceCollection 实现装饰器模式
Introduction to bloc: detailed explanation of cube
网上开户选哪个证券公司?网上开户是否安全么?
解读Oracle
One year's work
Multi surveyor Gongshu campus Xiao sir_ Page error in Jenkins
Implementation of depth first traversal based on adjacency matrix