返回

Go八股之Channel

底层实现原理

Go中 的 channel 是一个队列,遵循先进先出的原则,负责协程之间的通信(Go 语言提倡不要通过共享内存来通信,而要通过通信来实现内存共享,CSP(Communicating Sequential Process)并发模型,就是通过 goroutine 和 channel 来实现的)。

底层数据结构

通过 var 声明或者 make 函数创建的 channel 是一个存储在函数栈帧上的指针,占用8个字节,指向堆上 的 hchan 结构体:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type hchan struct {
	qcount   uint           // 循环数组中的数据个数
	dataqsiz uint           // 循环数组的长度
	buf      unsafe.Pointer // 指向循环数组的指针
	elemsize uint16 // 元素的大小
	closed   uint32 // channel 关闭标志
	timer    *timer // timer feeding this chan
	elemtype *_type // 元素类型
    
	sendx    uint   // 下一次写下标的位置
	recvx    uint   // 下一次读下标的位置
	recvq    waitq  // 读等待队列
	sendq    waitq  // 写等待队列
    
	lock mutex // 互斥锁
}

等待队列是一个双向链表,包含一个头节点和一个尾节点。每个节点是一个sudog结构体变量,记录哪个协程在等待,等待的是哪个channel,等待发送/接收的数据在哪里。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
type waitq struct{
	first *sudog  // 队列头
	last *sudog   // 队列尾
}

type sudog struct{
	g *g  // 等待的 Goroutine
	next *sudog
	prev *sudog
	elem unsafe.Pointer // 发送/读取数据的指针
	c *hchan // 等待的 Channel
	...
}

Channel 使用操作

创建

1
2
3
4
// 创建有缓冲 channel,缓冲大小为 3
ch := make(chan int, 3)
// 创建无缓冲 channel
ch := make(chan int)
  • 如果是无缓冲的 channel,会直接给 hchan 分配内存
  • 如果是有缓冲的 channel,并且元素不包含指针,那么会为 hchan 和底层数组分配一段连续的地址
  • 如果是有缓冲的 channel,并且元素包含指针,那么会为 hchan 和底层数组分别分配地址

发送

  • 若 channel 的读等待队列存在接收者 goroutine,那么将数据直接发送给第一个等待的 goroutine,唤醒接收的 goroutine
  • 若 channel 的读等待队列不存在接收者 goroutine
    • 若循环数组 buf 未满,那么将会把数据发送到循环数组buf的队尾
    • 如果循环数组buf已满,这个时候就会走阻塞发送的流程,将当前goroutine加入写等待队列,并挂起等待唤醒

接收

  • 如果 channel 的写等待队列存在发送者 goroutine
    • 如果是无缓冲 channel,直接从第一个发送者 goroutine 那里把数据拷贝给接收变量,唤醒发送的 goroutine
    • 如果是有缓冲 channel(已满),将循环数组 buf 的队首元素拷贝给接收变量,将第一个发送者goroutine 的数据拷贝到 buf循环数组队尾,唤醒发送的 goroutine
  • 如果 channel 的写等待队列不存在发送者goroutine
    • 如果循环数组 buf 非空,将循环数组buf的队首元素拷贝给接收变量
    • 如果循环数组 buf 为空,这个时候就会走阻塞接收的流程,将当前 goroutine 加入读等待队列,并挂起等待唤醒

Channel 使用场景

循环读取 Channel 数据

使用for-range读取channel,这样既安全又便利,当channel关闭时,for循环会自动退出,无需主动监测channel是否关闭,可以防止读取已经关闭的channel,造成读到数据为通道所存储的数据类型的零值。

1
2
3
for x := range ch{
    fmt.Println(x)
}

多重返回判断 Channel 关闭状态

读已关闭的channel会得到零值,如果不确定channel,需要使用ok进行检测。

1
2
3
if v, ok := <- ch; ok { // ok 会接收到 channel 的状态,true 表示未关闭
    fmt.Println(v)
}

使用 select 处理多 Channel

