飞道的博客

Python异步并发机制详解,让你的代码运行效率就像搭上了火箭!!!

357人阅读  评论(0)


探究低层建筑:asyncio

Python由于全局锁(GIL)的存在,一直无法发挥多核的优势,其性能一直饱受诟病。
不过,在IO密集型的网络编程各种,异步处理比同步处理能够提升非常之高的速度。
而相对于其他语言,Python还有一个很明显的优势,那就是它的库很多啊!!!

Python3版本引入了async/await特性,其特点是:当执行过程中遇到IO请求的时候,可以将CPU资源出让,运行其他的任务;待IO完成之后,继续执行之前的任务。协程切换与线程切换比较类似,但协程切换更轻,不需要操作系统参与(没有栈切换操作,也没有用户态与内核态切换)。

同步/异步

在介绍协程之前,我还是再说一下同步和异步的概念,如果对这两个概念都混淆不清的话,下面的更不用说了。

==同步:串行。异步:并行。==不要被字面意思所迷惑。

同步是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行。。。

异步是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、通知、回调来通知调用者处理结果。


我再简单的介绍一下协程:

了解一下协程

协程,英文Coroutines,是一种比线程更加轻量级的存在。正如一个进程可以拥有多个线程一样,一个线程也可以拥有多个协程。

子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。

所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。

子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。

协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:

def A():
    print '1'
    print '2'
    print '3'

def B():
    print 'x'
    print 'y'
    print 'z'

假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:

1 x 2 y 3 z

但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。


相对于线程,协程的优势

最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。


同步代码转异步代码

以下为一段同步代码:

import time

def hello():
    time.sleep(1)

def run():
    for i in range(5):
        hello()
        print('Hello World:%s' % time.time())  # 任何伟大的代码都是从Hello World 开始的!

run()

以下是一段异步代码:

import time
import asyncio

# 定义异步函数
async def hello():
    asyncio.sleep(1)
    print('Hello World:%s' % time.time())

def run():
    for i in range(5):
        loop.run_until_complete(hello())

loop = asyncio.get_event_loop()

run()

通过asyncio讲解协程

  1. 通过async def来定义一个协程函数,通过await来执行一个协程对象。协程对象、协程函数的概念如下所示:
async def func_1(): # 1. 定义了一个协程函数
    pass
    
async def func_2(): # 2. 注意要在函数内部调用协程函数,自身也必须定义为协程
    # 3. func_1()调用产生了一个协程对象,通过await来执行这个协程。如果不加await,
    # 直接以func_1()方式调用,则func_1中代码并不会执行。
    await func_1()  

async def 用来定义异步函数,其内部有异步操作。每个线程有一个事件循环,主线程调用asyncio.get_event_loop()时会创建事件循环,你需要把异步的任务丢给这个循环的run_until_complete()方法,事件循环会安排协同程序的执行。

  1. 一般情况下,无法在一个非协程函数中阻塞地调用另一个协程。但你可以通过asyncio.ensure_future()来异步执行这个协程:
import asyncio
async def fun_1(): # 1. 定义了一个协程函数
    pass
 
def bar():
    asyncio.ensure_future(fun_1()) # 这里fun_1()将会在某个时间执行,具体执行顺序未知
    
    # 这里是阻塞执行fun_1(),但这种调用,只能在event loop进入循环之前调用(loop.run_forever()),
    # 否则会抛异常
    asyncio.get_event_loop().run_until_complete(fun_1) 
    print("fun_1() is executed!")

在一些框架中,会将某些函数定义为协程(即通过async修饰),这些函数都是在某个地方通过create_task,或者ensure_future来进行调度的。

  1. 协程锁:协程之间也可能会有资源共享冲突。要防止资源共享冲突产生的数据一致性问题,需要使用asyncio.Lock。asyncio.Lock也遵从上下文管理协议。

  2. 协程睡眠:协程函数在执行中会占用本线程的全部CPU时间,除非遇到IO切换出去。因此,如果你在函数中使用sleep(),在多线程中,一个线程进入sleep状态,操作系统会切换到其它线程执行,整个程序仍然是可响应的(除了该线程,它必须等待睡眠状态结束);而对协程来说,同一loop中的其它协程都不会得到执行,因为这个sleep会占用本线程的全部执行时间,直到协程执行完毕。

上面的问题引出一个推论,也就是如果一个协程确实需要睡眠(比如某种定时任务),必须使用asyncio.sleep()

  1. 如果我们要通过asyncio来远程调用一个服务,应该如何封装呢?假设你使用的底层通讯的API是发送和接收分离的(一般比较靠近底层的API都是这样设计的),那么你会面临这样的问题:当你通过异步请求(比如send)发出API request后,服务器的响应可能是通过on_message这样的API来接收的。如何让程序在调用send之后,就能得到(形式上)返回结果,然后根据返回结果继续执行呢?
