底层实现原理
Go中 的 channel 是一个队列,遵循先进先出的原则,负责协程之间的通信(Go 语言提倡不要通过共享内存来通信,而要通过通信来实现内存共享,CSP(Communicating Sequential Process)并发模型,就是通过 goroutine 和 channel 来实现的)。
底层数据结构
通过 var 声明或者 make 函数创建的 channel 是一个存储在函数栈帧上的指针,占用8个字节,指向堆上
的 hchan 结构体:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| type hchan struct {
qcount uint // 循环数组中的数据个数
dataqsiz uint // 循环数组的长度
buf unsafe.Pointer // 指向循环数组的指针
elemsize uint16 // 元素的大小
closed uint32 // channel 关闭标志
timer *timer // timer feeding this chan
elemtype *_type // 元素类型
sendx uint // 下一次写下标的位置
recvx uint // 下一次读下标的位置
recvq waitq // 读等待队列
sendq waitq // 写等待队列
lock mutex // 互斥锁
}
|
等待队列是一个双向链表,包含一个头节点和一个尾节点。每个节点是一个sudog结构体变量,记录哪个协程在等待,等待的是哪个channel,等待发送/接收的数据在哪里。
1
2
3
4
5
6
7
8
9
10
11
12
13
| type waitq struct{
first *sudog // 队列头
last *sudog // 队列尾
}
type sudog struct{
g *g // 等待的 Goroutine
next *sudog
prev *sudog
elem unsafe.Pointer // 发送/读取数据的指针
c *hchan // 等待的 Channel
...
}
|
Channel 使用操作
创建
1
2
3
4
| // 创建有缓冲 channel,缓冲大小为 3
ch := make(chan int, 3)
// 创建无缓冲 channel
ch := make(chan int)
|
- 如果是无缓冲的 channel,会直接给 hchan 分配内存
- 如果是有缓冲的 channel,并且元素不包含指针,那么会为 hchan 和底层数组分配一段连续的地址
- 如果是有缓冲的 channel,并且元素包含指针,那么会为 hchan 和底层数组分别分配地址
发送
- 若 channel 的读等待队列存在接收者 goroutine,那么将数据直接发送给第一个等待的 goroutine,唤醒接收的 goroutine
- 若 channel 的读等待队列不存在接收者 goroutine
- 若循环数组 buf 未满,那么将会把数据发送到循环数组buf的队尾
- 如果循环数组buf已满,这个时候就会走阻塞发送的流程,将当前goroutine加入写等待队列,并挂起等待唤醒
接收
- 如果 channel 的写等待队列存在发送者 goroutine
- 如果是无缓冲 channel,直接从第一个发送者 goroutine 那里把数据拷贝给接收变量,唤醒发送的 goroutine
- 如果是有缓冲 channel(已满),将循环数组 buf 的队首元素拷贝给接收变量,将第一个发送者goroutine 的数据拷贝到 buf循环数组队尾,唤醒发送的 goroutine
- 如果 channel 的写等待队列不存在发送者goroutine
- 如果循环数组 buf 非空,将循环数组buf的队首元素拷贝给接收变量
- 如果循环数组 buf 为空,这个时候就会走阻塞接收的流程,将当前 goroutine 加入读等待队列,并挂起等待唤醒
Channel 使用场景
循环读取 Channel 数据
使用for-range
读取channel,这样既安全又便利,当channel关闭时,for循环会自动退出,无需主动监测channel是否关闭,可以防止读取已经关闭的channel,造成读到数据为通道所存储的数据类型的零值。
1
2
3
| for x := range ch{
fmt.Println(x)
}
|
多重返回判断 Channel 关闭状态
读已关闭的channel会得到零值,如果不确定channel,需要使用ok
进行检测。
1
2
3
| if v, ok := <- ch; ok { // ok 会接收到 channel 的状态,true 表示未关闭
fmt.Println(v)
}
|
使用 select 处理多 Channel
select
可以同时监控多个通道的情况,只处理未阻塞的case。当通道为nil时,对应的case永远为阻塞,无论读写。特殊关注:普通情况下,对nil的通道写操作是要panic的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
for {
ch1<-1
}
}()
go func() {
for {
ch2<-2
}
}()
for i := 0; i < 10; i++ {
select {
case <-ch1:
fmt.Println("Selected ch1")
case <-ch2:
fmt.Println("Selected ch2")
}
}
}
|
使用channel的声明控制读写权限
如果协程对某个channel只有写操作,则这个channel声明为只写。
如果协程对某个channel只有读操作,则这个channe声明为只读。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| // 只有 generator 进行对 outCh 进行写操作,返回声明
// <-chan int,可以防止其他协程乱用此通道,造成隐藏 bug
func generator(int n) <-chan int {
outCh := make(chan int)
go func(){
for i:=0;i<n;i++{
outCh<-i
}
}()
return outCh
}
// consumer只读inCh的数据,声明为<-chan int
// 可以防止它向inCh写数据
func consumer(inCh <-chan int) {
for x := range inCh {
fmt.Println(x)
}
}
|
使用缓冲channel增强并发
有缓冲通道可供多个协程同时处理,在一定程度可提高并发性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| // 无缓冲
ch1 := make(chan int)
ch2 := make(chan int, 0)
// 有缓冲
ch3 := make(chan int, 1)
func test() {
inCh := generator(100)
outCh := make(chan int, 10)
// 使用5个 do 协程同时处理输入数据
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go do(inCh, outCh, &wg)
}
go func() {
wg.Wait()
close(outCh)
}()
for r := range outCh {
fmt.Println(r)
}
}
func generator(n int) <-chan int {
outCh := make(chan int)
go func() {
for i := 0; i < n; i++ {
outCh <- i
}
close(outCh)
}()
return outCh
}
func do(inCh <-chan int, outCh chan<- int, wg *sync.WaitGroup) {
for v := range inCh {
outCh <- v * v
}
wg.Done()
}
|
为操作加上超时
使用select
和time.After
,看操作和定时器哪个先返回,处理先完成的,就达到了超时控制的效果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| func doWithTimeOut(timeout time.Duration) (int, error) {
select {
case ret := <-do():
return ret, nil
case <-time.After(timeout):
return 0, errors.New("timeout")
}
}
func do() <-chan int {
outCh := make(chan int)
go func() {
// do work
}()
return outCh
}
|
使用time实现channel无阻塞读写
为操作加上超时的扩展,这里的操作是channel的读或写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| func unBlockRead(ch chan int) (x int, err error) {
select {
case x = <-ch:
return x, nil
case <-time.After(time.Microsecond):
return 0, errors.New("read time out")
}
}
func unBlockWrite(ch chan int, x int) (err error) {
select {
case ch <- x:
return nil
case <-time.After(time.Microsecond):
return errors.New("read time out")
}
}
|
close(ch)
关闭所有下游协程
退出时,显示通知所有协程退出。所有读 ch
的协程都会收到close(ch)
的信号
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| func (h *Handler) Stop() {
close(h.stopCh)
// 可以使用WaitGroup等待所有协程退出
}
// 收到停止后,不再处理请求
func (h *Handler) loop() error {
for {
select {
case req := <-h.reqCh:
go handle(req)
case <-h.stopCh:
return
}
}
}
|
使用chan struct{}
作为信号 channel
1
2
3
4
5
| // 只是要给所有协程发送退出的信号
type Handler struct {
stopCh chan struct{}
reqCh chan *Request
}
|
使用 channel 传递结构体的指针
channel 本质上传递的是数据的拷贝,拷贝的数据越小传输效率越高,传递结构体指针,比传递结构体更高效
1
2
3
4
| reqCh chan *Request
// 好过
reqCh chan Request
|
使用channel传递channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
reqs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
// 存放结果的channel的channel
outs := make(chan chan int, len(reqs))
var wg sync.WaitGroup
wg.Add(len(reqs))
for _, x := range reqs {
o := handle(&wg, x)
outs <- o
}
go func() {
wg.Wait()
close(outs)
}()
// 读取结果,结果有序
for o := range outs {
fmt.Println(<-o)
}
}
// handle 处理请求,耗时随机模拟
func handle(wg *sync.WaitGroup, a int) chan int {
out := make(chan int)
go func() {
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
out <- a
wg.Done()
}()
return out
}
|
控制 Goroutine 并发执行顺序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| var wg sync.WaitGroup
func main() {
ch1 := make(chan struct{}, 1)
ch2 := make(chan struct{}, 1)
ch3 := make(chan struct{}, 1)
ch1 <- struct{}{}
wg.Add(3)
start := time.Now().Unix()
go print("gorouine1", ch1, ch2)
go print("gorouine2", ch2, ch3)
go print("gorouine3", ch3, ch1)
wg.Wait()
end := time.Now().Unix()
fmt.Printf("duration:%d\n", end-start)
}
func print(gorouine string, inputchan chan struct{}, outchan chan struct{}) {
// 模拟内部操作耗时
time.Sleep(1 * time.Second)
select {
case <-inputchan:
fmt.Printf("%s\n", gorouine)
outchan <- struct{}{}
}
wg.Done()
}
|
Channel 特点
三种模式
| 单向通道的写操作 | 单向通道的读操作 | 双向通道的读写操作 |
---|
创建 | make(chan<-int) | make(<- chan int) | make(chan int) |
三种状态
| 未初始化(nil) | 关闭(closed) | 正常(active) |
---|
关闭 channel | panic | panic | 正常关闭 |
发送数据 | 死锁,永远阻塞 | panic | 阻塞或成功发送 |
接收数据 | 死锁,永远阻塞 | 缓冲区为空为零值,否则继续读 | 阻塞或者成功接收 |
有1个特殊场景:当nil
的通道在select
的某个case
中时,这个case会阻塞,但不会造成死锁。
一个 channel不能多次关闭,会导致painc
如果多个 goroutine 都监听同一个 channel,那么 channel 上的数据都可能随机被某一个 goroutine 取走进行消费
如果多个 goroutine 监听同一个 channel,如果这个 channel 被关闭,则所有 goroutine 都能收到退出信号
Channel 死锁
无缓存 Channel 只写不读
1
2
3
4
| func deadlock( ) {
ch := make(chan int)
ch<-5 // 程序会一直阻塞在这里
}
|
无缓存 Channel 读在写后
1
2
3
4
5
6
| func deadlock() {
ch:=make(chan int)
ch<-5 // 程序会一直阻塞在这里
num:=<-ch
fmt.Println("num=",num)
}
|
有缓存 Channel 写入超过缓冲区大小
1
2
3
4
5
6
| func deadlock() {
ch := make(chan int, 2)
ch<-1
ch<-2
ch<-3 // 这里会发生一直阻塞的情况
}
|
空读
1
2
3
4
| func deadlock() {
ch := make(chan int)
fmt.Println(<-ch)
}
|
多个协程相互等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| func deadlock() {
ch1 := make(chan int)
ch2 := make(chan int)
// 互相等对方造成死锁
go func() {
for {
select {
case num := <-ch1:
fmt.Println("num=", num)
ch2 <- 100
}
}
}()
for {
select {
case num := <-ch2:
fmt.Println("num=", num)
ch1 <- 300
}
}
}
|