Go信号量设计

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

Go信号量设计

ag9920   2022-09-28 我要评论

开篇

在我们此前的文章 Golang Mutex 原理解析 中曾提到过,Mutex 的底层结构包含了两个字段,state 和 sema:

type Mutex struct {
    state int32 
    sema  uint32
}
  • state 代表互斥锁的状态,比如是否被锁定;
  • sema 表示信号量,协程阻塞会等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程。

这个 sema 就是 semaphore 信号量的意思。Golang 协程之间的抢锁,实际上争抢给Locked赋值的权利,能给 Locked 置为1,就说明抢锁成功。抢不到就阻塞等待 sema 信号量,一旦持有锁的协程解锁,那么等待的协程会依次被唤醒。

有意思的是,虽然 semaphore 在锁的实现中起到了至关重要的作用,Golang 对信号量的实现却是隐藏在 runtime 中,并没有包含到标准库里来,在 src 源码中我们可以看到底层依赖的信号量相关函数。

// defined in package runtime
// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple sleep primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semacquire(s *uint32)
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
  • runtime_Semacquire:阻塞等待直到 s 大于 0,然后立刻将 s 减去 1【原子操作】;
  • runtime_Semrelease:将 s 增加 1,然后通知一个阻塞在 runtime_Semacquire 的 goroutine【原子操作】。

两个原子操作,一个 acquire,一个 release,其实就代表了对资源的获取和释放。Mutex 作为 sync 包的核心,支撑了 RWMutex,channel,singleflight 等多个并发控制的能力,而对信号量的管理又是 Mutex 的基础。

虽然源码看不到,但 Golang 其实在扩展库 golang.org/x/sync/semaphore 也提供了一套信号量的实现,我们可以由此来参考一下,理解 semaphore 的实现思路。

信号量

在看源码之前,我们先理清楚【信号量】设计背后的场景和原理。

信号量的概念是荷兰计算机科学家 Edsger Dijkstra 在 1963 年左右提出来的,广泛应用在不同的操作系统中。在系统中,会给每一个进程一个信号量,代表每个进程目前的状态。未得到控制权的进程,会在特定的地方被迫停下来,等待可以继续进行的信号到来。

在 Mutex 依赖的信号量机制中我们可以看到,这里本质就是依赖 sema 一个 uint32 的变量 + 原子操作来实现并发控制能力。当 goroutine 完成对信号量等待时,该变量 -1,当 goroutine 完成对信号量的释放时,该变量 +1。

如果一个新的 goroutine 发现信号量不大于 0,说明资源暂时没有,就得阻塞等待。直到信号量 > 0,此时的语义是有新的资源,该goroutine就会结束等待,完成对信号量的 -1 并返回。注意我们上面有提到,runtime 支持的两个方法都是原子性的,不用担心两个同时在等待的 goroutine 同时抢占同一份资源。

典型的信号量场景是【图书馆借书】。假设学校图书馆某热门书籍现在只有 100 本存货,但是上万学生都想借阅,怎么办?

直接买一万本书是非常简单粗暴的解法,但资源有限,这不是长久之计。

常见的解决方案很简单:学生们先登记,一个一个来。我们先给 100 个同学发出,剩下的你们继续等,等到什么时候借书的同学看完了,把书还回来了,就给排队等待的同学们发放。同时,为了避免超发,每发一个,都需要在维护的记录里将【余量】减去 1,每还回来一个,就把【余量】加上 1。

runtime_Semacquire 就是排队等待借书,runtime_Semrelease 就是看完了把书归还给图书馆。

另外需要注意,虽然我们上面举例的增加/减小的粒度都是 1,但这本质上只是一种场景,事实上就算是图书馆借书,也完全有可能出现一个人同时借了两本一模一样的书。所以,信号量的设计需要支持 N 个资源的获取和释放。

所以,我们对于 acquire 和 release 两种操作的语义如下:

  • release: 将信号量增加 n【保证原子性】;
  • acquire: 若信号量 < n,阻塞等待,直到信号量 >= n,此时将信号量的值减去 n【保证原子性】。

semaphore 扩展库实现

这里我们结合golang.org/x/sync/semaphore 源码来看看怎样设计出来我们上面提到的信号量结构。

// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
func NewWeighted(n int64) *Weighted {
	w := &Weighted{size: n}
	return w
}
// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
	size    int64       // 最大资源数
	cur     int64       // 当前已被使用的资源
	mu      sync.Mutex
	waiters list.List   // 等待队列
}

有意思的是,虽然包名是 semaphore,但是扩展库里真正给【信号量结构体】定义的名称是 Weighted。从上面的定义我们可以看到,传入初始资源个数 n(对应 size),就可以生成一个 Weighted 信号量结构。

Weighted 提供了三个方法来实现对信号量机制的支持:

  • Acquire