select可以同时监控多个通道的情况,只处理未阻塞的case。当通道为nil时,对应的case永远为阻塞,无论读写。特殊关注:普通情况下,对nil的通道写操作是要panic的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func main() {
    ch1 := make(chan int)
	ch2 := make(chan int)
	
    go func() {
        for {
            ch1<-1
        }
    }()
    
    go func() {
        for {
            ch2<-2
        }
    }()
    
    for i := 0; i < 10; i++ {
        select {
        case <-ch1:
            fmt.Println("Selected ch1")
        case <-ch2:
            fmt.Println("Selected ch2")
        }
    }
}

使用channel的声明控制读写权限

如果协程对某个channel只有写操作,则这个channel声明为只写。

如果协程对某个channel只有读操作,则这个channe声明为只读。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 只有 generator 进行对 outCh 进行写操作,返回声明
// <-chan int,可以防止其他协程乱用此通道,造成隐藏 bug
func generator(int n) <-chan int {
    outCh := make(chan int)
    go func(){
        for i:=0;i<n;i++{
            outCh<-i
        }
    }()
    return outCh
}

// consumer只读inCh的数据,声明为<-chan int
// 可以防止它向inCh写数据
func consumer(inCh <-chan int) {
    for x := range inCh {
        fmt.Println(x)
    }
}

使用缓冲channel增强并发

有缓冲通道可供多个协程同时处理,在一定程度可提高并发性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 无缓冲
ch1 := make(chan int)
ch2 := make(chan int, 0)
// 有缓冲
ch3 := make(chan int, 1)

func test() {
    inCh := generator(100)
    outCh := make(chan int, 10)

    // 使用5个 do 协程同时处理输入数据
    var wg sync.WaitGroup
    wg.Add(5)
    for i := 0; i < 5; i++ {
        go do(inCh, outCh, &wg)
    }

    go func() {
        wg.Wait()
        close(outCh)
    }()

    for r := range outCh {
        fmt.Println(r)
    }
}

func generator(n int) <-chan int {
    outCh := make(chan int)
    go func() {
        for i := 0; i < n; i++ {
            outCh <- i
        }
        close(outCh)
    }()
    return outCh
}

func do(inCh <-chan int, outCh chan<- int, wg *sync.WaitGroup) {
    for v := range inCh {
        outCh <- v * v
    }

    wg.Done()
}

为操作加上超时

使用selecttime.After,看操作和定时器哪个先返回,处理先完成的,就达到了超时控制的效果

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func doWithTimeOut(timeout time.Duration) (int, error) {
    select {
    case ret := <-do():
        return ret, nil
    case <-time.After(timeout):
        return 0, errors.New("timeout")
    }
}

func do() <-chan int {
    outCh := make(chan int)
    go func() {
        // do work
    }()
    return outCh
}

使用time实现channel无阻塞读写

为操作加上超时的扩展,这里的操作是channel的读或写

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func unBlockRead(ch chan int) (x int, err error) {
    select {
    case x = <-ch:
        return x, nil
    case <-time.After(time.Microsecond):
        return 0, errors.New("read time out")
    }
}

func unBlockWrite(ch chan int, x int) (err error) {
    select {
    case ch <- x:
        return nil
    case <-time.After(time.Microsecond):
        return errors.New("read time out")
    }
}

close(ch) 关闭所有下游协程

退出时,显示通知所有协程退出。所有读 ch 的协程都会收到close(ch)的信号

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func (h *Handler) Stop() {
    close(h.stopCh)

    // 可以使用WaitGroup等待所有协程退出
}

// 收到停止后,不再处理请求
func (h *Handler) loop() error {
    for {
        select {
        case req := <-h.reqCh:
            go handle(req)
        case <-h.stopCh:
            return
        }
    }
}

使用chan struct{}作为信号 channel

1
2
3
4
5
// 只是要给所有协程发送退出的信号
type Handler struct {
    stopCh chan struct{}
    reqCh chan *Request
}