from typing import Dict
 
# 全局事件注册表。键为外发请求的track_id,该track_id需要服务器在响应请求时传回。
# 值为另一个dict,储存着对应的asyncio.Event和网络请求的返回结果。这里也可以使用list。
# 在强调性能的场合下,使用List[event: asyncio.Event, result: object]更好。
_events: Dict[str, Dict] = {
   }
 
# 定义阻塞调用的协程
async def sync_call(request):
  event = asyncio.Event()
  track_id = str(uuid.uuid4())
  _events[track_id] = {
   
    "events": event,
    "result": None
  }
   
  # 发送网络请求,以下仅为示例。具体网络请求要根据业务具体场景来替换。这一步一般是立即返回,
  # 服务器并没有来得及准备好response
  await aiohttp.request(...)
   
  # L1: 阻塞地等待事件结果。当框架(或者你的网络例程)收到服务器返回结果时,根据track_id
  # 找到对应的event,触发之
  await event.wait()
  
  # 获取结果,并做清理
  response = _events[track_id].get("result")
  _events.pop(track_id)
 
  return response
 
# 在框架(或者你的网络例程)的消息接收处,比如on_message函数体中:
async def on_message(response):
    # 如果服务器不传回track_id,则整个机制无法生效
    track_id = response.get("track_id")
     
    waited = _events.get(track_id)
    if waited:
        waited["result"] = response
        waited["event"].set()   # !这里唤醒在L1处等待执行的

不能再深挖了,毕竟大家都是第一次接触这个模块儿。
必须要再深挖,这里面包含了太多的后端设计思想,是一个很重要的模块儿。
但是不是在这篇里面深挖,过几天会再出一篇关于asyncio的底层原理的博客,欢迎大家关注。


所以,代码到底怎么写?!!!

我相信,看了这么久,还是没有几个人知道这玩意儿到底要怎么写代码。
说实话,换我看了这么多我也不知道啊。

没事儿啊,重在理解嘛,是吧。

协程可以做哪些事?

* 等待一个 future 结束
* 等待另一个协程(产生一个结果,或引发一个异常)
* 产生一个结果给正在等它的协程
* 引发一个异常给正在等它的协程

定义协程函数:

async def do_some_work(x): pass

验证某函数是否协程函数:

print(asyncio.iscoroutinefunction(do_some_work)) # True

await是什么情况:

async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

asyncio.sleep 也是一个协程,所以 await asyncio.sleep(x) 就是等待另一个协程。

看一下文档解释:

sleep(delay, result=None, *, loop=None)
Coroutine that completes after a given time (in seconds)

看不懂没关系,我现在也不懂。诶,就是玩儿。


运行协程:

调用协程函数,协程并不会开始运行,只是返回一个协程对象,还会引发一条警告。

要让这个协程对象运行的话,有两种方式:

* 在另一个已经运行的协程中用 `await` 等待它
* 通过 `ensure_future` 函数计划它的执行

下面先拿到当前线程缺省的 loop ,然后把协程对象交给 loop.run_until_complete,协程对象随后会在 loop 里得到运行。

loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(5))
# 上面这行代码属于简写,完整写法是这样的:
# loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))
# run_until_complete 的参数是一个 future,它在内部会通过 ensure_future 函数把协程对象包装成了 future。

接下来就比较抽象了,需要一定的基础了。

回调

假如协程是一个 IO 的读操作,我们希望知道它什么时候结束运行,以便下一步数据的处理。这一需求可以通过往 future 添加回调来实现。

def done_callback(futu):
    print('Done')

futu = asyncio.ensure_future(do_some_work(3))
futu.add_done_callback(done_callback)

loop.run_until_complete(futu)

多协程

为了把多个协程交给 loop,需要借助 asyncio.gather 函数。

方法一:

loop.run_until_complete(asyncio.gather(do_some_work(3), do_some_work(5)))

方法二:

先把协程存在列表里

coros = [do_some_work(1), do_some_work(3)]
loop.run_until_complete(asyncio.gather(*coros))

这两个协程是并发运行的,所以等待的时间不是 1 + 3 = 4 秒,而是以耗时较长的那个协程为准。


关闭循环

loop 只要不关闭,就还可以再运行。但是如果关闭了,就不能再运行了。
建议调用 loop.close,以彻底清理 loop 对象防止误用。


这一篇就先到这里啦,至于asyncio再往底层走,这周会更新的啦,能看到这里的小伙伴不容易,需要多大的毅力啊。
不准备收藏一下吗?一次看这么多,怕是很难一次性消化掉吧。


转载:https://blog.csdn.net/qq_43762191/article/details/115910292
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场