对应上面我们提到的 acquire 语义,注意我们提到过,抽象的来讲,acquire 成功与否其实不太看返回值,而是只要获取不了就一直阻塞,如果返回了,就意味着获取到了。

但在 Golang 实现当中,我们肯定不希望,如果发生了异常 case,导致一直阻塞在这里。所以你可以看到 Acquire 的入参里有个 context.Context,借用 context 的上下文控制能力,你可以对此进行 cancel, 可以设置 timeout 等待超时,就能对 acquire 行为进行更多约束。

所以,acquire 之后我们仍然需要检查返回值 error,如果为 nil,代表正常获取了资源。否则可能是 context 已经不合法了。

  • Release

跟上面提到的 release 语义完全一致,传入你要释放的资源数 n,保证原子性地增加信号量。

  • TryAcquire

这里其实跟 sync 包中的各类 TryXXX 函数定位很像。并发的机制中大都包含 fast path 和 slow path,比如首个 goroutine 先来 acquire,那么一定是能拿到的,后续再来请求的 goroutine 由于慢了一步,只能走 slow path 进行等待,自旋等操作。sync 包中绝大部分精华,都在于 slow path 的处理。fast path 大多是一个基于 atomic 包的原子操作,比如 CAS 就可以解决。

TryAcquire 跟 Acquire 的区别在于,虽然也是要资源,但是不等待。有了我就获取,就减信号量,返回 trye。但是如果目前还没有,我不会阻塞在这里,而是直接返回 false。

下面我们逐个方法看看,Weighted 是怎样实现的。

Acquire

// Acquire acquires the semaphore with a weight of n, blocking until resources
// are available or ctx is done. On success, returns nil. On failure, returns
// ctx.Err() and leaves the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
	s.mu.Lock()
	if s.size-s.cur >= n && s.waiters.Len() == 0 {
		s.cur += n
		s.mu.Unlock()
		return nil
	}
	if n > s.size {
		// Don't make other Acquire calls block on one that's doomed to fail.
		s.mu.Unlock()
		<-ctx.Done()
		return ctx.Err()
	}
	ready := make(chan struct{})
	w := waiter{n: n, ready: ready}
	elem := s.waiters.PushBack(w)
	s.mu.Unlock()
	select {
	case <-ctx.Done():
		err := ctx.Err()
		s.mu.Lock()
		select {
		case <-ready:
			// Acquired the semaphore after we were canceled.  Rather than trying to
			// fix up the queue, just pretend we didn't notice the cancelation.
			err = nil
		default:
			isFront := s.waiters.Front() == elem
			s.waiters.Remove(elem)
			// If we're at the front and there're extra tokens left, notify other waiters.
			if isFront && s.size > s.cur {
				s.notifyWaiters()
			}
		}
		s.mu.Unlock()
		return err
	case <-ready:
		return nil
	}
}

在阅读之前回忆一下上面 Weighted 结构的定义,注意 Weighted 并没有维护一个变量用来表示【当前剩余的资源】,这一点是通过 size(初始化的时候设置,表示总资源数)减去 cur(当前已被使用的资源),二者作差得到的。

我们来拆解一下上面这段代码:

第一步:这是常规意义上的 fast path

s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil
}
  • 先上锁,保证并发安全;
  • 校验如果 size - cur >= n,代表剩余的资源是足够,同时 waiters 这个等待队列为空,代表没有别的协程在等待;
  • 此时就没什么多想的,直接 cur 加上 n 即可,代表又消耗了 n 个资源,然后解锁返回,很直接。

第二步:针对特定场景做提前剪枝

if n > s.size {
        // Don't make other Acquire calls block on one that's doomed to fail.
        s.mu.Unlock()
        <-ctx.Done()
        return ctx.Err()
}

如果请求的资源数量,甚至都大于资源总数量了,说明这个协程心里没数。。。。就算我现在把所有初始化的资源都拿回来,也喂不饱你呀!!!那能怎么办,我就不烦劳后面流程处理了,直接等你的 context 什么时候 Done,给你返回 context 的错误就行了,同时先解个锁,别耽误别的 goroutine 拿资源。

第三步:资源是够的,只是现在没有,那就把当前goroutine加到排队的队伍里

ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()

这里 ready 结构是个空结构体的 channel,仅仅是为了实现协程间通信,通知什么时候资源 ready,建立一个属于这个 goroutine 的 waiter,然后塞到 Weighted 结构的等待队列 waiters 里。

搞定了以后直接解锁,因为你已经来排队了,手续处理完成,以后的路有别的通知机制保证,就没必要在这儿拿着锁阻塞新来的 goroutine 了,人家也得排队。

第四步:排队等待

