飞道的博客

c++11实现一个线程池子

204人阅读  评论(0)

对象池模式在软件中广泛使用,例如,线程池,连接池,内存池等。Boost库中的pool实现了内存池,thread_group实现了简单的线程池。

以下实现的线程池与boost无关,只是提供一种思路。


Task类:

对应要执行的任务Task::run, 任务入参Task::mParam.


  
  1. class Task {
  2. public:
  3. Task( int val= 0):
  4. mParam(val)
  5. {
  6. }
  7. virtual ~Task() {
  8. }
  9. void setParam(int val) {
  10. mParam = val;
  11. }
  12. void run() {
  13. thread::id threadId = std::this_thread::get_id();
  14. cout << "param = " << mParam << " ThreadId = " << threadId << endl;
  15. string data = "param = " + std::to_string(mParam)+ " ThreadId = "+ getThreadIdOfString(threadId) + "\n";
  16. myLogger.logData(data);
  17. }
  18. private:
  19. int mParam;
  20. };

 

threadPool类:

成员变量vector<shared_ptr<thread>> threadCache保存构造函数中开辟的线程;构造函数中开辟线程,各个线程的入口函数都是threadFunc,如果任务队列为空则线程bolck,等待任务。

std::queue<Task> mTasksQueue保存外界传入的要执行的task.

std::condition_variable mCondVar用于当任务队列mTasksQueue不为空时,唤醒一个线程,从队列头取出一个task执行。

stopAllThreads()用于停止所有线程;

joinThreads()用于等待所有线程结束,在析构函数中被调用。

Note:

threadFunc函数中的线程唤醒条件while (mTasksQueue.empty() && !mReceStopOrder), 不加mReceStopOrder的判断,易导致stopAllThreads()可能不起作用(调用stopAllThreads函数时,taskQueue已经为空)。


  
  1. //c++实现线程池:
  2. class threadPool {
  3. public:
  4. threadPool( int maxThreadsNum= 1):
  5. mMaxThreadsNum(maxThreadsNum){
  6. mReceStopOrder = false;
  7. //creat threads:
  8. for ( int i = 0; i < mMaxThreadsNum; ++i) {
  9. shared_ptr<thread> th = std::make_shared<thread>(&threadPool::threadFunc, this); //Ok
  10. //shared_ptr<thread> th = std::make_shared<thread>(std::bind(&threadPool::threadFunc, this)); //Ok
  11. //shared_ptr<thread> th = std::make_shared<thread>(std::bind(&threadPool::threadFunc, *this));//build error
  12. //newThread.detach();
  13. threadCache.push_back(th);
  14. }
  15. }
  16. void joinThreads() {
  17. for ( auto& it : threadCache) {
  18. it->join();
  19. }
  20. }
  21. virtual ~threadPool() {
  22. joinThreads(); //等待所有线程结束
  23. }
  24. void pushTask(Task task) {
  25. std::lock_guard<std::mutex> lk(mMux);
  26. mTasksQueue.push(task);
  27. mCondVar.notify_one();
  28. }
  29. //构造函数中开辟的多个线程的入口
  30. void threadFunc() {
  31. while ( true)
  32. {
  33. //终止函数,停止线程
  34. if (mReceStopOrder) {
  35. myLogger.logData( "break---");
  36. break;
  37. }
  38. std::unique_lock<std::mutex> lk(mMux);
  39. while (mTasksQueue.empty() && !mReceStopOrder)
  40. {
  41. mCondVar.wait(lk);
  42. }
  43. if (mReceStopOrder) {
  44. myLogger.logData( "break");
  45. break;
  46. }
  47. //get a task and execute it:
  48. Task task = mTasksQueue.front();
  49. mTasksQueue.pop();
  50. lk.unlock(); //显式地解锁.
  51. task.run();
  52. }
  53. thread::id threadId = std::this_thread::get_id();
  54. string data = "ThreadId "+ getThreadIdOfString(threadId) + " exit \n";
  55. myLogger.logData(data);
  56. }
  57. void stopAllThreads() {
  58. myLogger.logData( "stopAllThreads was called!");
  59. mReceStopOrder = true;
  60. mCondVar.notify_all(); //可能有的线程处于wait状态
  61. }
  62. private:
  63. unsigned int mMaxThreadsNum;
  64. std:: queue<Task> mTasksQueue; //缓存各个任务
  65. vector< shared_ptr<thread>> threadCache; //保存开辟的各个线程
  66. std::mutex mMux;
  67. std::condition_variable mCondVar; //当任务队列mTasksQueue不为空时,唤醒一个线程,从队列头取出一个task执行。
  68. std::atomic< bool> mReceStopOrder; //用于控制线程的停止
  69. };

logger类 & std::thread::id转换为string的函数

log数据到文件,如果用屏幕打印的话,由于多线程,容易出现穿插打印。


  
  1. std::string getThreadIdOfString(const std::thread::id& id)
  2. {
  3. std:: stringstream ss; //#include<sstream>
  4. ss << id;
  5. return ss.str();
  6. }
  7. class logger {
  8. public:
  9. logger( string path = "") :filePath(path) {
  10. ofs.open(filePath, ios::out);
  11. }
  12. virtual ~logger() {
  13. ofs.close();
  14. }
  15. void logData(string a) {
  16. std::lock_guard<mutex> lkguard(mMut);
  17. ofs << a << endl;
  18. }
  19. private:
  20. mutex mMut;
  21. string filePath = "";
  22. ofstream ofs;
  23. };
  24. logger myLogger("C:\\Users\\xxx\\Desktop\\goodVideo\\1.txt"); //全局变量

 

Test:


  
  1. #include <iostream>
  2. #include <stack>
  3. #include <vector>
  4. #include <list>
  5. #include <unordered_map>
  6. #include <queue>
  7. #include <functional>
  8. #include <mutex>
  9. #include <atomic>
  10. #include <thread>
  11. #include <string>
  12. #include <fstream>
  13. #include <sstream>
  14. #include <memory>
  15. int main() {
  16. threadPool myThreadPools(5);
  17. for ( int i = 0; i < 5; i++) {
  18. Task myTempTask(i);
  19. myThreadPools.pushTask(myTempTask);
  20. std::this_thread::sleep_for( std::chrono::milliseconds( 1));
  21. }
  22. std::this_thread::sleep_for( std::chrono::seconds( 2));
  23. myThreadPools.stopAllThreads();
  24. }


Related:

https://blog.csdn.net/FlushHip/article/details/81902188  -- thread_id

https://blog.csdn.net/FlushHip/article/details/81902188 -- good idea and implemented with linux api


转载:https://blog.csdn.net/qq_35865125/article/details/115773774
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场