探究低层建筑:asyncio
Python由于全局锁(GIL)的存在,一直无法发挥多核的优势,其性能一直饱受诟病。
不过,在IO密集型的网络编程各种,异步处理比同步处理能够提升非常之高的速度。
而相对于其他语言,Python还有一个很明显的优势,那就是它的库很多啊!!!
Python3版本引入了async/await特性,其特点是:当执行过程中遇到IO请求的时候,可以将CPU资源出让,运行其他的任务;待IO完成之后,继续执行之前的任务。协程切换与线程切换比较类似,但协程切换更轻,不需要操作系统参与(没有栈切换操作,也没有用户态与内核态切换)。
同步/异步
在介绍协程之前,我还是再说一下同步和异步的概念,如果对这两个概念都混淆不清的话,下面的更不用说了。
==同步:串行。异步:并行。==不要被字面意思所迷惑。
同步是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行。。。
异步是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、通知、回调来通知调用者处理结果。
我再简单的介绍一下协程:
了解一下协程
协程,英文Coroutines,是一种比线程更加轻量级的存在。正如一个进程可以拥有多个线程一样,一个线程也可以拥有多个协程。
子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。
所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。
子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。
协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。
注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:
def A():
print '1'
print '2'
print '3'
def B():
print 'x'
print 'y'
print 'z'
假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:
1 x 2 y 3 z
但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。
相对于线程,协程的优势
最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
同步代码转异步代码
以下为一段同步代码:
import time
def hello():
time.sleep(1)
def run():
for i in range(5):
hello()
print('Hello World:%s' % time.time()) # 任何伟大的代码都是从Hello World 开始的!
run()
以下是一段异步代码:
import time
import asyncio
# 定义异步函数
async def hello():
asyncio.sleep(1)
print('Hello World:%s' % time.time())
def run():
for i in range(5):
loop.run_until_complete(hello())
loop = asyncio.get_event_loop()
run()
通过asyncio讲解协程
- 通过async def来定义一个协程函数,通过await来执行一个协程对象。协程对象、协程函数的概念如下所示:
async def func_1(): # 1. 定义了一个协程函数
pass
async def func_2(): # 2. 注意要在函数内部调用协程函数,自身也必须定义为协程
# 3. func_1()调用产生了一个协程对象,通过await来执行这个协程。如果不加await,
# 直接以func_1()方式调用,则func_1中代码并不会执行。
await func_1()
async def 用来定义异步函数,其内部有异步操作。每个线程有一个事件循环,主线程调用asyncio.get_event_loop()时会创建事件循环,你需要把异步的任务丢给这个循环的run_until_complete()方法,事件循环会安排协同程序的执行。
- 一般情况下,无法在一个非协程函数中阻塞地调用另一个协程。但你可以通过asyncio.ensure_future()来异步执行这个协程:
import asyncio
async def fun_1(): # 1. 定义了一个协程函数
pass
def bar():
asyncio.ensure_future(fun_1()) # 这里fun_1()将会在某个时间执行,具体执行顺序未知
# 这里是阻塞执行fun_1(),但这种调用,只能在event loop进入循环之前调用(loop.run_forever()),
# 否则会抛异常
asyncio.get_event_loop().run_until_complete(fun_1)
print("fun_1() is executed!")
在一些框架中,会将某些函数定义为协程(即通过async修饰),这些函数都是在某个地方通过create_task,或者ensure_future来进行调度的。
-
协程锁:协程之间也可能会有资源共享冲突。要防止资源共享冲突产生的数据一致性问题,需要使用asyncio.Lock。asyncio.Lock也遵从上下文管理协议。
-
协程睡眠:协程函数在执行中会占用本线程的全部CPU时间,除非遇到IO切换出去。因此,如果你在函数中使用sleep(),在多线程中,一个线程进入sleep状态,操作系统会切换到其它线程执行,整个程序仍然是可响应的(除了该线程,它必须等待睡眠状态结束);而对协程来说,同一loop中的其它协程都不会得到执行,因为这个sleep会占用本线程的全部执行时间,直到协程执行完毕。
上面的问题引出一个推论,也就是如果一个协程确实需要睡眠(比如某种定时任务),必须使用asyncio.sleep()
- 如果我们要通过asyncio来远程调用一个服务,应该如何封装呢?假设你使用的底层通讯的API是发送和接收分离的(一般比较靠近底层的API都是这样设计的),那么你会面临这样的问题:当你通过异步请求(比如send)发出API request后,服务器的响应可能是通过on_message这样的API来接收的。如何让程序在调用send之后,就能得到(形式上)返回结果,然后根据返回结果继续执行呢?
from typing import Dict
# 全局事件注册表。键为外发请求的track_id,该track_id需要服务器在响应请求时传回。
# 值为另一个dict,储存着对应的asyncio.Event和网络请求的返回结果。这里也可以使用list。
# 在强调性能的场合下,使用List[event: asyncio.Event, result: object]更好。
_events: Dict[str, Dict] = {
}
# 定义阻塞调用的协程
async def sync_call(request):
event = asyncio.Event()
track_id = str(uuid.uuid4())
_events[track_id] = {
"events": event,
"result": None
}
# 发送网络请求,以下仅为示例。具体网络请求要根据业务具体场景来替换。这一步一般是立即返回,
# 服务器并没有来得及准备好response
await aiohttp.request(...)
# L1: 阻塞地等待事件结果。当框架(或者你的网络例程)收到服务器返回结果时,根据track_id
# 找到对应的event,触发之
await event.wait()
# 获取结果,并做清理
response = _events[track_id].get("result")
_events.pop(track_id)
return response
# 在框架(或者你的网络例程)的消息接收处,比如on_message函数体中:
async def on_message(response):
# 如果服务器不传回track_id,则整个机制无法生效
track_id = response.get("track_id")
waited = _events.get(track_id)
if waited:
waited["result"] = response
waited["event"].set() # !这里唤醒在L1处等待执行的
不能再深挖了,毕竟大家都是第一次接触这个模块儿。
必须要再深挖,这里面包含了太多的后端设计思想,是一个很重要的模块儿。
但是不是在这篇里面深挖,过几天会再出一篇关于asyncio的底层原理的博客,欢迎大家关注。
所以,代码到底怎么写?!!!
我相信,看了这么久,还是没有几个人知道这玩意儿到底要怎么写代码。
说实话,换我看了这么多我也不知道啊。
没事儿啊,重在理解嘛,是吧。
协程可以做哪些事?
* 等待一个 future 结束
* 等待另一个协程(产生一个结果,或引发一个异常)
* 产生一个结果给正在等它的协程
* 引发一个异常给正在等它的协程
定义协程函数:
async def do_some_work(x): pass
验证某函数是否协程函数:
print(asyncio.iscoroutinefunction(do_some_work)) # True
await是什么情况:
async def do_some_work(x):
print("Waiting " + str(x))
await asyncio.sleep(x)
asyncio.sleep 也是一个协程,所以 await asyncio.sleep(x) 就是等待另一个协程。
看一下文档解释:
sleep(delay, result=None, *, loop=None)
Coroutine that completes after a given time (in seconds)
看不懂没关系,我现在也不懂。诶,就是玩儿。
运行协程:
调用协程函数,协程并不会开始运行,只是返回一个协程对象,还会引发一条警告。
要让这个协程对象运行的话,有两种方式:
* 在另一个已经运行的协程中用 `await` 等待它
* 通过 `ensure_future` 函数计划它的执行
下面先拿到当前线程缺省的 loop ,然后把协程对象交给 loop.run_until_complete,协程对象随后会在 loop 里得到运行。
loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(5))
# 上面这行代码属于简写,完整写法是这样的:
# loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))
# run_until_complete 的参数是一个 future,它在内部会通过 ensure_future 函数把协程对象包装成了 future。
接下来就比较抽象了,需要一定的基础了。
回调
假如协程是一个 IO 的读操作,我们希望知道它什么时候结束运行,以便下一步数据的处理。这一需求可以通过往 future 添加回调来实现。
def done_callback(futu):
print('Done')
futu = asyncio.ensure_future(do_some_work(3))
futu.add_done_callback(done_callback)
loop.run_until_complete(futu)
多协程
为了把多个协程交给 loop,需要借助 asyncio.gather 函数。
方法一:
loop.run_until_complete(asyncio.gather(do_some_work(3), do_some_work(5)))
方法二:
先把协程存在列表里
coros = [do_some_work(1), do_some_work(3)]
loop.run_until_complete(asyncio.gather(*coros))
这两个协程是并发运行的,所以等待的时间不是 1 + 3 = 4 秒,而是以耗时较长的那个协程为准。
关闭循环
loop 只要不关闭,就还可以再运行。但是如果关闭了,就不能再运行了。
建议调用 loop.close,以彻底清理 loop 对象防止误用。
这一篇就先到这里啦,至于asyncio再往底层走,这周会更新的啦,能看到这里的小伙伴不容易,需要多大的毅力啊。
不准备收藏一下吗?一次看这么多,怕是很难一次性消化掉吧。
转载:https://blog.csdn.net/qq_43762191/article/details/115910292