飞道的博客

比物理线程都好用的C++20的协程,你会用吗?

276人阅读  评论(0)
摘要:事件驱动(event driven)是一种常见的代码模型,其通常会有一个主循环(mainloop)不断的从队列中接收事件,然后分发给相应的函数/模块处理。常见使用事件驱动模型的软件包括图形用户界面(GUI),嵌入式设备软件,网络服务端等。

本文分享自华为云社区《C++20的协程在事件驱动代码中的应用》,原文作者:飞得乐 。

嵌入式事件驱动代码的难题

事件驱动event driven)是一种常见的代码模型,其通常会有一个主循环mainloop)不断的从队列中接收事件,然后分发给相应的函数/模块处理。常见使用事件驱动模型的软件包括图形用户界面(GUI),嵌入式设备软件,网络服务端等。

本文以一个高度简化的嵌入式处理模块做为事件驱动代码的例子:假设该模块需要处理用户命令、外部消息、告警等各种事件,并在主循环中进行分发,那么示例代码如下:


  
  1. #include <iostream>
  2. #include <vector>
  3. enum class EventType {
  4. COMMAND,
  5. MESSAGE,
  6. ALARM
  7. };
  8. // 仅用于模拟接收的事件序列
  9. std:: vector<EventType> g_events{EventType::MESSAGE, EventType::COMMAND, EventType::MESSAGE};
  10. void ProcessCmd()
  11. {
  12. std:: cout << "Processing Command" << std:: endl;
  13. }
  14. void ProcessMsg()
  15. {
  16. std:: cout << "Processing Message" << std:: endl;
  17. }
  18. void ProcessAlm()
  19. {
  20. std:: cout << "Processing Alarm" << std:: endl;
  21. }
  22. int main()
  23. {
  24. for ( auto event : g_events) {
  25. switch (event) {
  26. case EventType::COMMAND:
  27. ProcessCmd();
  28. break;
  29. case EventType::MESSAGE:
  30. ProcessMsg();
  31. break;
  32. case EventType::ALARM:
  33. ProcessAlm();
  34. break;
  35. }
  36. }
  37. return 0;
  38. }

这只是一个极简的模型示例,真实的代码要远比它复杂得多,可能还会包含:从特定接口获取事件,解析不同的事件类型,使用表驱动方法进行分发……不过这些和本文关系不大,可暂时先忽略。

用顺序图表示这个模型,大体上是这样:

在实际项目中,常常碰到的一个问题是:有些事件的处理时间很长,比如某个命令可能需要批量的进行上千次硬件操作:


  
  1. void ProcessCmd()
  2. {
  3. for ( int i{ 0}; i < 1000; ++i) {
  4. // 操作硬件接口……
  5. }
  6. }

这种事件处理函数会长时间的阻塞主循环,导致其他事件一直排队等待。如果所有事件对响应速度都没有要求,那也不会造成问题。但是实际场景中经常会有些事件是需要及时响应的,比如某些告警事件出现后,需要很快的执行业务倒换,否则就会给用户造成损失。这个时候,处理时间很长的事件就会产生问题。

有人会想到额外增加一个线程专用于处理高优先级事件,实践中这确实是个常用方法。然而在嵌入式系统中,事件处理函数会读写很多公共数据结构,还会操作硬件接口,如果并发调用,极容易导致各类数据竞争和硬件操作冲突,而且这些问题常常很难定位和解决。那在多线程的基础上加锁呢?——设计哪些锁,加在哪些地方,也是非常烧脑而且容易出错的工作,如果互斥等待过多,还会影响性能,甚至出现死锁等麻烦的问题。

另一种解决方案是:把处理时间很长的任务切割成很多个小任务,并重新加入到事件队列中。这样就不会长时间的阻塞主循环。这个方案避免了并发编程产生的各种头疼问题,但是却带来另一个难题:如何把一个大流程切割成很多独立小流程?在编码时,这需要程序员解析函数流程的所有上下文信息,设计数据结构单独存储,并建立关联这些数据结构的特殊事件。这往往会带来几倍的额外代码量和工作量。

这个问题几乎在所有事件驱动型软件中都会存在,但在嵌入式软件中尤为突出。这是因为嵌入式环境下的CPU、线程等资源受限,而实时性要求高,并发编程受限。