select {
    case <-ctx.Done():
            err := ctx.Err()
            s.mu.Lock()
            select {
            case <-ready:
                    // Acquired the semaphore after we were canceled.  Rather than trying to
                    // fix up the queue, just pretend we didn't notice the cancelation.
                    err = nil
            default:
                    isFront := s.waiters.Front() == elem
                    s.waiters.Remove(elem)
                    // If we're at the front and there're extra tokens left, notify other waiters.
                    if isFront && s.size > s.cur {
                            s.notifyWaiters()
                    }
            }
            s.mu.Unlock()
            return err
    case <-ready:
            return nil
    }

一个 select 语句,只看两种情况:1. 这个 goroutine 的 context 超时了;2. 拿到了资源,皆大欢喜。

重点在于 ctx.Done 分支里 default 的处理。我们可以看到,如果是超时了,此时还没拿到资源,首先会把当前 goroutine 从 waiters 等待队列里移除(合情合理,你既然因为自己的原因做不了主,没法继续等待了,就别耽误别人事了)。

然后接着判断,若这个 goroutine 同时也是排在最前的 goroutine,而且恰好现在有资源了,就赶紧通知队里的 goroutine 们,伙计们,现在有资源了,赶紧来拿。我们来看看这个 notifyWaiters 干了什么:

func (s *Weighted) notifyWaiters() {
	for {
		next := s.waiters.Front()
		if next == nil {
			break // No more waiters blocked.
		}
		w := next.Value.(waiter)
		if s.size-s.cur < w.n {
			// Not enough tokens for the next waiter.  We could keep going (to try to
			// find a waiter with a smaller request), but under load that could cause
			// starvation for large requests; instead, we leave all remaining waiters
			// blocked.
			//
			// Consider a semaphore used as a read-write lock, with N tokens, N
			// readers, and one writer.  Each reader can Acquire(1) to obtain a read
			// lock.  The writer can Acquire(N) to obtain a write lock, excluding all
			// of the readers.  If we allow the readers to jump ahead in the queue,
			// the writer will starve — there is always one token available for every
			// reader.
			break
		}
		s.cur += w.n
		s.waiters.Remove(next)
		close(w.ready)
	}
}

其实很简单,遍历 waiters 这个等待队列,拿到排队最前的 waiter,判断资源够不够,如果够了,增加 cur 变量,资源给你,然后把你从等待队列里移出去,再 close ready 那个goroutine 就行,算是通知一下。

重点部分在于,如果资源不够怎么办?

想象一下现在的处境,Weighted 这个 semaphore 的确有资源,而目前要处理的这个 goroutine 的的确确就是排队最靠前的,而且人家也没狮子大开口,要比你总 size 还大的资源。但是,但是,好巧不巧,现在你要的数量,比我手上有的少。。。。

很无奈,那怎么办呢?

无非两种解法:

  • 我先不管你,反正你要的不够,我先看看你后面那个 goroutine 人家够不够,虽然你现在是排位第一个,但是也得继续等着;
  • 没办法,你排第一,需求我就得满足,所以我们都继续等,等啥时候资源够了就给你。

扩展库实际选用的是第 2 种策略,即一定要满足排在最前面的 goroutine,这里的考虑在注释里有提到,如果直接继续看后面的 goroutine 够不够,优先满足后面的,在一些情况下会饿死有大资源要求的 goroutine,设计上不希望这样的情况发生。

简单说:要的多不是错,既然你排第一,目前货不多,那就大家一起阻塞等待,保障你的权利。

Release

// Release releases the semaphore with a weight of n.
func (s *Weighted) Release(n int64) {
	s.mu.Lock()
	s.cur -= n
	if s.cur &lt; 0 {
		s.mu.Unlock()
		panic("semaphore: released more than held")
	}
	s.notifyWaiters()
	s.mu.Unlock()
}

Release 这里的实现非常简单,一把锁保障不出现并发,然后将 cur 减去 n 即可,说明此时又有 n 个资源回到了货仓。然后和上面 Acquire 一样,调用 notifyWaiters,叫排队第一的哥们(哦不,是 goroutine)来领东西了。

TryAcquire

// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *Weighted) TryAcquire(n int64) bool {
	s.mu.Lock()
	success := s.size-s.cur >= n && s.waiters.Len() == 0
	if success {
		s.cur += n
	}
	s.mu.Unlock()
	return success
}

其实就是 Acquire 方法的 fast path,只是返回了个 bool,标识是否获取成功。

总结

今天我们了解了扩展库 semaphore 对于信号量的封装实现,整体代码加上注释也才 100 多行,是非常好的学习材料,建议大家有空了对着源码再过一遍。Acquire 和 Release 的实现都很符合直觉。

其实,我们使用 buffered channel 其实也可以模拟出来 n 个信号量的效果,但就不具备 semaphore Weighted 这套实现里面,一次获取多个资源的能力了。

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们