写在最前,Context是Golang的核心包,每个Gopper都应该熟练的掌握并应用它!
1-Context 应用场景
- ①上层任务取消后,所有的下层任务都会被取消;②中间某一层的任务取消后,只会将当前任务的下层任务取消,而不会影响上层的任务以及同级任务。
- 业务需要对访问的数据库,RPC ,或API接口,为了防止这些依赖导致我们的服务超时,需要针对性的做超时控制
- 为了详细了解服务性能,记录详细的调用链Log
2-Context 原理
-
Context 的调用应该是链式的,从Context 派生出新的子类:
WithCancel
、WithDeadline/WithTimeout
、WithValue
。当父 Context 被取消时,其派生的所有 Context 都将取消。 -
通过
context.WithXXX
都将返回新的 Context 和 CancelFunc。① 调用 CancelFunc 将取消子代,移除父代对子代的引用,并且停止所有定时器。
② 未能调用 CancelFunc 将泄漏子代,直到父代被取消或定时器触发。
go vet
工具检查所有流程控制路径上使用 CancelFuncs。
3-Context 使用遵循规则
Context 的使用遵循以下规则,以保持包之间的接口一致,并启用静态分析工具以检查上下文传播。
-
不要将 Contexts 放入结构体,相反**
context
应该作为第一个参数传入,命名为ctx
**。func DoSomething(ctx context.Context,arg Arg) error { // ... use ctx ... }
-
即使函数允许,也不要传入
nil
的 Context。如果不知道用哪种 Context,可以使用
context.TODO()
-
context.Value方法:只应该用于在程序和接口中传递的和请求相关的元数据,不要用它来传递一些可选的参数
-
相同的 Context 可以传递给在不同的
goroutine
;Context 是并发安全的
4-Context 包
4.1. 解读Context interface
type Context interface {
// Done returns a channel that is closed when this Context is canceled or times out.
// 返回一个channel。当times out 或 调用cancel方法时,将会close掉
Done() <-chan struct{
}
// Err indicates why this context was canceled, after the Done channel is closed.
// 返回一个错误。该context为什么被取消掉。
Err() error
// Deadline returns the time when this Context will be canceled, if any.
// Value returns the value associated with key or nil if none.
Value(key interface{
}) interface{
}
}
-
Deadline()
:返回绑定当前ctx的任务被取消的时间 (如果没设置截止时间, 将返回 ok == false) -
Done()
: 当ctx被取消/到期时,返回一个关闭的chan (如果当前ctx不会被取消, 将返回nil) -
Err()
:如果Done返回的chan,① 没有关闭,err()返回nil ② 已经关闭,err()返回非nil,解释goroutine
被取消的原因 -
Value()
:返回ctx存储的键值对中当前key对应的value(如果没有对应的key, 则返回nil)Q:
context
所包含的额外信息键值对是如何存储的呢?A:其实可以想象一颗树,树的每个节点可能携带一组键值对,如果当前节点上无法找到
key
所对应的值,就会向上去父节点里找,直到根节点,具体后面会说到。
上面可以看到:
- Context是一个接口,想要使用它,就得实现其方法。
- 在context包内部已经为我们实现好了两个
emptyCtx
,可以通过调用Background()
、TODO()
方法获取。一般的将它们作为Context的根,往下派生。
4.2. Context 所有方法
学习本章节,需要对照着4.3章节一起学习。
4.2.1. Background/TODO
func Background() Context
func TODO() Context
- Background():Background通常被用于主函数、初始化以及测试中,作为一个顶层的ctx,也就是说一般我们创建的ctx都是基于Background
- TODO():在不确定使用什么
context
的时候才会使用(context一定不能为nil,如果不确定,可以使用context.TODO()生成一个empty的context)
4.2.2. WithXXX系列函数:创建子Ctx
先说结论
-
WithXXX的种方法比较类似,均会基于 parent Context 生成一个子 ctx,以及一个 Cancel 方法。如果调用了cancel 方法,ctx 以及基于 ctx 构造的子 context 都会被取消。不同点在于 WithCancel 必需要手动调用 cancel 方法,WithDeadline
可以设置一个时间点,WithTimeout 是设置调用的持续时间,到指定时间后,会调用 cancel 做取消操作。 -
除了上面的构造方式,还有一类是用来创建传递 traceId, token 等重要数据的 Context。
func WithValue(parent Context, key, val interface{}) Context {}
withValue 会构造一个新的context,新的context 会包含一对 Key-Value 数据,可以通过Context.Value(Key) 获取存在 ctx 中的 Value 值。
1- WithCancel – cancelCtx
- 必须手动调用cancle()才能取消
/*
* 给parent ctx新建一个子节点(类型cancleCtx)
* @return ctx: 新子节点 cacle: 取消函数
*/
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent) // 新建cancleCtx(详细见4.4.)
propagateCancel(parent, &c) // 来建立当前节点与祖先节点这个取消关联逻辑
return &c, func() {
c.cancel(true, Canceled) }
}
// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{
Context: parent}
}
2- WithDeadline – timerCtx
-
手动调用cancle()能取消
-
超时后,会自动调用cancle()取消ctx
/*
* 给parent ctx新建一个子节点(带超时, 类型cancleCtx)
* @param [in] deadline 过期时间点
* @return ctx: 新子节点 cacle: 取消函数
*/
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
注意:WithDeadline 的最后期限调整为不晚于 d 返回父上下文的副本。如果父母的截止日期已经早于 d,WithDeadline (父,d) 是在语义上等效为父。返回的上下文完成的通道关闭的最后期限期满后,返回的取消函数调用时,或当父上下文完成的通道关闭,以先发生者为准。
package main
import (
"context"
"fmt"
"time"
)
func main() {
// 创建son ctx, 到期时间设置为50ms
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(50 * time.Millisecond))
//即使ctx将过期,在任何情况下调用它的cancel函数都是一个好习惯。
//如果不这样做,可能会使ctx及其父节点存活的时间超过必要时间。
defer cancel()
select {
case <-time.After(100 * time.Millisecond): // parent ctx, 到期时间为100ms
fmt.Printf("parent ctx is finish: overslept\n")
case <-ctx.Done(): // son ctx先到期(50ms), 到期后会向ctx.Done()写入数据 ==> 因此执行下面的语句
fmt.Printf("son ctx is finish, err:%s\n", ctx.Err())
}
}
3- WithTimeout – timerCtx
-
手动调用cancle()能取消
-
超时后,会自动调用cancle()取消ctx
/*
* 给parent ctx新建一个子节点(带超时, 类型cancleCtx)
* @param [in] deadline 接收一个相对当前时间的过期时长timeout
* 等待于 WithDeadline(parent, time.Now().Add(timeout))
* @return ctx: 新子节点 cacle: 取消函数
*/
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
4- WithValue – valueCtx
注意:这里添加键值对不是在原parent context
结构体上直接添加,而是以此parent context
作为父节点,重新创建一个新的valueCtx
子节点,将键值对添加在子节点上,由此形成一条context
链。
// 为parent创建新的子ctx,该ctx携带<key,val>键值对信息
func WithValue(parent Context, key, val interface{
}) Context {
if parent == nil {
// 断言:parent不为nil
panic("cannot create context from nil parent")
}
if key == nil {
// 断言:key不为nil
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
// 断言:key可比较
panic("key is not comparable")
}
// 以parent为父亲,创建新的子节点valueCtx,包含<key,val>
return &valueCtx{
parent, key, val}
}
4.3. Context的派生类
在使用场景中可以看到context包本身包含了数个导出函数,包括WithValue、WithTimeout等,无论是最初构造context还是传导context,最核心的接口类型都是context.Context,任何一种context也都实现了该接口,包括value context。
4.3.1. emptyCtx
-
emptyCtx
没有超时时间,不能取消,也不能携带任何额外信息 -
emptyCtx
用来作为context
树的根节点 -
一般不会直接使用
emptyCtx
,而是使用由emptyCtx
实例化的两个变量,分别可以通过调用Background
和TODO
方法得到。(区别见4.2.1)
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
// 没有超时时间
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
// 不能被取消
func (*emptyCtx) Done() <-chan struct{
} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
// 不能携带额外信息
func (*emptyCtx) Value(key interface{
}) interface{
} {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
4.3.1. cancelCtx
继承自Context,补充了新的功能:
- 实现了``canceler
接口,支持取消当前ctx下所有的子ctx
type cancelCtx struct {
Context // 继承了Context接口,表示valueCtx所属的父节点
mu sync.Mutex // 保护下面的字段
done chan struct{
} // chan(用来传递关闭信号): 惰性创建
children map[canceler]struct{
} // 存储当前ctx节点下所有的子节点
err error // 存储错误信息,表示人物结束的原因
}
// cancleer接口
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{
}
}
cancelCtx 除了实现Context
接口,还实现了 canceler
接口,Done()
和cancle()
详细见下:
// 创建done通道, 用于通信
func (c *cancelCtx) Done() <-chan struct{
} {
c.mu.Lock()
// 如果c.done不存在, 就创建一个
if c.done == nil {
c.done = make(chan struct{
})
}
d := c.done // 保存到变量d
c.mu.Unlock()
return d // 返回d
}
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
// 设置取消原因
c.err = err
if c.done == nil {
// 设置一个关闭的channel
c.done = closedchan
} else {
// 将c.done通道关闭,用以发送关闭信号
close(c.done)
}
// 将子节点context依次取消
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c) // 将当前context节点从父节点上移除
}
}
4.3.2. timerCtx
- 手工取消
- 超时取消
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
// 内部使用cancelCtx实现取消
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
// 取消计时器
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
4.3.3. valueCtx
继承自Context,补充了新的功能:
- 添加了成员属性:key,val,能够携带额外的信息
type valueCtx struct {
Context // 继承了Context接口,表示valueCtx所属的父节点
key, val interface{
} // 携带key-value键值对,同来保存额外信息
}
// 实现了Value方法:用来在context链上(直到根节点),寻找key对应的value
func (c *valueCtx) Value(key interface{
}) interface{
} {
if c.key == key {
/* 当前c上找到, 就返回val */
return c.val
}
/* 在c所属的父节点c.Context上继续调用Value, 查找key对应的val */
return c.Context.Value(key)
}
5-Context使用技巧
5.1. 构造Context
一般来说,我们的根context会在请求的入口处构造如下:ctx := context.Background()
如果拿捏不准是否需要一个全局的context,可以使用下面这个函数构造:ctx := context.TODO()
5.2. 传值方式
- 不能使用传引用方式,而是使用传值方式
- 智能自顶向下传值,反之则不可以
package main
import (
"context"
"fmt"
)
func func1(ctx context.Context) {
// WithValue创建:携带(k1,v1)的子节点ctx
ctx = context.WithValue(ctx, "k1", "v1")
// 获取ctx的key="k1"对应的val值
func2(ctx)
}
func func2(ctx context.Context) {
// Value获取
fmt.Println(ctx.Value("k1").(string))
}
func main() {
ctx := context.Background()
func1(ctx)
}package main
import (
"context"
"fmt"
)
func func1(ctx context.Context) {
ctx = context.WithValue(ctx, "k1", "v1")
func2(ctx)
}
func func2(ctx context.Context) {
fmt.Println(ctx.Value("k1").(string))
}
func main() {
ctx := context.Background()
func1(ctx)
}
// 执行结果: v1
5.3. 取消cancel
- 如果有cancel,一定要保证调用,否则会造成资源泄露,比如timer泄露。
- cancel函数是幂等的,可以被多次调用。
- context中包含done channel可以用来确认是否取消、通知取消。
6-Context使用案例
6.1. SQL超时查询
在做数据库查询时,需要对数据的查询做超时控制,例如:
ctx = context.WithTimeout(context.Background(), time.Second)
rows, err := pool.QueryContext(ctx, "select * from products where id = ?", 100)
上面的代码基于 Background 派生出一个带有超时取消功能的ctx,传入带有context查询的方法中,如果超过1s未返回结果,则取消本次的查询。使用起来非常方便。为了了解查询内部是如何做到超时取消的,我们看看DB内部是如何使用传入的ctx的。
6.2. 互联网中的使用场景
func main() {
/*1.准备req*/
// 创建一个http请求req
req, _ := http.NewRequest("GET", "https://api.github.com/users/helei112g", nil)
// 创建子ctx, 设置超时时间
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond * 1)
defer cancel()
// 将ctx绑定到req.ctx中: req.ctx = ctx
req = req.WithContext(ctx)
/*2.执行req请求*/
resp, err := http.DefaultClient.Do(req) // 执行请求, 得到resp
if err != nil {
log.Fatalln("request Err", err.Error())
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body) // 读取resp.Body的内容
fmt.Println(string(body))
}
这段程序就是请求 github 获取用户信息的接口,通过 context 包设置了请求超时时间是 1ms (肯定无法访问到)。执行时我们看到控制台做如下输出:
2020/xx/xx xx:xx:xx request Err Get https://api.github.com/users/helei112g: context canceled
exit status 1
package main
import (
"context"
"fmt"
"time"
)
type key int
// 全局常量
const (
userIP = iota
userID
logID
)
// 保存业务请求返回结果
type Result struct {
order string // 订单
logistics string // 物流
recommend string // 推荐商品
}
// timeout: 1s
// 入口函数
func api() (result *Result, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
// ctx下添加3个子节点
ctx = context.WithValue(ctx, userIP, "127.0.0.1") // 订单
ctx = context.WithValue(ctx, userID, 666888) // 物流
ctx = context.WithValue(ctx, logID, "123456") // 推荐商品
result = &Result{
}
// 业务逻辑处理放到协程中: 3个子业务并发执行
go func() {
result.order, err = getOrderDetail(ctx)
}()
go func() {
result.logistics, err = getLogisticsDetail(ctx)
}()
go func() {
result.recommend, err = getRecommend(ctx)
}()
for {
select {
case <-ctx.Done():
fmt.Printf("cancel or timeout")
return result, ctx.Err() // 取消or超时,把现有已经拿到的结果返回
default:
}
// 有错误直接返回
if err != nil {
fmt.Printf("err != nil, err:%s", err)
return result, err
}
// 全部处理完成,直接返回
if result.order != "" && result.logistics != "" && result.recommend != "" {
fmt.Println("3 success")
return result, nil
}
}
}
// timeout: 500ms
func getOrderDetail(ctx context.Context) (string, error) {
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*500)
defer cancel()
uip := ctx.Value(userIP).(string) // 获取 user id
fmt.Println("userIP", uip)
return handleTimeout(ctx, func() string {
time.Sleep(time.Millisecond * 700) // 模拟超时
return "order"
})
}
// timeout: 700ms
func getLogisticsDetail(ctx context.Context) (string, error) {
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*700)
defer cancel()
uid := ctx.Value(userID).(int) // 获取 user id
fmt.Println("userID", uid)
return handleTimeout(ctx, func() string {
return "logistics"
})
}
// timeout: 400ms
func getRecommend(ctx context.Context) (string, error) {
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*400)
defer cancel()
lid := ctx.Value(logID).(string) // 获取 log id
fmt.Println("logID", lid)
return handleTimeout(ctx, func() string {
return "recommend"
})
}
// 超时的统一处理代码
func handleTimeout(ctx context.Context, f func() string) (string, error) {
// 请求之前先去检查下是否超时
select {
case <-ctx.Done():
return "", ctx.Err()
default:
}
str := make(chan string) // 创建chan, 用于阻塞
go func() {
// 业务逻辑
str <- f()
}()
select {
case <-ctx.Done(): // 上层ctx调用cancel()、时间超时
return "", ctx.Err()
case ret := <-str:
return ret, nil
}
}
func main() {
api()
}
/*
userIP 127.0.0.1
logID 123456
userID 666888
err != nil, err:context deadline exceeded
*/
6.3. 源码简单案例
参考链接:liwenzhou
-
示例代码1
package main import ( "context" "fmt" "sync" "time" ) var wg sync.WaitGroup func worker(ctx context.Context) { LOOP: for { fmt.Println("worker") time.Sleep(time.Second) select { case <-ctx.Done(): // 等待上层调用cancel break LOOP default: } } wg.Done() } func main() { ctx, cancel := context.WithCancel(context.Background()) wg.Add(1) go worker(ctx) time.Sleep(time.Second * 3) cancel() // 通知子goroutine结束 wg.Wait() fmt.Println("over") } /* worker worker worker worker over */
-
示例代码2
package main import ( "context" "fmt" "sync" "time" ) var wg sync.WaitGroup func worker1(ctx context.Context) { go worker2(ctx) LOOP: for { fmt.Println("worker1") time.Sleep(time.Second) select { case <-ctx.Done(): // 等待上级通知 break LOOP default: } } wg.Done() } func worker2(ctx context.Context) { LOOP: for { fmt.Println("worker2") time.Sleep(time.Second) select { case <-ctx.Done(): // 等待上级通知 break LOOP default: } } } func main() { ctx, cancel := context.WithCancel(context.Background()) wg.Add(1) go worker1(ctx) time.Sleep(time.Second * 3) cancel() // 通知子goroutine结束 wg.Wait() fmt.Println("over") } /* worker2 worker1 worker2 worker1 worker2 worker1 over */
转载:https://blog.csdn.net/weixin_36750623/article/details/116208552