小言_互联网的博客

C++线程间的互斥和通信

506人阅读  评论(0)

互斥锁(mutex)

为了更好地理解,互斥锁,我们可以首先来看这么一个应用场景:模拟车站卖票。

模拟车站卖票

场景说明:
Yang车站售卖从亚特兰蒂斯到古巴比伦的时光飞船票;因为机会难得,所以票数有限,一经发售,谢绝补票。
飞船票总数:100张;
售卖窗口:3个。
对于珍贵的飞船票来说,这个资源是互斥的,比如第100张票,只能卖给一个人,不可能同时卖给两个人。3个窗口都有权限去售卖飞船票(唯一合法途径)。

不加锁的结果

根据场景说明,我们可以很快地分析如下:
可以使用三个线程来模拟三个独立的窗口同时进行卖票;
定义一个全局变量,每当一个窗口卖出一张票,就对这个变量进行减减操作。
故写出如下代码:

#include <iostream>
#include <thread>
#include <list>
using namespace std;

int tickets = 100; // 车站剩余票数总数

void sellTickets(int win)
{
   
	while (tickets > 0)
	{
   
		{
   
			if (tickets > 0)
			{
   
				cout << "窗口:" << win << " 卖出了第:" << tickets << "张票!" << endl;
				tickets--;
			}
			std::this_thread::sleep_for(std::chrono::microseconds(400));
		}
	}
}

int main()
{
   
	list<std::thread> tlist;
	for (int i = 1; i <= 3; ++i)
	{
   
		tlist.push_back(std::thread(sellTickets, i));
	}
	for (std::thread& t : tlist)
	{
   
		t.join();
	}
	cout << "所有窗口卖票结束!" << endl;

	return 0;
}

运行结果如下:

通过运行,我们可以发现问题
对于一张票来说,卖出去了多次!
这不白嫖吗???这合适吗?
原因也很简单,对于线程来说,谁先执行,谁后执行,完全是根据CPU的调度,根本不可能掌握清楚。
所以,这个代码是线程不安全的!
那,怎么解决呢?
当然是:互斥锁了!

加锁后的结果

我们对上述代码做出如下修改:

#include <iostream>
#include <thread>
#include <list>
#include <mutex>
using namespace std;

int tickets = 100; 
std::mutex mtx;

void sellTickets(int win)
{
   
	while (tickets > 0)
	{
   
		{
   
			lock_guard<std::mutex> lock(mtx);
			if (tickets > 0)
			{
   
				cout << "窗口:" << win << " 卖出了第:" << tickets << "张票!" << endl;
				tickets--;
			}
			std::this_thread::sleep_for(std::chrono::microseconds(400));
		}
	}
}

int main()
{
   
	list<std::thread> tlist;
	for (int i = 1; i <= 3; ++i)
	{
   
		tlist.push_back(std::thread(sellTickets, i));
	}
	for (std::thread& t : tlist)
	{
   
		t.join();
	}
	cout << "所有窗口卖票结束!" << endl;

	return 0;
}

首先定义了一个全局的互斥锁std::mutex mtx;接着在对票数tickets进行减减操作时,定义了lock_guard,这个就相当于智能指针scoped_ptr一样,可以出了作用域自动释放锁资源。
运行结果如下:

我们可以看到这一次,就没问题了。

简单总结

