Go同步原语 互斥锁Mutex源码分析

Mutex,即互斥锁,是由 Mutual Exclusion 的前缀组成。它是 Go 语言在 sync 包下提供的最基本的同步原语之一,位于 src/sync/mutex.go 文件中。

1. Mutex 结构体的定义

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
    state int32 // 4字节
    sema  uint32 // 4字节
}

Mutex 结构体有 state 和 sema 两个字段组成,共8字节。其中 state 表示锁的状态,sema 是信号量,用于管理等待队列。

使用 Mutex 无需格外初始化,state 默认值为 0,表示处于解锁状态。

另外,Mutex 锁实现了 Locker 接口。

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
    Lock()
    Unlock()
}

Locker 方法中 Lock 用于加锁,Unlock 用于解锁。

2. Mutex 的状态模式

出于加锁的公平性考虑,Mutex 分为 正常模式饥饿模式 两种。

1) 正常模式

在正常模式下,等待者 waiter 会进入到一个 FIFO(先进先出) 队列,在获取锁时 waiter 会按照先进先出的顺序获取。

当唤醒一个 waiter 时它不会立即获取锁,而是要与新来的 goroutine 进行竞争。这种情况下新来的 goroutine 具有优势,因为它已经运行在 CPU 上,而且这种新来的 goroutine 数量可能不止一个,所以唤醒的 waiter 大概率下获取不到锁。如果唤醒的 waiter 依旧获取不到锁的情况,那么它会被添加到队列的前面。

如果 waiter 获取不到锁的时间超出了1 毫秒,当前状态将被切换为饥饿模式。

新来一个goroutine 时会尝试一次获取锁,如果获取不到我们就视其为watier,并将其添加到FIFO队列里。

2) 饥饿模式

在正常模式下,每次新来的 goroutine 都可能会抢走锁,就这会导致等待队列中的 waiter 可能永远也获取不到锁,从而产生饥饿问题。所以,为了应对高并发抢锁场景下的公平性,官方引入了饥饿模式。

在饥饿模式下,锁会直接交给队列最前面的 waiter。新来的 goroutine 即使在锁未被持有情况下,也不会参与竞争锁,同时也不会进行自旋等待,而直接将添加到队列的尾部。

如果拥有锁的 waiter 发现有以下两种情况,它将切换回正常模式:

  • 它是等待队列里的最后一个 waiter,再也没有其它 waiter
  • 它等待的时间小于1毫秒

3) 两种模式的比较

正常模式拥有更好的性能,因为新来的运行在 CPU 的 goroutine 优先获得锁,省去了 goroutine 的切换开销。

饥饿模式优先处理那些一直在等待的 waiter,避免了某些 goroutine 长时间的等待锁。

两种模式在满足一定条件下,会相互切换。这种切换机制是对公平性和性能的一种平衡。

3.  Mutex state

Mutex 结构体中 state 字段,用于表示当前锁的状态。它是一个复合型的字段,即一个字段包含多个意义,这样节省内存。state 共分四个部分,其中低三位分别表示 mutexed、mutexWoken 和 mutexStarving,剩余位表示当前等待锁的 goroutine 数量。

互斥锁Mutex

在默认情况下,互斥锁的所有状态位都是0, 不同的位表示了不同的状态

  • mutexLocked 表示锁定状态;
  • mutexWoken 表示唤醒状态;
  • mutexStarving 表示饥饿状态;
  • mutexWaiters 表示等待加锁的 waiter 的个数,最大允许记录 1<<(32-3) -1 个 goroutine。

4. Mutex 实现原理

Mutex 锁相关的常量定义:

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken // 2 二进制 0010
    mutexStarving // 4 二进制 0100
    mutexWaiterShift = iota // 3
    starvationThresholdNs = 1e6 // 1毫秒,正常模式和饥饿模式切换的阈值
)

其中前四个常量,会通过位运算改变当前锁的状态。

Mutex 加锁与解锁方法中都包含两个主要步骤:快速途径 (fast path) 和 慢速途径 (slow path) 。

1) 加锁

加锁方法对应的是 Lock() 方法,其中还有一个私有方法 lockSlow()。

// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    // 快速路径:锁未被任何 goroutin 持有,那么直接持有锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    // 慢速路径:尝试自旋竞争,或者饥饿状态下竞争
    m.lockSlow()
}

Lock 方法先使用 fast path 快速路径,如果能够获取锁就直接返回,加锁完成,过程结束。

如果获取不到锁,那么就走 slow path 慢速路径,执行 lockSlow() 函数。

