only has compared to the others early, diligently diligently, can feel the successful taste。
“高并发 高性能 高可用”一直以来作为搬砖界用力搬砖的口号。由于CPU一次读取存储数据的长度有限,比如32bit的平台修改int64需要被拆分成两次写操作,更何况对于结构体的赋值,那么对于高并发场景下我们怎么才能保证数据的完整性和一致性呢?
所以今天我们来聊聊Go的atomic
包,它提供了低级别原子内存原语,对于实现同步算法起到很大作用。可以说是Go并发编程的基石,比如Mutex
、RWMutex
、WaitGroup
、Once
等实现都依赖于atomic
。当然其提供的功能需要格外小心才能正确使用,atomic
大致提供了5类原子操作,因为不会被CPU中断所以在多个goroutine
之间访问是安全的。
由
SwapT
函数实现的交换操作,在原子上等价于
-
old = *addr
-
*addr =
new
-
return old
由
CompareAndSwapT
函数实现的比较并交换操作,在原子上等价于
-
if *addr == old {
-
*addr =
new
-
return
true
-
}
-
return
false
由
AddT
函数实现的加法操作,原子上等价于
-
*addr += delta
-
return *addr
由
LoadT
和StoreT
函数实现的加载和存储操作,原子上等价于return *addr
和*addr=val
atomic.Value
为除了int32
、int64
、uint32
、uint64
、uintptr
和unsafe.Pointer
之外的类型提供原子读写能力。
atomic源码分析
然而我们在atomic
包下只能看到类似如下的函数定义
-
// SwapInt32 atomically stores new into *addr and returns the previous *addr value.
-
func SwapInt32(addr *int32, new int32) (old int32)
-
// SwapInt64 atomically stores new into *addr and returns the previous *addr value.
-
func SwapInt64(addr *int64, new int64) (old int64)
-
// SwapUint32 atomically stores new into *addr and returns the previous *addr value.
-
func SwapUint32(addr *uint32, new uint32) (old uint32)
-
... ...
并没有发现函数实现部分,但是能够找到相应汇编代码
-
TEXT ·SwapInt32(SB),NOSPLIT,$
0
-
JMP runtime∕internal∕atomic·Xchg(SB)
-
TEXT ·SwapUint32(SB),NOSPLIT,$
0
-
JMP runtime∕internal∕atomic·Xchg(SB)
-
TEXT ·SwapInt64(SB),NOSPLIT,$
0
-
JMP runtime∕internal∕atomic·Xchg64(SB)
-
... ...
可见它们最终都是基于运行时中runtime/internal/atomic
的实现。
提前剧透一下,原子操作其实最终依赖硬件指令的支持,但是因为原子操作可能会导致goroutine
阻塞,所以它同时还需要运行时调度器的配合。
案例分析
以atomic.CompareAndSwapPointer
为例,它只有函数定义没有函数体
-
// CompareAndSwapPointer executes the compare-and-swap operation for a unsafe.Pointer value.
-
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
通过查找,发现其本身由运行时实现
-
//go:linkname sync_atomic_CompareAndSwapPointer sync/atomic.CompareAndSwapPointer
-
//go:nosplit
-
func sync_atomic_CompareAndSwapPointer(ptr *unsafe.Pointer, old, new unsafe.Pointer) bool {
-
if writeBarrier.enabled {
-
atomicwb(ptr,
new)
-
}
-
return sync_atomic_CompareAndSwapUintptr((*
uintptr)(noescape(unsafe.Pointer(ptr))),
uintptr(old),
uintptr(
new))
-
}
可以看到其最终调用了sync_atomic_CompareAndSwapUintptr
,且sync_atomic_CompareAndSwapUintptr
也只有函数定义没有函数体,而且也找不到运行时的实现,说明它是由编译器完成。那么来通过一个栗子一窥究竟吧
-
package main
-
-
import (
-
"fmt"
-
"sync/atomic"
-
"unsafe"
-
)
-
-
func main() {
-
var p unsafe.Pointer
-
newP :=
23
-
atomic.CompareAndSwapPointer(&p,
nil, unsafe.Pointer(&newP))
-
v := (*
int)(p)
-
fmt.Println(*v)
-
}
先执行编译命令go build -gcflags="-N -l -m" -o atomic atomic.go得到二进制文件
atomic`
然后执行go tool objdump -s "main.main" atomic查看下
main.main`编译结果
-
TEXT main.main(SB) /Users/shangyindong/mywork/workspace/workspace_github/
go-snippets/atomic/atomic.
go
-
.....
-
atomic.
go:
12
0x10a6ff3 e82869fbff CALL sync/atomic.CompareAndSwapPointer(SB)
-
......
go tool objdump -s "sync/atomic.CompareAndSwapPointer" atomic
接着看CompareAndSwapPointer
-
TEXT sync/atomic.CompareAndSwapPointer(SB) /usr/local/
go/src/runtime/atomic_pointer.
go
-
......
-
atomic_pointer.
go:
76
0x105d960 e85b8b0000 CALL sync/atomic.CompareAndSwapUintptr(SB)
-
......
可以看到CompareAndSwapPointer
实际调用了CompareAndSwapUintptr
接着看go tool objdump -s "sync/atomic.CompareAndSwapUintptr" atomic
-
TEXT sync/atomic.CompareAndSwapUintptr(SB) /usr/local/
go/src/sync/atomic/asm.s
-
asm.s:
31
0x10664c0 e93bbaf9ff JMP runtime/internal/atomic.Casuintptr(SB)
-
......
最终JMP跳转到了``,这个方法为内置汇编,看下asm_amd64.s吧
-
TEXT runtime∕internal∕atomic·Casuintptr(SB), NOSPLIT, $
0
-25
-
JMP runtime∕internal∕atomic·Cas64(SB)
进而调整到runtime/internal/atomic.Cas64
-
// bool runtime∕internal∕atomic·Cas64(uint64 *val, uint64 old, uint64 new)
-
// Atomically:
-
// if(*val == *old){
-
// *val = new;
-
// return 1;
-
// } else {
-
// return 0;
-
// }
-
TEXT runtime∕internal∕atomic·Cas64(SB), NOSPLIT, $
0
-25
-
MOVQ ptr+
0(FP), BX
-
MOVQ old+
8(FP), AX
-
MOVQ
new+
16(FP), CX
-
LOCK
-
CMPXCHGQ CX,
0(BX)
-
SETEQ ret+
24(FP)
-
RET
到这里我们能够很清晰看到,本质上原子操作最终还是依赖于CPU的Lock
+CMPXCHGQ
指令,Cas64(SB)
总共包含7条指令
第一条指令:将ptr的值放入BX
第二条指令:将假设的旧值放入AX
第三条指令:将要比较的新值放入CX
第四条指令:LOCK
并不是指令,而是作为指令前缀用来修饰CMPXCHGQ CX, 0(BX)
的
大致有五类指令可强制使用LOCK语义,但当LOCK前缀被置于其他指令之前或者指令没有对内存进行写操作(目标操作数可能在寄存器中)时,会抛出一个invalid-opcode异常
位测试和修改指令(BTS,BTR,BTC)
交换指令(XADD,CMPXCHG,CMPXCHG8B)
XCHG指令自动使用LOCK前缀
单操作数算术和逻辑指令:INC,DEC,NOT,NEG
双操作数算术和逻辑指令:ADD,ADC,SUB,SBB,AND,OR,XOR
对于Intel486和Pentium处理器,在进行加锁操作时,LOCK#信号总是在总线上发出,甚至锁定的内存区域已经缓存在处理器中。这种通过封锁总线来禁止其他CPU对内存修改进而达到原子性的效果,显然锁的力度过于粗糙。
所以在Pentium4,Intel Xeon,P6系列已经最近的处理器,如果加锁的内存区域已经缓存在处理器中,处理器可能并不对总线发出LOCK#信号,而是仅仅修改缓存中的数据,然后依赖缓存一致性协议(MESI 详见《手摸手Go 深入剖析sync.Pool》有讲解)来保证加锁操作的自动执行。缓存一致性协议会自动阻止两个或多个缓存了同一区域内存的处理器同时修改数据。
感兴趣的可以详细研究下intel开发手册卷3
第五条指令:调用CMPXCHGQ
,将指令第二个操作数与累加器AX
比较 ,如果相等,CX
更新到BX
,否则BX更新到
AX`
第六条指令:AX
和CX
相等时将1写进ret+16(FP)
否则写入0
第七条指令:函数返回结束。
特殊的atomic.Value
atomic.Value
是Go语言1.4版本的时候加入的,它相当于一个容器,可以原子的Store
和Load
任意类型的值。是对int32
、int64
、uint32
、uint64
、uintptr
和unsafe.Pointer
类型原子操作的补充。但它的实现不是通过汇编来完成,而是基于已有的atomic
包。
看下它的基本结构
-
// Value的零值为nil
-
// 使用后禁止拷贝
-
type Value
struct {
-
v
interface{}
-
}
-
// ifaceWords is interface{} internal representation.
-
type ifaceWords
struct {
-
typ unsafe.Pointer
-
data unsafe.Pointer
-
}
因为atomic.Value
被设计为存储任意类型的值,所以它内部只有一个interface{}
类型的字段。并且在atomic/value.go
文件中还定义了一个ifaceWords
,之前我们讲过Go的接口结构,是不是跟eface
很像
-
type eface
struct {
-
_type *_type
-
data unsafe.Pointer
-
}
其实atomic.Value
的实现原理就是将interface{}
类型分解,得到类型和数据这两个unsafe.Pointer
类型字段,在针对它们进行原子操作来达到interface{}
类型原子操作的目的。
unsafe.Pointer
我们知道Go语言的编译器会使用静态类型检查来保证程序运行的类型安全。但它的标准库中又提供了unsafe.Pointer
,可以让程序灵活的操作内存并且可以绕过Go语言的类型检查,从而可以跟任意的指针类型相互转换。
例如 字符串和byte切片之前的零拷贝转换
-
type StringHeader
struct {
-
Data
uintptr
-
Len
int
-
}
-
-
type SliceHeader
struct {
-
Data
uintptr
-
Len
int
-
Cap
int
-
}
我们看到slice和string的底层数据结构基本一样,虽然Go语言的类型检查禁止了它们之间相互转换。但是拥有了unsafe.Pointer
这个黑魔法,你就可以零拷贝实现[]byte 和string之间的转换,只需共享底层的Data和Len即可
-
func string2bytes(s string) []byte {
-
return *(*[]
byte)(unsafe.Pointer(&s))
-
}
-
func bytes2string(b []byte) string{
-
return *(*
string)(unsafe.Pointer(&b))
-
}
如果你搞清楚了unsafe.Pointer
,那么接下来atomic.Value
的神秘面纱也就很好揭开了。
atomic.Value
提供了Load
和Store
两个操作,完成数据的读取和存储。
写操作Store
Store大致逻辑:
将待存储的数据和当前的值分别转换为
*ifaceWords
进入一个无限for循环
2.1 先检查现有值的
typ
,如果为nil表示这是第一次存储,则先调用runtime_procPin()
通过修改当前g关联m的locks属性来禁止P被抢占2.2 尝试使用
CompareAndSwapPointer
将现有值的typ
设置为unsafe.Pointer(^uintptr(0))
方便Load
操作时判断当前状态,如果失败则解除抢占回到for循环开始位置继续执行2.3 如果设置成功,则可以完成第一次的数据存储
自旋等待中的
gorountine
如果发现uintptr(typ) == ^uintptr(0)
表明第一次存储尚未完成则继续自旋等待到这里说明第一次存储已经完成,则检查Value从始至终是否都是保存同一类型数据,不是则panic
非第一次存储,则更新数据
-
// Store 将Value的值设置为x
-
// 给定值的所有Store调用都必须使用相同的具体类型否则会像存储nil值一样会发生panic
-
func (v *Value) Store(x interface{}) {
-
if x ==
nil {
-
panic(
"sync/atomic: store of nil value into Value")
-
}
-
vp := (*ifaceWords)(unsafe.Pointer(v))
-
xp := (*ifaceWords)(unsafe.Pointer(&x))
-
for {
-
typ := LoadPointer(&vp.typ)
-
if typ ==
nil {
-
// 尝试开始第一次存储
-
//禁止P被抢占 以便其他goroutine可以使用主动自旋等待来等待完成
-
//GC也互补偶然看到伪造的类型
-
runtime_procPin()
-
// 加了个乐观锁
-
if !CompareAndSwapPointer(&vp.typ,
nil, unsafe.Pointer(^
uintptr(
0))) {
-
//没抢到机会 则退出
-
runtime_procUnpin()
-
continue
-
}
-
// 完成第一次存储
-
StorePointer(&vp.data, xp.data)
-
StorePointer(&vp.typ, xp.typ)
-
runtime_procUnpin()
-
return
-
}
-
if
uintptr(typ) == ^
uintptr(
0) {
-
// 第一次存储正在进行 继续等待
-
// 因为我们已经禁止了抢占所以我们可以继续自旋等待
-
continue
-
}
-
// 第一次存储完成 检查类型并且覆盖数据
-
if typ != xp.typ {
-
panic(
"sync/atomic: store of inconsistently typed value into Value")
-
}
-
StorePointer(&vp.data, xp.data)
-
return
-
}
-
}
读操作Load
读取操作相对简单就不赘述。
-
// Load 返回最近一次Store存储的数据
-
// 如果没有调用Store存储数据则返回nil
-
func (v *Value) Load() (x interface{}) {
-
vp := (*ifaceWords)(unsafe.Pointer(v))
-
typ := LoadPointer(&vp.typ)
-
if typ ==
nil ||
uintptr(typ) == ^
uintptr(
0) { 未调用Store或第一次存储尚未完成 直接返回
nil
-
// 第一次存储尚未完成
-
return
nil
-
}
-
data := LoadPointer(&vp.data)
-
xp := (*ifaceWords)(unsafe.Pointer(&x))
-
xp.typ = typ
-
xp.data = data
-
return
-
}
-
关于bug
atomic
包中有一段这样的注释
BUG(rsc): On 386, the 64-bit functions use instructions unavailable before the Pentium MMX. On non-Linux ARM, the 64-bit functions use instructions unavailable before the ARMv6k core. On ARM, 386, and 32-bit MIPS, it is the caller's responsibility to arrange for 64-bit alignment of 64-bit words accessed atomically. The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64-bit aligned.
在《手摸手Go 你的内存对齐了吗?》中有聊到过内存对齐的内容。(不同硬件平台并不是都可以在任意地址上访问任意数据;而且如果数据没有内存对齐可能会导致CPU访问两次内存才能拿到数据,如果内存对齐一次就能完成数据读取。)
这里大概是说在ARM,386,和32位MIPS,调用者有责任安排原子访问的64位字按照8字节对齐,否则程序会panic。因为不同平台上的编译器有自己的对齐系数,32bit平台上一般是4字节对齐,而在64bit平台上一般是8字节对齐。所以32bit平台上8字节数字可能会因为内存对齐拆分成2个4字节分布。
举个栗子
-
package main
-
-
import (
-
"fmt"
-
"sync/atomic"
-
)
-
-
type M
struct {
-
x
int64
-
u
uint32
-
v
int64
-
}
-
-
func main() {
-
m := M{}
-
result := atomic.AddInt64(&m.v,
1)
-
fmt.Println(result)
-
}
-
GOARCH=amd64 go build pointer.go && ./pointer
执行正常
但是在386上GOARCH=386 go build pointer.go && ./pointer
程序发生panic
-
panic: runtime error: invalid memory address or
nil pointer dereference
-
[signal SIGSEGV: segmentation violation code=
0x1 addr=
0x0 pc=
0x2f7c]
-
-
goroutine
1 [running]:
-
runtime/internal/atomic.Xadd64(
0x1141612c,
0x1,
0x0,
0x53f8,
0x1141a230)
-
/usr/local/
go/src/runtime/internal/atomic/asm_386.s:
105 +
0xc
-
main.main()
-
/Users/mywork/workspace/workspace_go/godemo/pointer/pointer.
go:
16 +
0x40
-
总结
通过阅读源码,很显然atomic
包中的原子操作均为底层硬件指令的协助完成,不需要加锁和解锁过程,所以对于单一变量更新保护,原子操作用起来更高效。文章篇幅关系我们这里只分析CompareAndSwapPointer
,至于其他原子操作具体底层的硬件指令感兴趣的童鞋可以继续探索。
如果阅读过程中发现本文存疑或错误的地方,可以关注公众号留言。如果觉得还可以 帮忙点个在看????
转载:https://blog.csdn.net/sydhappy/article/details/114650607