使用 channel 传递结构体的指针

channel 本质上传递的是数据的拷贝,拷贝的数据越小传输效率越高,传递结构体指针,比传递结构体更高效

1
2
3
4
reqCh chan *Request

// 好过
reqCh chan Request

使用channel传递channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    reqs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}

    // 存放结果的channel的channel
    outs := make(chan chan int, len(reqs))
    var wg sync.WaitGroup
    wg.Add(len(reqs))
    for _, x := range reqs {
        o := handle(&wg, x)
        outs <- o
    }

    go func() {
        wg.Wait()
        close(outs)
    }()

    // 读取结果,结果有序
    for o := range outs {
        fmt.Println(<-o)
    }
}

// handle 处理请求,耗时随机模拟
func handle(wg *sync.WaitGroup, a int) chan int {
    out := make(chan int)
    go func() {
        time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
        out <- a
        wg.Done()
    }()
    return out
}

控制 Goroutine 并发执行顺序

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
var wg sync.WaitGroup

func main() {
    ch1 := make(chan struct{}, 1)
    ch2 := make(chan struct{}, 1)
    ch3 := make(chan struct{}, 1)
    ch1 <- struct{}{}
    wg.Add(3)
    start := time.Now().Unix()
    go print("gorouine1", ch1, ch2)
    go print("gorouine2", ch2, ch3)
    go print("gorouine3", ch3, ch1)
    wg.Wait()
    end := time.Now().Unix()
    fmt.Printf("duration:%d\n", end-start)
}

func print(gorouine string, inputchan chan struct{}, outchan chan struct{}) {
    // 模拟内部操作耗时
    time.Sleep(1 * time.Second)
    select {
    case <-inputchan:
        fmt.Printf("%s\n", gorouine)
        outchan <- struct{}{}
    }
    wg.Done()
}

Channel 特点

  • 三种模式

    单向通道的写操作单向通道的读操作双向通道的读写操作
    创建make(chan<-int)make(<- chan int)make(chan int)
  • 三种状态

    未初始化(nil)关闭(closed)正常(active)
    关闭 channelpanicpanic正常关闭
    发送数据死锁,永远阻塞panic阻塞或成功发送
    接收数据死锁,永远阻塞缓冲区为空为零值,否则继续读阻塞或者成功接收

    有1个特殊场景:当nil的通道在select的某个case中时,这个case会阻塞,但不会造成死锁。

  • 一个 channel不能多次关闭,会导致painc

    如果多个 goroutine 都监听同一个 channel,那么 channel 上的数据都可能随机被某一个 goroutine 取走进行消费

    如果多个 goroutine 监听同一个 channel,如果这个 channel 被关闭,则所有 goroutine 都能收到退出信号

Channel 死锁

无缓存 Channel 只写不读

1
2
3
4
func deadlock( ) {
    ch := make(chan int)
    ch<-5 // 程序会一直阻塞在这里
}

无缓存 Channel 读在写后

1
2
3
4
5
6
func deadlock() {
    ch:=make(chan int)
    ch<-5 // 程序会一直阻塞在这里
    num:=<-ch
    fmt.Println("num=",num)
}

有缓存 Channel 写入超过缓冲区大小

1
2
3
4
5
6
func deadlock() {
	ch := make(chan int, 2)
    ch<-1
    ch<-2
    ch<-3  // 这里会发生一直阻塞的情况
}

空读

1
2
3
4
func deadlock() {
    ch := make(chan int)
    fmt.Println(<-ch)
}

多个协程相互等待

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func deadlock() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    // 互相等对方造成死锁
    go func() {
        for {
            select {
            case num := <-ch1:
                fmt.Println("num=", num)
                ch2 <- 100
            }
        }
    }()

    for {
        select {
        case num := <-ch2:
            fmt.Println("num=", num)
            ch1 <- 300
        }
    }
}
Licensed under CC BY-NC-SA 4.0
载入天数...载入时分秒...
使用 Hugo 构建
主题 StackJimmy 设计