func (m *Mutex) lockSlow() {
    var waitStartTime int64 // 当前 waiter 开始等待时间
    starving := false // 当前饥饿状态
    awoke := false // 当前唤醒状态
    iter := 0 // 当前自旋次数
    old := m.state // 当前锁的状态
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
// 在饥饿模式下不需要自旋,直接将锁移交给队列头部的 waiter,因此新来的 goroutine 永远不会获取锁 // 在正常模式下,锁已经被其它 goroutine 持有,而且当前允许自旋, 则先尝试进行自旋 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines.
           // 当前 goroutine 未设置唤醒状态,锁的原状态中没有被唤醒的 goroutine,而且等待队列非空
           if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true // 设置当前 goroutine 唤醒成功,锁的原状态 state 设置唤醒标识为 1 } runtime_doSpin() // 自旋 iter++ // 当前自旋次数+1 old = m.state // 当前 goroutine 更新锁的当前状态,因为在自旋过程中,可能被别的 goroutine 改变了状态
           continue // 继续尝试自选 } ...... } ...... }

如果当前锁被其它goroutine持有(低一位为1)且处于正常模式(低三位为0),且当前还允许自旋则进行自旋操作。

重点介绍这下这块的位运算逻辑,mutexLocked 和 mutexStarving 这两个位分别代表了锁 是否被持有 和 饥饿状态 ,它们的二进制值表示 0001和 0100。

判断条件 old&(mutexLocked|mutexStarving) == 0001 可以转化为

old & (0001 | 0100)
old & 0101

如果 old & 0101 = 0001 ,由此计算得知 old 的值必须是低一位为1,低三位为0。

然后通过函数 runtime_canSpin() 判断是否可以自旋。

// src/runtime/proc.go

// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
    // sync.Mutex is cooperative, so we are conservative with spinning.
    // Spin only few times and only if running on a multicore machine and
    // GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
    // As opposed to runtime mutex we don't do passive spinning here,
    // because there can be work on global runq or on other Ps.
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}

要想实现自旋,必须符合以下条件:

  • 自旋的次数 active_spin < 4;
  • CPU 必须为多核处理器,单核处理器没必要自旋;
  • 当前程序中设置的 gomaxprocs 个数 >(空闲P个数 + 当前处于自旋m的个数 + 1)
  • 当前运行 G 的 P 的本地运行队列为空

如果当前条件同时也满足自旋的条件,则通过 CAS 设置 mutexWoken 标记以通知解锁(位运算),并将唤醒变量 awoke 设置为 true。

我们看下这块的位运算逻辑,判断条件共有四个

  • !awoke 表示waiter 处于未唤醒状态
  • old&mutexWoken == 0 表示未唤醒状态。old & 0010 = 0 ,则表示低第二位值为0表示未唤醒状态
  • old>>mutexWaiterShift != 0 即 m.state >> 3 的值不等于0,则说明当前 waitersCount > 0, 表示当前存在等待释放锁的 goroutine
  • atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 设置唤醒位的值为1。如0001 | 0010 = 0011

调用函数 runtime_doSpin() 执行自旋,并更新自旋次数,同步新的状态 m.state。此函数内部会执行30次的 PAUSE 指令。

自旋逻辑结束后,会根据当前 m.state 最新值进行一些处理。

func (m *Mutex) lockSlow() {
    for {
        ......

        // 当前goroutine的 m.state 新状态
        new := old
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        // 如果当前处于正常模式,则加锁
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }

        // 当前处于饥饿模式,则更新waiters数量 +1
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }

        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        // 当前goroutine处于饥饿状态且锁被其它goroutine持有,新状态则更新锁为饥饿模式
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        // 当前goroutine的waiter被唤醒,则重置flag
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            // 唤醒状态不一致,直接抛出异常
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            // 新状态清除唤醒标记
            new &^= mutexWoken
        }

        // CAS更新 m.state 状态成功
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 锁已被释放且为正常模式
            if old&(mutexLocked|mutexStarving) == 0 {
                // 通过 CAS 函数获取了锁,直接中止返回
                break // locked the mutex with CAS
            }

            // If we were already waiting before, queue at the front of the queue.
            // waitStartTime != 0 说明当前已处于等待状态
            queueLifo := waitStartTime != 0
            // 首次设置当前goroutine的开始等待时间
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }

            // queueLifo为true,说明已经等待了一会,本次循环则直接将waiter添加到等待队列的头部,使用信号量
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)

            // 如果当前goroutine的等待时间>1毫秒则视为饥饿状态
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state

            // 如果处于饥饿状态(有可能等待时间>1毫秒)
            if old&mutexStarving != 0 {
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }

                // 加锁并且将waiter数减1(暂时未理解这块)
                delta := int32(mutexLocked - 1<<mutexWaiterShift)

                // 当前goroutine非饥饿状态 或者 等待队列只剩下一个waiter,则退出饥饿模式(清除饥饿标识位)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                // 更新状态值并中止for循环
                atomic.AddInt32(&m.state, delta)
                break
            }

            // 设置当前goroutine为唤醒状态,且重置自璇次数
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }

}