C++20语言给这个问题提供了一种新的解决方案:协程。

C++20的协程简介

关于协程coroutine)是什么,在wikipedia[1]等资料中有很好的介绍,本文就不赘述了。在C++20中,协程的关键字只是语法糖:编译器会将函数执行的上下文(包括局部变量等)打包成一个对象,并让未执行完的函数先返回给调用者。之后,调用者使用这个对象,可以让函数从原来的“断点”处继续往下执行。

使用协程,编码时就不再需要费心费力的去把函数“切割”成多个小任务,只用按照习惯的流程写函数内部代码,并在允许暂时中断执行的地方加上co_yield语句,编译器就可以将该函数处理为可“分段执行”。

协程用起来的感觉有点像线程切换,因为函数的栈帧stack frame)被编译器保存成了对象,可以随时恢复出来接着往下运行。但是实际执行时,协程其实还是单线程顺序运行的,并没有物理线程切换,一切都只是编译器的“魔法”。所以用协程可以完全避免多线程切换的性能开销以及资源占用,也不用担心数据竞争等问题。

可惜的是,C++20标准只提供了协程基础机制,并未提供真正实用的协程库(在C++23中可能会改善)。目前要用协程写实际业务的话,可以借助开源库,比如著名的cppcoro[2]。然而对于本文所述的场景,cppcoro也没有直接提供对应的工具(generator经过适当的包装可以解决这个问题,但是不太直观),因此我自己写了一个切割任务的协程工具类用于示例。

自定义的协程工具

下面是我写的SegmentedTask工具类的代码。这段代码看起来相当复杂,但是它作为可重用的工具存在,没有必要让程序员都理解它的内部实现,一般只要知道它怎么用就行了。SegmentedTask的使用很容易:它只有3个对外接口:ResumeIsFinishedGetReturnValue,其功能可根据接口名字自解释。


  
  1. #include <optional>
  2. #include <coroutine>
  3. template< typename T>
  4. class SegmentedTask {
  5. public:
  6. struct promise_type {
  7. SegmentedTask<T> get_return_object()
  8. {
  9. return SegmentedTask{Handle::from_promise(* this)};
  10. }
  11. static std::suspend_never initial_suspend() noexcept { return {}; }
  12. static std::suspend_always final_suspend() noexcept { return {}; }
  13. std::suspend_always yield_value(std::nullopt_t) noexcept { return {}; }
  14. std::suspend_never return_value(T value) noexcept
  15. {
  16. returnValue = value;
  17. return {};
  18. }
  19. static void unhandled_exception() { throw; }
  20. std::optional<T> returnValue;
  21. };
  22. using Handle = std::coroutine_handle<promise_type>;
  23. explicit SegmentedTask(const Handle coroutine) : coroutine{coroutine} {}
  24. ~SegmentedTask()
  25. {
  26. if (coroutine) {
  27. coroutine.destroy();
  28. }
  29. }
  30. SegmentedTask( const SegmentedTask&) = delete;
  31. SegmentedTask& operator=( const SegmentedTask&) = delete;
  32. SegmentedTask(SegmentedTask&& other) noexcept : coroutine(other.coroutine) { other.coroutine = {}; }
  33. SegmentedTask& operator=(SegmentedTask&& other) noexcept
  34. {
  35. if ( this != &other) {
  36. if (coroutine) {
  37. coroutine.destroy();
  38. }
  39. coroutine = other.coroutine;
  40. other.coroutine = {};
  41. }
  42. return * this;
  43. }
  44. void Resume() const { coroutine.resume(); }
  45. bool IsFinished() const { return coroutine.promise().returnValue.has_value(); }
  46. T GetReturnValue() const { return coroutine.promise().returnValue.value(); }
  47. private:
  48. Handle coroutine;
  49. };

自己编写协程的工具类不光需要深入了解C++协程机制,而且很容易产生悬空引用等未定义行为。因此强烈建议项目组统一使用编写好的协程类。如果读者想深入学习协程工具的编写方法,可以参考Rainer Grimm的博客文章[3]。