互斥锁的使用可以有三种:
(首先都需要在全局定义互斥锁std::mutex mtx

  1. 首先可以直接在需要加锁和解锁的地方,手动进行:加锁mtx.lock()、解锁mtx.unlock()
  2. 可以在需要加锁的地方定义保护锁lock_guard<std::mutex> lock(mtx),这个锁在定义的时候自动上锁,出了作用域自动解锁。(其实就是借助了智能指针的思想,定义对象出调用构造函数底层调用lock(),出了作用域调用析构函数底层调用unlock());
  3. 可以在需要加锁的地方定义唯一锁unique_lock<std::mutex> lock(mtx),这个锁和保护锁类似,但是比保护锁更加好用。(可以类比智能指针中的scoped_ptrunique_ptr的区别,二者都是将拷贝构造和赋值重载函数删除了,但是unique_ptrunique_lock都定义了带有右值引用的拷贝构造和赋值)

条件变量(conditon_variable)

如果说,互斥锁是为了解决线程间互斥的问题,那么,条件变量就是为了解决线程间通信的问题。
同样的,我们可以首先来看一个问题(模型):

生产者消费者线程模型

生产者消费者线程模型是一个很经典的线程模型;
首先会有两个线程,一个是生产者,一个是消费者,生产者只负责生产资源,消费者只负责消费资源。

产生问题

根据上述互斥锁的理解,我们可以写出如下代码:

#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
using namespace std;

std::mutex mtx; 

class Queue
{
   
public:
	void put(int num)
	{
   
		lock_guard<std::mutex> lock(mtx);
		que.push(num);
		cout << "生产者,生产了:" << num << "号产品" << endl;
	}
	void get()
	{
   
		lock_guard<std::mutex> lock(mtx);
		int val = que.front();
		que.pop();
		cout << "消费者,消费了:" << val << "号产品" << endl;
	}
private:
	queue<int> que;
};

void producer(Queue* que)
{
   
	for (int i = 0; i < 10; ++i)
	{
   
		que->put(i);
		std::this_thread::sleep_for(std::chrono::milliseconds(200));
	}
}

void consumer(Queue* que)
{
   
	for (int i = 0; i < 10; ++i)
	{
   
		que->get();
		std::this_thread::sleep_for(std::chrono::milliseconds(200));
	}
}
int main()
{
   
	Queue que;

	std::thread t1(producer, &que);
	std::thread t2(consumer, &que);

	t1.join();
	t2.join();

	return 0;
}

同样的,我们定义了两个线程:t1t2分别作为生产者消费者,并且定义了两个线程函数producerconsumer,这两个函数接受一个Queue*的参数,并且通过这个指针调用putget方法,这两个方法就是往资源队列里面执行入队和出队操作
运行结果如下:

我们会发现,出错了。
多运行几次试试:


我们发现,每次运行的结果还都不一样,但是都会出现系统崩溃的问题。
仔细来看这个错误原因:

我们再想想这个代码的逻辑:
一个生产者只负责生产;
一个消费者只负责消费;
他们共同在队列里面存取资源;
存取资源操作本身是互斥的。
发现问题了吗?
这两个线程之间彼此的操作独立,换句话说,
没有通信
生产者生产的时候,消费者不知道;
消费者消费的时候,生产者也不知道;
但是消费者是要从队列里面取资源的,如果某一个时刻,队列里为空了,它就不能取了!

解决问题

分析完问题之后,我们知道了:
问题出在:没有通信上面。
那么如何解决通信问题呢?
当然就是:条件变量了!
我们做出如下代码的修改:

#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>
using namespace std;

std::mutex mtx; // 互斥锁,用于线程间互斥
std::condition_variable cv;// 条件变量,用于线程间通信

class Queue
{
   
public:
	void put(int num)
	{
   
		unique_lock<std::mutex> lck(mtx);
		while (!que.empty())
		{
   
			cv.wait(lck);
		}
		que.push(num);
		cv.notify_all();
		cout << "生产者,生产了:" << num << "号产品" << endl;
	}
	void get()
	{
   
		unique_lock<std::mutex> lck(mtx);
		while (que.empty())
		{
   
			cv.wait(lck);
		}
		int val = que.front();
		que.pop();
		cv.notify_all();
		cout << "消费者,消费了:" << val << "号产品" << endl;
	}
private:
	queue<int> que;
};

void producer(Queue* que)
{
   
	for (int i = 0; i < 10; ++i)
	{
   
		que->put(i);
		std::this_thread::sleep_for(std::chrono::milliseconds(200));
	}
}

void consumer(Queue* que)
{
   
	for (int i = 0; i < 10; ++i)
	{
   
		que->get();
		std::this_thread::sleep_for(std::chrono::milliseconds(200));
	}
}
int main()
{
   
	Queue que;

	std::thread t1(producer, &que);
	std::thread t2(consumer, &que);

	t1.join();
	t2.join();

	return 0;
}

这个时候我们再来看运行结果:

这个时候就是:
生产一个、消费一个。

原子类型(atomic)

我们前面遇到线程不安全的问题,主要是因为涉及++--操作的时候,有可能被其他的线程干扰,所以使用了互斥锁
只允许得到的线程进行操作;
其他没有得到的线程只能眼巴巴的干看着。
但是,对于互斥锁来说,它是比较重的,它对于临界区代码做的事情比较复杂。
简单来说,如果只是为了++--这样的简单操作互斥的话,使用互斥锁,就有点杀鸡用牛刀的意味了。
那么有没有比互斥锁更加轻量的,并且能够解决问题的呢?
当然有,就是我们要说的原子类型

简单使用

我们可以简单设置一个场景:
定义十个线程,对一个公有的变量myCount进行task的操作,该操作是对变量进行100次的++
所以,如果顺利,我们会最终得到myCount = 1000
代码如下:

#include <iostream>
#include <thread>
#include <atomic>
#include <list>

volatile std::atomic_bool isReady = false;
volatile std::atomic_int myCount = 0;

void task()
{
   
	while (!isReady)
	{
   
		// 线程让出当前的CPU时间片,等待下一次调度
		std::this_thread::yield();
	}

	for (int i = 0; i < 100; ++i)
	{
   
		myCount++;
	}
}

int main()
{
   
	std::list<std::thread> tlist;
	for (int i = 0; i < 10; ++i)
	{
   
		tlist.push_back(std::thread(task));
	}

	std::this_thread::sleep_for(std::chrono::milliseconds(200));
	isReady = true;

	for (std::thread& it : tlist)
	{
   
		it.join();
	}
	std::cout << "myCount:" << myCount << std::endl;

	return 0;
}

运行结果如下:

改良车站卖票

对于原子类型来说,使用方法非常简单:
首先包含头文件:#include <atomic>
接着把需要原子操作的变量定义为对应的原子类型就好:
bool -> atomic_bool;
int -> atomic_int;
其他同理。
理解了这个以后,我们可以使用原子类型对我们的车站卖票进行改良:

#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#include <atomic>
using namespace std;

std::atomic_int tickets = 100; // 车站剩余票数总数

void sellTickets(int win)
{
   
	while (tickets > 0)
	{
   
		tickets--;
		cout << "窗口:" << win << " 卖出了第:" << tickets << "张票!" << endl;
	}
}

int main()
{
   
	list<std::thread> tlist;
	for (int i = 1; i <= 3; ++i)
	{
   
		tlist.push_back(std::thread(sellTickets, i));
	}
	for (std::thread& t : tlist)
	{
   
		t.join();
	}
	cout << "所有窗口卖票结束!" << endl;

	return 0;
}

可以看到,从代码长度来说就轻量了很多!
运行结果如下:

虽然还有部分打印乱序的情况:
(毕竟线程的执行顺序谁也摸不清 😦
但是,代码的逻辑没有问题!
不会出现一张票被卖了多次的情况!
这个原子类型也被叫做:无锁类型,像是一些无锁队列之类的实现,就是靠的这个东西。


转载:https://blog.csdn.net/m0_46308273/article/details/117037597
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场