sync.Cond
cond是condition的缩写,意为条件。sync.Cond有阻塞与结束阻塞的功能。结束阻塞可以分为两种方法,一种是调用Signal(),另一种是调用Broadcast()。前者会随机唤醒一个使用其Wait()方法阻塞的协程,后者会唤醒所有使用其Wait()方法阻塞的协程。
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
// Cond 实现了一个条件变量,一个等待或宣布事件发生的 goroutines 的集合点。
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
// 每个 Cond 都有一个关联的 Locker L(通常是互斥或 RWMutex),在更改条件和调用 Wait 方法时必须保留该 L
// A Cond must not be copied after first use.
// Cond 在首次使用后不得复制。
// In the terminology of the Go memory model, Cond arranges that
// a call to Broadcast or Signal “synchronizes before” any Wait call
// that it unblocks.
// 在 Go 内存模型的术语中,Cond 安排对广播或信号的调用在它取消阻止的任何 Wait 调用之前“同步”。
// For many simple use cases, users will be better off using channels than a
// Cond (Broadcast corresponds to closing a channel, and Signal corresponds to
// sending on a channel).
// 对于许多简单的用例,用户使用通道比使用Cons更好(广播对应于关闭通道,信号对应于在通道上发送)。
// For more on replacements for sync.Cond, see [Roberto Clapis's series on
// advanced concurrency patterns], as well as [Bryan Mills's talk on concurrency
// patterns].
// 有关替换同步的更多信息。Cond,参见[Roberto Clapis关于高级并发模式的系列],以及[Bryan Mills关于并发模式的演讲]
// [Roberto Clapis's series on advanced concurrency patterns]: https://blogtitle.github.io/categories/concurrency/
// [Bryan Mills's talk on concurrency patterns]: https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
Signal(),和Broadcast()应发生在Wait()之前
Cond结构
以下源码若为标注出处,则均来自src/sync/cond.go
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
noCopy:检测是否拷贝过的标记
// noCopy may be added to structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
//
// Note that it must not be embedded, due to the Lock and Unlock methods.
type noCopy struct{
}
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {
}
func (*noCopy) Unlock() {
}
L:Locker类型的接口,可以接收实现该接口的所有类型,如Mutex、RWMutex
src/sync/mutex.go
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
notify:维护一个通知列表
src/sync/runtime2.go
// Approximation of notifyList in runtime/sema.go. Size and alignment must
// agree.
type notifyList struct {
wait uint32
notify uint32
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}
checker:检测Cond结构体是否发生过拷贝的对象
copyChecker 8字节数值指针类型,copyChecker.check(),检查该接收者是否发生过拷贝,在第一次调用时赋地址值,之后调用则检测该地址是否发生变化
// copyChecker holds back pointer to itself to detect object copying.
type copyChecker uintptr // uint指针类型,保存地址
func (c *copyChecker) check() {
// 检查地址是否发生改变,从而判断是否发生复制
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
NewCond 创建Cond,传入一把锁
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
return &Cond{
L: l}
}
Cond.Wait 阻塞所在的goroutine
技巧:Wait()应被锁保护
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
// 等待以原子方式解锁 c.L 并暂停调用 goroutine的执行。稍后恢复执行后,Wait 在返回之前锁定 c.L。与其他系统不同,除非被广播或信号唤醒,否则等待无法返回。
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
// 由于 c.L 在 Wait 首次恢复时未锁定,因此调用方通常无法假定 Wait 返回时条件为 true。相反,调用方应该在循环中等待:
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ... 使用条件
// c.L.Unlock()
func (c *Cond) Wait() {
c.checker.check() // copy 检查
t := runtime_notifyListAdd(&c.notify) // 向通知列表列表中加入该通知
c.L.Unlock() // 暂时解锁
runtime_notifyListWait(&c.notify, t) //通知操作
c.L.Lock() //加锁,还原状态
}
Cond.Signal 随机通知一个被Cond.Wait阻塞的协程结束阻塞
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
// 信号唤醒一个等待 c 的 goroutine,如果有的话。允许但不是要求调用方在呼叫期间保持 c.L。
// Signal() does not affect goroutine scheduling priority; if other goroutines
// are attempting to lock c.L, they may be awoken before a "waiting" goroutine.
// Signal() 不影响 goroutine 调度优先级;如果其他 goroutines 试图锁定 c.L,它们可能会在“等待”的 goroutines 之前被唤醒。
func (c *Cond) Signal() {
c.checker.check() // copy检查
runtime_notifyListNotifyOne(&c.notify) //随机通知一个被Wait阻塞的协程,取消阻塞
}
Cond.Broadcast 通知所有被Cond.Wait阻塞的协程结束阻塞
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
c.checker.check() // copy 检查
runtime_notifyListNotifyAll(&c.notify) //通知所有阻塞协程
}
src/runtme/sema.go notifyListAdd() 通知列表
// notifyListAdd adds the caller to a notify list such that it can receive
// notifications. The caller must eventually call notifyListWait to wait for
// such a notification, passing the returned ticket number.
//
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
// This may be called concurrently, for example, when called from
// sync.Cond.Wait while holding a RWMutex in read mode.
return atomic.Xadd(&l.wait, 1) - 1
}
src/runtime/sema.go notifyListNotifyAll()
// notifyListNotifyAll notifies all entries in the list.
//
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
// Fast-path: if there are no new waiters since the last notification
// we don't need to acquire the lock.
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// Pull the list out into a local variable, waiters will be readied
// outside the lock.
lockWithRank(&l.lock, lockRankNotifyList)
s := l.head
l.head = nil
l.tail = nil
// Update the next ticket to be notified. We can set it to the current
// value of wait because any previous waiters are already in the list
// or will notice that they have already been notified when trying to
// add themselves to the list.
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
// Go through the local list and ready all waiters.
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
src/runtime/sema.go notifyListNotifyOne()
// notifyListNotifyOne notifies one entry in the list.
//
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
// Fast-path: if there are no new waiters since the last notification
// we don't need to acquire the lock at all.
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
// Re-check under the lock if we need to do anything.
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// Update the next notify ticket number.
atomic.Store(&l.notify, t+1)
// Try to find the g that needs to be notified.
// If it hasn't made it to the list yet we won't find it,
// but it won't park itself once it sees the new notify number.
//
// This scan looks linear but essentially always stops quickly.
// Because g's queue separately from taking numbers,
// there may be minor reorderings in the list, but we
// expect the g we're looking for to be near the front.
// The g has others in front of it on the list only to the
// extent that it lost the race, so the iteration will not
// be too long. This applies even when the g is missing:
// it hasn't yet gotten to sleep and has lost the race to
// the (few) other g's that we find on the list.
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
一个用广播通知所有消费者开始工作的例子
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func consumer(cond *sync.Cond, id int) {
cond.L.Lock()
cond.Wait()
rand.Seed(time.Now().UnixNano())
fmt.Println("消费者协程", id, "执行开始")
time.Sleep(time.Second * time.Duration(rand.Intn(3)))
// do something
cond.L.Unlock()
fmt.Println("消费者协程", id, "执行结束")
}
func main() {
lock := new(sync.Mutex)
cond := sync.NewCond(lock)
for i := 0; i < 10; i++ {
go consumer(cond, i)
}
time.Sleep(time.Second)
// cond.Signal() //通知一个等待的协程结束阻塞
cond.Broadcast() //通知所有等待的协程结束阻塞
time.Sleep(time.Second * 30)
fmt.Println("执行结束")
}
转载:https://blog.csdn.net/dawnto/article/details/128743645
查看评论