接下来,我们使用SegmentedTask来改造前面的事件处理代码。当一个C++函数中使用了co_awaitco_yieldco_return中的任何一个关键字时,这个函数就变成了协程,其返回值也会变成对应的协程工具类。在示例代码中,需要内层函数提前返回时,使用的是co_yield。但是C++20的co_yield后必须跟随一个表达式,这个表达式在示例场景下并没必要,就用了std::nullopt让其能编译通过。实际业务环境下,co_yield可以返回一个数字或者对象用于表示当前任务执行的进度,方便外层查询。

协程不能使用普通return语句,必须使用co_return来返回值,而且其返回类型也不直接等同于co_return后面的表达式类型。


  
  1. enum class EventType {
  2. COMMAND,
  3. MESSAGE,
  4. ALARM
  5. };
  6. std:: vector<EventType> g_events{EventType::COMMAND, EventType::ALARM};
  7. std::optional<SegmentedTask< int>> suspended; // 没有执行完的任务保存在这里
  8. SegmentedTask<int> ProcessCmd()
  9. {
  10. for ( int i{ 0}; i < 10; ++i) {
  11. std:: cout << "Processing step " << i << std:: endl;
  12. co_yield std::nullopt;
  13. }
  14. co_return 0;
  15. }
  16. void ProcessMsg()
  17. {
  18. std:: cout << "Processing Message" << std:: endl;
  19. }
  20. void ProcessAlm()
  21. {
  22. std:: cout << "Processing Alarm" << std:: endl;
  23. }
  24. int main()
  25. {
  26. for ( auto event : g_events) {
  27. switch (event) {
  28. case EventType::COMMAND:
  29. suspended = ProcessCmd();
  30. break;
  31. case EventType::MESSAGE:
  32. ProcessMsg();
  33. break;
  34. case EventType::ALARM:
  35. ProcessAlm();
  36. break;
  37. }
  38. }
  39. while (suspended.has_value() && !suspended->IsFinished()) {
  40. suspended->Resume();
  41. }
  42. if (suspended.has_value()) {
  43. std:: cout << "Final return: " << suspended->GetReturnValue() << endl;
  44. }
  45. return 0;
  46. }

出于让示例简单的目的,事件队列中只放入了一个COMMAND和一个ALARMCOMMAND是可以分段执行的协程,执行完第一段后,主循环会优先执行队列中剩下的事件,最后再来继续执行COMMAND余下的部分。实际场景下,可根据需要灵活选择各种调度策略,比如专门用一个队列存放所有未执行完的分段任务,并在空闲时依次执行。

本文中的代码使用gcc 10.3版本编译运行,编译时需要同时加上-std=c++20-fcoroutines两个参数才能支持协程。代码运行结果如下:


  
  1. Processing step 0
  2. Processing Alarm
  3. Processing step 1
  4. Processing step 2
  5. Processing step 3
  6. Processing step 4
  7. Processing step 5
  8. Processing step 6
  9. Processing step 7
  10. Processing step 8
  11. Processing step 9
  12. Final return: 0

可以看到ProcessCmd函数(协程)的for循环语句并没有一次执行完,在中间插入了ProcessAlm的执行。如果分析运行线程还会发现,整个过程中并没有物理线程的切换,所有代码都是在同一个线程上顺序执行的。

使用了协程的顺序图变成了这样:

事件处理函数的执行时间长不再是问题,因为可以中途“插入”其他的函数运行,之后再返回断点继续向下运行。

总结

一个较普遍的认识误区是:使用多线程可以提升软件性能。但事实上,只要CPU没有空跑,那么当物理线程数超过了CPU核数,就不再会提升性能,相反还会由于线程的切换开销而降低性能。大多数开发实践中,并发编程的主要好处并非为了提升性能,而是为了编码的方便,因为现实中的场景模型很多都是并发的,容易直接对应成多线程代码。

协程可以像多线程那样方便直观的编码,但是同时又没有物理线程的开销,更没有互斥、同步等并发编程中令人头大的设计负担,在嵌入式应用等很多场景下,常常是比物理线程更好的选择。

相信随着C++20的逐步普及,协程将来会得到越来越广泛的使用。

尾注

[1] https://en.wikipedia.org/wiki/Coroutine
[2] https://github.com/lewissbaker/cppcoro
[3] https://www.modernescpp.com/index.php/tag/coroutines

 

点击关注,第一时间了解华为云新鲜技术~


转载:https://blog.csdn.net/devcloud/article/details/116918105
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场