对象池模式在软件中广泛使用,例如,线程池,连接池,内存池等。Boost库中的pool实现了内存池,thread_group实现了简单的线程池。
以下实现的线程池与boost无关,只是提供一种思路。
Task类:
对应要执行的任务Task::run, 任务入参Task::mParam.
-
class Task {
-
public:
-
Task(
int val=
0):
-
mParam(val)
-
{
-
-
}
-
virtual ~Task() {
-
-
}
-
void setParam(int val) {
-
mParam = val;
-
}
-
void run() {
-
thread::id threadId =
std::this_thread::get_id();
-
cout <<
"param = " << mParam <<
" ThreadId = " << threadId <<
endl;
-
string data =
"param = " +
std::to_string(mParam)+
" ThreadId = "+ getThreadIdOfString(threadId) +
"\n";
-
myLogger.logData(data);
-
}
-
private:
-
int mParam;
-
};
threadPool类:
成员变量vector<shared_ptr<thread>> threadCache保存构造函数中开辟的线程;构造函数中开辟线程,各个线程的入口函数都是threadFunc,如果任务队列为空则线程bolck,等待任务。
std::queue<Task> mTasksQueue保存外界传入的要执行的task.
std::condition_variable mCondVar用于当任务队列mTasksQueue不为空时,唤醒一个线程,从队列头取出一个task执行。
stopAllThreads()用于停止所有线程;
joinThreads()用于等待所有线程结束,在析构函数中被调用。
Note:
threadFunc函数中的线程唤醒条件while (mTasksQueue.empty() && !mReceStopOrder), 不加mReceStopOrder的判断,易导致stopAllThreads()可能不起作用(调用stopAllThreads函数时,taskQueue已经为空)。
-
//c++实现线程池:
-
class threadPool {
-
public:
-
threadPool(
int maxThreadsNum=
1):
-
mMaxThreadsNum(maxThreadsNum){
-
mReceStopOrder =
false;
-
//creat threads:
-
for (
int i =
0; i < mMaxThreadsNum; ++i) {
-
shared_ptr<thread> th =
std::make_shared<thread>(&threadPool::threadFunc,
this);
//Ok
-
//shared_ptr<thread> th = std::make_shared<thread>(std::bind(&threadPool::threadFunc, this)); //Ok
-
//shared_ptr<thread> th = std::make_shared<thread>(std::bind(&threadPool::threadFunc, *this));//build error
-
-
//newThread.detach();
-
threadCache.push_back(th);
-
}
-
}
-
-
void joinThreads() {
-
for (
auto& it : threadCache) {
-
it->join();
-
}
-
}
-
virtual ~threadPool() {
-
joinThreads();
//等待所有线程结束
-
}
-
void pushTask(Task task) {
-
-
std::lock_guard<std::mutex> lk(mMux);
-
mTasksQueue.push(task);
-
mCondVar.notify_one();
-
}
-
//构造函数中开辟的多个线程的入口
-
void threadFunc() {
-
while (
true)
-
{
-
//终止函数,停止线程
-
if (mReceStopOrder) {
-
myLogger.logData(
"break---");
-
break;
-
}
-
std::unique_lock<std::mutex> lk(mMux);
-
while (mTasksQueue.empty() && !mReceStopOrder)
-
{
-
mCondVar.wait(lk);
-
}
-
if (mReceStopOrder) {
-
myLogger.logData(
"break");
-
break;
-
}
-
//get a task and execute it:
-
Task task = mTasksQueue.front();
-
mTasksQueue.pop();
-
lk.unlock();
//显式地解锁.
-
task.run();
-
}
-
-
thread::id threadId =
std::this_thread::get_id();
-
string data =
"ThreadId "+ getThreadIdOfString(threadId) +
" exit \n";
-
myLogger.logData(data);
-
}
-
void stopAllThreads() {
-
myLogger.logData(
"stopAllThreads was called!");
-
mReceStopOrder =
true;
-
mCondVar.notify_all();
//可能有的线程处于wait状态
-
}
-
-
private:
-
unsigned
int mMaxThreadsNum;
-
std::
queue<Task> mTasksQueue;
//缓存各个任务
-
vector<
shared_ptr<thread>> threadCache;
//保存开辟的各个线程
-
std::mutex mMux;
-
std::condition_variable mCondVar;
//当任务队列mTasksQueue不为空时,唤醒一个线程,从队列头取出一个task执行。
-
std::atomic<
bool> mReceStopOrder;
//用于控制线程的停止
-
};
logger类 & std::thread::id转换为string的函数:
log数据到文件,如果用屏幕打印的话,由于多线程,容易出现穿插打印。
-
-
-
std::string getThreadIdOfString(const std::thread::id& id)
-
{
-
std::
stringstream ss;
//#include<sstream>
-
ss << id;
-
return ss.str();
-
}
-
-
-
class logger {
-
public:
-
logger(
string path =
"") :filePath(path) {
-
ofs.open(filePath, ios::out);
-
}
-
-
virtual ~logger() {
-
ofs.close();
-
}
-
-
void logData(string a) {
-
std::lock_guard<mutex> lkguard(mMut);
-
ofs << a <<
endl;
-
}
-
private:
-
mutex mMut;
-
string filePath =
"";
-
ofstream ofs;
-
};
-
logger myLogger("C:\\Users\\xxx\\Desktop\\goodVideo\\1.txt");
//全局变量
Test:
-
#include <iostream>
-
#include <stack>
-
#include <vector>
-
#include <list>
-
#include <unordered_map>
-
#include <queue>
-
#include <functional>
-
#include <mutex>
-
#include <atomic>
-
#include <thread>
-
#include <string>
-
#include <fstream>
-
#include <sstream>
-
#include <memory>
-
-
int main() {
-
-
threadPool myThreadPools(5);
-
for (
int i =
0; i <
5; i++) {
-
Task myTempTask(i);
-
myThreadPools.pushTask(myTempTask);
-
std::this_thread::sleep_for(
std::chrono::milliseconds(
1));
-
}
-
-
-
std::this_thread::sleep_for(
std::chrono::seconds(
2));
-
-
myThreadPools.stopAllThreads();
-
-
}
Related:
https://blog.csdn.net/FlushHip/article/details/81902188 -- thread_id
https://blog.csdn.net/FlushHip/article/details/81902188 -- good idea and implemented with linux api
转载:https://blog.csdn.net/qq_35865125/article/details/115773774