Channels
- goroutine-safe (hchan mutex + some atomic)
- can save elements, FIFO semantics (hchan buffer)
- sharing data between goroutines (sendDirect, operations with buffer)
- blocking goroutines on reads/writes if the channel is empty/full (sendq/recvq, sudog, calls to scheduler: gopark(), goready())
All channels are stored in the heap.
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
Channel is a pointer to a hchan struct.
buf
is a circular queue, sendx
and recvx
are the indexes of the queue.
Sending
- acquire the lock
- add COPY of the value to the queue
- release the lock
Receiving
- acquire the lock
- read the value from the queue
- release the lock
Buffer overflow
Sender will be paused if channel is full.
ch <- data
=gopark()
-> scheduler -> change goroutine state to waiting -> free os threadadd to
sendq
(waitq
struct, which is a linked list of goroutines waiting for send)
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer
//...
}
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer
//...
}
- taking data which is under the
sendx
index, reciever will check for waiting goroutines which are insendq
and will wake them up (with putting data to queue)
wake up = call of goready()
-> scheduler -> set runnable state -> put to run queue
mutex locked only once
Reader works in the same way with recvq
.
sendDirect()
will send data directly if channel is empty and some goroutines are waiting for data. Sending direct from stack from one goroutine to another.
Select statement
select {
case <-ch1:
// ...
default:
// ...
}
select {
case <-ch1:
// ...
default:
// ...
}
<-ch1
= chanrecv(hchan *chanType, ep unsafe.Pointer, block bool)
-> block = false
because we using select
Closing
- is channel initialized? (panic if not)
- lock the mutex
- is channel closed? (panic if yes)
- set closed flag
- release all readers
- release all writers, will panic
- release the mutex
- unblock all goroutines