线程通信
进程通信就是指进程之间的信息交换。
共享存储
多个线程直接访问同一块内存用来通信,使用互斥锁对内存进行保护。这种方法抽象层次低,耦合度高,还有可能导致死锁问题。
- 基于数据结构的通信
- 多个进程共用某些数据结构,实现诸进程间的信息交换。由用户(程序员)负责同步处理,OS提供共享存储器。
- 低级通信:可以传递少量数据,效率低
- 基于共享存储区的通信
- 多个进程可通过对共享存储区中的数据读或写来实现通信。
- 高级通信:可以传递大量数据,效率高。
消息传递
进程通信采用消息传递方式时,进程间的数据交换会以格式化的信息 (Message) 为单位。进程通过操作系统提供的"发送消息/接受消息"两个原语进行数据交换。
- 直接消息传递
- 发送方将消息直接挂到接收方的消息队列中
- 间接消息传递
- 发送进程发送消息到中间缓冲区
- 接收进程在中间缓冲区接收线程
管道通信
管道是指用于连接读写进程的一个共享文件,又名pipe文件。进程通信采用管道通信方式时,操作系统会在内存中开辟的一个大小固定的缓冲区,进程需要按照规则进行通信。其中规则如下:
- 一条管道只能实现半双工通信,即 某一时间段内只能实现单向的数据传输。
- 各个进程只能互斥的访问管道,即 当一个进程在写的时候,另外一个进程不能读,反之亦然。
- 数据会以字符流的形式写入管道,当管道写满时,写进程的write()系统调用将会被阻塞,直到读进程将数据取走;当读进程将数据全部取走后,管道变空,此时读进程的read()系统调用将被阻塞。
- 如果没写满,就不允许读;如果没读空,就不允许写。
CSP 并发模型
CSP(Communicating Sequential Processes,通信顺序进程)并发模型倡导使用通信的手段来进行共享内存,继而实现多个线程之间的通信。这也是 Golang 倡导使用的并发模型,通过 Channel 来使用。其中有两个核心概念:
- 并发实体:Go 使用 Goroutine,Goroutine 之间相互独立、且并发执行
- 通道:Go 使用 Channel,并发实体之间使用通道发送消息
Go 的并发
原子操作
原子操作是最基础的并发原语。Go 官方在 atomic
包中提供了操作,具体详见 Go八股之Mutex
Channel
channel
管道,高级同步原语,goroutine之间通信的桥梁,详见 Go八股之Channel
基本并发原语
sync.Mutex:互斥锁可以限制对临界资源的访问,保证同一时刻只有一个 Goroutine 访问共享资源
1 2 3 4 5
mutex := &sync.Mutex{} mutex.Lock() // 临界区操作 mutex.Unlock()
sync.RWMutex:读写锁可以限制对临界资源的访问,保证同一时刻只有一个 Goroutine 写共享资源,允许多个 Goroutine 读共享资源
1 2 3 4 5 6 7 8 9
mutex := &sync.RWMutex{} mutex.Lock() // 临界区写操作 mutex.Unlock() mutex.RLock() // 临界区读操作 mutex.RUnlock()
sync.WaitGroup:等待一组 Goroutine 的返回。
sync.WaitGroup
拥有一个内部计数器。当计数器等于0
时,则Wait()
方法会立即返回。否则它将阻塞执行Wait()
方法的goroutine
直到计数器等于0
时为止。要增加计数器,我们必须使用
Add(int)
方法。要减少它,我们可以使用Done()
(将计数器减1
),也可以传递负数给Add
方法把计数器减少指定大小,Done()
方法底层就是通过Add(-1)
实现的。1 2 3 4 5 6 7 8 9 10 11 12 13
wg := &sync.WaitGroup{} // 启动 5 个 Goroutine for i := 0; i < 5; i++ { wg.Add(1) go func() { // 操作 wg.Done() }() } wg.Wait() // 继续往下执行...
sync.Map:线程安全的 Map
- 使用
Store(interface {},interface {})
添加元素。 - 使用
Load(interface {}) interface {}
检索元素。 - 使用
Delete(interface {})
删除元素。 - 使用
LoadOrStore(interface {},interface {}) (interface {},bool)
检索或添加之前不存在的元素。如果键之前在map中存在,则返回的布尔值为true。 - 使用
Range
遍历元素。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
m := &sync.Map{} // 添加元素 m.Store(1, "one") m.Store(2, "two") // 获取元素1 value, contains := m.Load(1) if contains { fmt.Printf("%s\n", value.(string)) } // 返回已存value,否则把指定的键值存储到map中 value, loaded := m.LoadOrStore(3, "three") if !loaded { fmt.Printf("%s\n", value.(string)) } m.Delete(3) // 迭代所有元素 m.Range(func(key, value interface{}) bool { fmt.Printf("%d: %s\n", key.(int), value.(string)) return true })
- 使用
sync.Pool:可以将暂时将不用的对象缓存起来,待下次需要的时候直接使用,不用再次经过内存分配,复用对象的内存,减轻 GC 的压力,提升系统的性能。
Get() interface{}
用来从并发池中取出元素。Put(interface{})
将一个对象加入并发池。
1 2 3 4 5 6 7 8 9 10 11 12
pool := &sync.Pool{} pool.Put(NewConnection(1)) pool.Put(NewConnection(2)) pool.Put(NewConnection(3)) connection := pool.Get().(*Connection) fmt.Printf("%d\n", connection.id) connection = pool.Get().(*Connection) fmt.Printf("%d\n", connection.id) connection = pool.Get().(*Connection) fmt.Printf("%d\n", connection.id)
sync.Once:确保一个函数仅执行一次
1 2 3 4 5 6 7 8 9
once := &sync.Once{} for i := 0; i < 4; i++ { i := i go func() { once.Do(func() { fmt.Printf("first %d\n", i) }) }() }
sync.Cond:发出信号(一对一)或广播信号(一对多)到 Goroutine
sync.Context:上下文信息传递、提供超时和取消机制、控制子 goroutine 的执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
package main import ( "context" "fmt" "time" ) func main() { ctx, cancel := context.WithCancel(context.Background()) go func() { defer func() { fmt.Println("goroutine exit") }() for { select { case <-ctx.Done(): fmt.Println("receive cancel signal!") return default: fmt.Println("default") time.Sleep(time.Second) } } }() time.Sleep(time.Second) cancel() time.Sleep(2 * time.Second) }
拓展并发原语
- errGroup
- 提供了一种方便的方式来跟踪和处理多个 Goroutine 中的错误。它可以让你启动多个 Goroutine ,并等待它们全部完成,或者在任何一个 Goroutine 返回错误时立即取消所有其他 Goroutine。
- 在
errgroup
包的源码中,它主要使用了sync.WaitGroup
和context.Context
来实现多个goroutine的管理和错误处理。
- Semaphore
- go中的
semaphore
,提供sleep
和wakeup
原语,使其能够在其它同步原语中的竞争情况下使用。当一个goroutine
需要休眠时,将其进行集中存放,当需要wakeup
时,再将其取出,重新放入调度器中。 - 通过信号量来限制并行的
goroutine
数量,达到最大的maxWorkers
数量,Acquire
将会阻塞,直到其中一个goroutine
执行完成,释放出信号量。
- go中的
- SingleFlight
SingleFlight
提供了重复函数调用抑制机制,使用它可以避免同时进行相同的函数调用。第一个调用未完成时后续的重复调用会等待,当第一个调用完成时则会与它们分享结果,这样以来虽然只执行了一次函数调用但是所有调用都拿到了最终的调用结果。