Channels
- goroutine-safe (hchan mutex + some atomic)
- can save elements, FIFO semantics (hchan buffer)
- sharing data between goroutines (sendDirect, operations with buffer)
- blocking goroutines on reads/writes if the channel is empty/full (sendq/recvq, sudog, calls to scheduler: gopark(), goready())
All channels are stored in the heap.
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}Channel is a pointer to a hchan struct.
buf is a circular queue, sendx and recvx are the indexes of the queue.
Sending
- acquire the lock
- add COPY of the value to the queue
- release the lock
Receiving
- acquire the lock
- read the value from the queue
- release the lock
Buffer overflow
Sender will be paused if channel is full.
ch <- data=gopark()-> scheduler -> change goroutine state to waiting -> free os threadadd to
sendq(waitqstruct, which is a linked list of goroutines waiting for send)
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer
//...
}type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer
//...
}- taking data which is under the
sendxindex, reciever will check for waiting goroutines which are insendqand will wake them up (with putting data to queue)
wake up = call of goready() -> scheduler -> set runnable state -> put to run queue
mutex locked only once
Reader works in the same way with recvq.
sendDirect() will send data directly if channel is empty and some goroutines are waiting for data. Sending direct from stack from one goroutine to another.
Select statement
select {
case <-ch1:
// ...
default:
// ...
}select {
case <-ch1:
// ...
default:
// ...
}<-ch1 = chanrecv(hchan *chanType, ep unsafe.Pointer, block bool) -> block = false because we using select
Closing
- is channel initialized? (panic if not)
- lock the mutex
- is channel closed? (panic if yes)
- set closed flag
- release all readers
- release all writers, will panic
- release the mutex
- unblock all goroutines