可以看到主要实现原来就是通过一个for循环实现的,正常模式下可能发生spinning,而允许自旋必须有四个条件,最多允许有四次spinning机会,否则将转为饥饿模式。饥饿模式下,需要对waiter数据进行累加。而当队列里只剩下一个它自己一个waiter的时候,会恢复为正常模式。每次是计算出了新的状态值new,下面通过 cas 实现更新状态,如果更新失败,则读取新的锁状态m.state并开始新一轮的for循环逻辑。这里的逻辑比较复杂不是太容易理解。

以上是加锁的过程,下面我们再看下解锁的过程.

2) 解锁

解锁对应的方法为 Unlock(),同时对应的私有方法为 unlockSlow(),相比加锁代码要简单的太多了。

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
    ......

    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)

    // 如果 new=0 表示恢复了锁的默认初始化状态,否则表示锁仍在使用
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}

这里要注意一下,对于Mutexq锁来说,一个goroutine里进行加锁,在其它goroutine是可以实现解锁的,但不要重复解锁,否则可能会触发panic。一般是哪个goroutine加锁,就由那个goroutine来解锁。

解锁大致流程和加锁差不多,先是执行fast path 原子更新,如果失败则执行 slow path 过程。

func (m *Mutex) unlockSlow(new int32) {
    // 未加锁状态,直接解锁出错
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }

    // 正常模式 (当前m.state & mutexStarving ==0,则说明 m.state 的 mutexStarving 位是0)
    if new&mutexStarving == 0 {
        old := new
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.

            // 如果当前队列里没有waiters 或 当前goroutine已经唤醒 或 持有了锁,则不需要唤醒其它waiter
            // 在饥饿模式下,锁控制权直接交给下一个waiter
            // 如果 当前队列没有waiter(old>>mutexWaiterShift == 0) 或 (锁为被持有状态、唤醒状态、饥饿状态其中条件之一),则直接返回
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Grab the right to wake someone.
            // 这里 old-1<<mutexWaiterShift 表示 waiters 数量-1,mutexWoken 表示当前goroutine设置新值唤醒位值为唤醒状态,然后再通过CAS更新m.state,如果更新失败,说明当前m.state 已被其它goroutine修改过了,然后它再重新读取m.state的值,开始新一轮的for循环
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // 抢锁控制权成功(解锁成功)
                runtime_Semrelease(&m.sema, false, 1)
                return
            }

            // 重新读取m.state值,开始新一轮判断
            old = m.state
        }
    } else {
        // Starving mode: handoff mutex ownership to the next waiter, and yield
        // our time slice so that the next waiter can start to run immediately.
        // Note: mutexLocked is not set, the waiter will set it after wakeup.
        // But mutex is still considered locked if mutexStarving is set,
        // so new coming goroutines won't acquire it.

        // 饥饿模式,将当前锁控制权直接交给下一个waiter
        runtime_Semrelease(&m.sema, true, 1)
    }
}

解锁源码比较好理解,对于slow path 而言

  • 饥饿模式直接调用函数runtime_Semrelease(),通过信号量将锁控制权交给下一个waiter。
  • 正常模式下分以下情况
    • 如果等待队列里没有 waiter 或 锁为 被持有状态、唤醒状态、饥饿状态三者其中条件之一,则直接返回并结束处理逻辑;
    • 当前goroutine抢锁的控制权。先读取m.state的值,waiters数量减少1,并修改状态为唤醒标记,最后通过CAS修改m.state,如果修改成功则表示抢锁控制权成功,即解锁成功,则直接结束
    • 否则重新读取m.state 的值,for 循环新一轮的逻辑

总结

  • 锁模式分为正常模式和饥饿模式。正常模式下新来的 goroutine 与 waiter 竞争锁,且新来的 goroutine 大概率优先获取锁;饥饿模式下队列头部的 waiter 获取锁,新来的 goroutine 直接进入 waiter 队列,同时也不会自旋。
  • 饥饿模式下,拥有锁的 waiter 当发现它是队列中的最后一个 waiter 或者等待时间<1毫秒时,将自动切换为正常模式。
  • 在正常模式下,新来的 goroutine 如果获取不到锁,则将尝试 spinning。
  • 在加锁和解锁是分 fast path 和 slow path 两种路径,加锁时执行 runtime_SemacquireMutex() 函数,解锁时执行对应的 runtime_Semrelease() 函数。

CSP 并发模型是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯管道 channel 进行通信的并发模型,它不同于传统的并发实体通过共享内存来通信。CSP 的基本原则是 “通过通信来共 ...