锁
锁是多线程编程中的一种同步机制,用于控制对共享资源的访问。
悲观锁
悲观锁总是假设最坏的情况,认为共享资源每次被访问的时候就会出现问题(比如共享数据被修改),所以每次在获取资源操作的时候都会上锁,这样其他线程想拿到这个资源就会阻塞直到锁被上一个持有者释放。也就是说,共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程。
乐观锁
乐观锁总是假设最好的情况,认为共享资源每次被访问的时候不会出现问题,线程可以不停地执行,无需加锁也无需等待,只是在提交修改的时候去验证对应的资源(也就是数据)是否被其它线程修改。
Go Mutex
Go sync 包提供了两种锁类型:互斥锁 sync.Mutex
、读写互斥锁 sync.RWMutex
。二者都属于悲观锁
底层数据结构
1
2
3
4
| type Mutex struct {
state int32
sema uint32
}
|
加锁
- 首先尝试使用原子操作快速获取锁,如果锁处于初始状态(未被锁定且无等待队列),则直接获取锁并返回,这是快速路径。
- 如果快速获取失败,则进入慢路径。在慢路径中,会判断锁是否已被锁定且可自旋。如果是,则尝试设置唤醒标志并进行自旋等待,即执行空循环一段时间,期望锁能够快速释放。
- 如果自旋等待后仍未获取锁,或者锁不满足自旋条件,则将当前 Goroutine 加入等待队列。根据锁的当前状态(正常模式或饥饿模式),设置相应的队列信息。
- 如果锁处于饥饿模式,当前 Goroutine 会直接阻塞,直到被唤醒并获取锁。如果处于正常模式,被唤醒的 Goroutine 需要与新到达的 Goroutine 竞争锁。
解锁
- 解锁时,首先使用原子操作更新锁的状态,将锁标记为未锁定,并根据需要更新等待队列信息。
- 如果有等待的 Goroutine,并且满足饥饿模式切换条件(如等待队列中只有一个等待者或等待时间较短),则将锁切换到正常模式。
- 唤醒等待队列中的一个 Goroutine,使其有机会获取锁。
自旋和阻塞
Goroutine 在没有获取到锁时有以下两种处理方式:
- 一种是没有获取到锁的线程就一直循环等待判断该资源是否已经释放锁,这种锁也叫做自旋锁,它不用将线程阻塞起来, 适用于并发低且程序执行时间短的场景,缺点是cpu占用较高
- 另外一种处理方式就是把自己阻塞起来,会释放CPU给其他线程,内核会将线程置为睡眠状态,等到锁被释放后,内核会在合适的时机唤醒该线程,适用于高并发场景,缺点是有线程上下文切换的开销
而进入自选态需要满足以下条件:
- 锁已被占用,并且锁不处于饥饿模式。
- 积累的自旋次数小于最大自旋次数(active_spin=4)。
- cpu 核数大于 1。
- 有空闲的 P。
- 当前 goroutine 所挂载的 P 下,本地待运行队列为空。
小结
- 在 Lock() 之前使用 Unlock() 会导致 panic 异常
- 使用 Lock() 加锁后,再次 Lock() 会导致死锁(不支持重入),需Unlock()解锁后才能再加锁
- 锁定状态与 goroutine 没有关联,一个 goroutine 可以 Lock,另一个 goroutine 可以 Unlock
Goroutine 抢锁模式
正常模式(非公平锁)
在刚开始的时候,是处于正常模式(Barging),也就是,当一个G1持有着一个锁的时候,G2会自旋的去尝试获取这个锁
当自旋超过4次还没有能获取到锁的时候,这个G2就会被加入到获取锁的等待队列里面,并阻塞等待唤醒
正常模式下,所有等待锁的 goroutine 按照 FIFO(先进先出)顺序等待。唤醒的goroutine 不会直接拥有锁,而是会和新请求锁的 goroutine 竞争锁。新请求锁的 goroutine 具有优势:它正在 CPU 上执行,而且可能有好几个,所以刚刚唤醒的 goroutine 有很大可能在锁竞争中失败,长时间获取不到锁,就会切换到饥饿模式
饥饿模式(公平锁)
当一个 goroutine 等待锁时间超过 1 毫秒时,它可能会遇到饥饿问题。 在版本1.9中,这种场景下Go Mutex 切换到饥饿模式(handoff),解决饥饿问题。回归正常模式满足两个条件任何一个:
- Goutine 的执行时间小于 1ms
- 等待队列已经清空
小结
对于两种模式,正常模式下的性能是最好的,goroutine 可以连续多次获取锁,饥饿模式解决了取锁公平的问题,但是性能会下降,其实是性能和公平的 一个平衡模式。
sync.RWMutex
底层数据结构
1
2
3
4
5
6
7
| type RWMutex struct {
w Mutex // 互斥锁
writerSem uint32 // 信号量,用于写等待读
readerSem uint32 // 信号量,用于读等待写
readerCount int32 // 当前执行读的 goroutine 数量
readerWait int32 // 被阻塞的准备读的 goroutine 的数量
}
|
读锁
1
2
| func (rw *RWMutex) RLock() // 加读锁
func (rw *RWMutex) RUnlock() // 释放读锁
|
- 先通过原子操作将readerCount加1
- 如果readerCount>=0就直接返回,所以如果只有获取读取锁的操作,那么其成本只有一个原子操作
- 当readerCount<0时,说明当前有写锁,当前协程将借助信号量陷入等待状态,如果获取到信号量则直接退出,没有获取到信号量时的逻辑与互斥锁的逻辑相似
- 读锁解锁时,如果当前没有写锁,则其成本只有一个原子操作并直接退出
- 如果当前有写锁正在等待,则调用rUnlockSlow判断当前是否为最后一个被释放的读锁,如果是则需要
增加信号量并唤醒写锁
写锁
1
2
| func (rw *RWMutex) Lock() // 加写锁
func (rw *RWMutex) Unlock() // 释放写锁
|
- 写锁申请时必须先获取互斥锁,因为它复用了互斥锁的功能。接着readerCount减去
rwmutexMaxReaders阻止后续读操作
- 获取到互斥锁并不一定能直接获取写锁,如果当前已经有其它Goroutine持有互斥锁的读锁,那么当前
协程会加入全局等待队列并进入休眠状态,当最后一个读锁被释放时,会唤醒该协程
- 解锁时,调用Unlock方法,将readerCount加上rwmutexMaxReader,表示不会阻塞后序的读锁,依次
唤醒所有等待中的读锁,当所有的读锁唤醒完毕后会释放互斥锁
小结
- 读锁或写锁在 Lock() 之前使用 Unlock() 会导致 panic 异常
- 使用 Lock() 加锁后,再次 Lock() 会导致死锁(不支持重入),需Unlock()解锁后才能再加锁
- 锁定状态与 goroutine 没有关联,一个 goroutine 可以 RLock(Lock),另一个 goroutine 可以 RUnlock(Unlock)
递归锁
可重入锁又称为递归锁,是指在同一个线程在外层方法获取锁的时候,在进入该线程的内层方法时会自动获取锁,不会因为之前已经获取过还没释放再次加锁导致死锁。
Go 里面的 Mutex 不是可重入的锁。Mutex 的实现中没有记录哪个 goroutine 拥有这把锁,理论上,任何 goroutine 都可以随意地 Unlock 这把锁,所以没办法计算重入条件,并且 Mutex 重复 Lock 会导致死锁。
如果要实现递归锁,需要以下两个条件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
| package main
import (
"bytes"
"fmt"
"runtime"
"strconv"
"sync"
"sync/atomic"
)
type ReentrantLock struct {
sync.Mutex
recursion int32 // 这个goroutine 重入的次数
owner int64 // 当前持有锁的goroutine id
}
// Get returns the id of the current goroutine.
func GetGoroutineID() int64 {
var buf [64]byte
var s = buf[:runtime.Stack(buf[:], false)]
s = s[len("goroutine "):]
s = s[:bytes.IndexByte(s, ' ')]
gid, _ := strconv.ParseInt(string(s), 10, 64)
return gid
}
func NewReentrantLock() sync.Locker {
res := &ReentrantLock{
Mutex: sync.Mutex{},
recursion: 0,
owner: 0,
}
return res
}
// ReentrantMutex 包装一个Mutex,实现可重入
type ReentrantMutex struct {
sync.Mutex
owner int64 // 当前持有锁的goroutine id
recursion int32 // 这个goroutine 重入的次数
}
func (m *ReentrantMutex) Lock() {
gid := GetGoroutineID()
// 如果当前持有锁的goroutine就是这次调用的goroutine,说明是重入
if atomic.LoadInt64(&m.owner) == gid {
m.recursion++
return
}
m.Mutex.Lock()
// 获得锁的goroutine第一次调用,记录下它的goroutine id,调用次数加1
atomic.StoreInt64(&m.owner, gid)
m.recursion = 1
}
func (m *ReentrantMutex) Unlock() {
gid := GetGoroutineID()
// 非持有锁的goroutine尝试释放锁,错误的使用
if atomic.LoadInt64(&m.owner) != gid {
panic(fmt.Sprintf("wrong the owner(%d): %d!", m.owner, gid))
}
// 调用次数减1
m.recursion--
if m.recursion != 0 { // 如果这个goroutine还没有完全释放,则直接返回
return
}
// 此goroutine最后一次调用,需要释放锁
atomic.StoreInt64(&m.owner, -1)
m.Mutex.Unlock()
}
func main() {
var mutex = &ReentrantMutex{}
mutex.Lock()
mutex.Lock()
fmt.Println(111)
mutex.Unlock()
mutex.Unlock()
}
|
Go 原子操作
Go atomic 包是最轻量级的锁(也称无锁结构),可以在不形成临界区和创建互斥量的情况下完成并发安全的值替换操作,支持 int32/int64/uint32/uint64/uintptr
的基础操作(增减、交换、载入、存储等)。
Add 操作
1
2
3
4
5
| func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
|
Load 操作
1
2
3
4
5
6
7
8
| func LoadInt32( addr *int32)(val int32)
func LoadInt64(addr *int64)(val int64)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func LoadUint32( addr *uint32)(val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
//特殊类型:Value类型,常用于配置变更
func (v *Value) Load() (x interface{}){}
|
CAS 操作
1
2
3
4
5
6
| func CompareAndSwapInt32(addr *int32,old,new int32)(swapped bool)
func CompareAndSwapInt64(addr *int64,old,new int64) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer,old,new unsafe.Pointer) (swapped bool)
func CompareAndSwapUint32(addr *uint32,old,new uint32)(swapped bool)
func CompareAndSwapUint64(addr *uint64,old,new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr,old,new uintptr) (swapped bool)
|
Swap 操作
1
2
3
4
5
6
| func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
|
Store 操作
1
2
3
4
5
6
7
8
| func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
// 特殊类型: Value类型,常用于配置变更
func (v *Value) Store(x interface{})
|
原子操作 VS 锁
| 原子操作 | 锁 |
---|
实现原理 | 底层硬件支持 | 原子操作 + 信号量 |
范围 | 单个指令 | 临界区(多条指令) |
性能 | 性能较高 | 性能较低 |
类别 | 乐观锁 | 悲观锁 |