飞道的博客

【Linux】第十四章 多线程(生产者消费者模型+POSIX信号量)

483人阅读  评论(0)

🏆个人主页企鹅不叫的博客

​ 🌈专栏

⭐️ 博主码云gitee链接:代码仓库地址

⚡若有帮助可以【关注+点赞+收藏】,大家一起进步!

💙系列文章💙


【Linux】第一章环境搭建和配置

【Linux】第二章常见指令和权限理解

【Linux】第三章Linux环境基础开发工具使用(yum+rzsz+vim+g++和gcc+gdb+make和Makefile+进度条+git)

【Linux】第四章 进程(冯诺依曼体系+操作系统+进程概念+PID和PPID+fork+运行状态和描述+进程优先级)

【Linux】第五章 环境变量(概念补充+作用+命令+main三个参数+environ+getenv())

【Linux】第六章 进程地址空间(程序在内存中存储+虚拟地址+页表+mm_struct+写实拷贝+解释fork返回值)

【Linux】第七章 进程控制(进程创建+进程终止+进程等待+进程替换+min_shell)

【Linux】第八章 基础IO(open+write+read+文件描述符+重定向+缓冲区+文件系统管理+软硬链接)

【Linux】第九章 动态库和静态库(生成原理+生成和使用+动态链接)

【Linux】第十章 进程间通信(管道+system V共享内存)

【Linux】第十一章 进程信号(概念+产生信号+阻塞信号+捕捉信号)

【Linux】第十二章 多线程(线程概念+线程控制)

【Linux】第十三章 多线程(线程互斥+线程安全和可重入+死锁+线程同步)



💎一、生产者消费者模型

🏆1.生产者消费者模型概念

概念: 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过一个来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

🏆2.生产者消费者模型优点

  • 解耦:生产者和消费者是通过一个共享数据区域来进行通信。而不是直接进行通信,这样两个角色之间的依耐性就降低了(代码层面实现解耦),变成了角色与共享数据区域之间的弱耦合,一个逻辑出错不影响两一个逻辑,二者变得更独立。
  • 支持并发:生产者负责生产数据,消费者负责拿数据。生产者生产完数据可以继续生产,在消费者消费期间生产者可以同时进行生产,主要是指生产前或者消费后的对应的并发
  • 支持忙闲下不均:生产者生产了数据是放进容器中,消费者不必立即消费,可以慢慢地从容器中取数据。容器快要空了,消费者的消费速度就可以降下来,让生产者继续生产。

🏆3.生产者消费者模型特点

  1. 3种关系: 生产者与生产者(互斥)、生产者与消费者(互斥、同步)和消费者与消费者(互斥)
  2. 两个角色: 生产者和消费者
  3. 一个交易场所: 容器、共享资源等

生产者和消费者之间存在同步关系:

  • 让生产者一直生产,那么消费者会产生饥饿问题。
  • 让消费者一直消费,生产者会产生饥饿问题。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mvfJSrPD-1669198680727)(https://only-figure-bed.oss-cn-hangzhou.aliyuncs.com/img2/%E5%8D%9A%E5%AE%A2%E5%9B%BE%E5%BA%8A/202211221004879.png)]

💎二、基于BlockingQueue的生产者消费者模型

🏆1.基于BlockingQueue的生产者消费者模型概念

多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构

阻塞队列的特点:

  • 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素
  • 当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出

🏆2.基于BlockingQueue的生产者消费者模型实现

概述

  • 以单生产者、单消费者为例进行实现
  • 队列: 使用STL中的queue来实现
  • 容量: 阻塞队列的容量,由用户给定,我们也可以提供一个默认的容量
  • 互斥量: 为了实现生产者和消费者的同步,我们需要使用条件变量和互斥量来实现同步的操作
  • 生产者唤醒和等待的条件变量: 当队列满了,生产者等待条件满足,应该挂起等待,等待消费者唤醒
  • 消费者唤醒和等待的条件变量: 当队列为空,消费者等待条件满足,应该挂起等待,等待生产者唤醒

框架:blockqueue.hpp

#pragma once
#include <iostream>
#include <queue>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
using namespace std;
#define NUM 5	//存储数据的上限为5
template <class T>
class BlockQueue
{
    
public:
    BlockQueue(size_t capacity = NUM) 
        : _capacity(capacity)
    {
    
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_conCond, nullptr);
        pthread_cond_init(&_proCond, nullptr);
    }
    ~BlockQueue()
    {
    
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_conCond);
        pthread_cond_destroy(&_proCond);
    }
private:
    size_t _capacity;          		  //容量
    queue<T> _q;        	  	  // 阻塞队列
    pthread_mutex_t _lock;	  //保护阻塞队列的互斥锁
    pthread_cond_t _conCond; // 让消费者等待的条件变量
    pthread_cond_t _proCond; // 让生产者等待的条件变量
};

  

tips:条件(对应的共享资源的状态)条件变量(条件满足或者不满足的时候,进行wait或singal的一种方式)

生产接口和消费接口

  • 生产者进行相关操作前先上锁,队列如果为满就需要挂起等待。队列不为满就生成一个数据,然后需要把数据放入阻塞队列中,解锁之后唤醒消费者消费。
  • 消费者进行相关操作前先上锁,队列如果为空就需要挂起等待。队列不为空就需要从阻塞队列中取一个数据,然后解锁之后唤醒生产者生产。
  • 在临界资源判断唤醒条件应该使用while循环检测,pthread_cond_wait函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行
  • 我们也可以当阻塞队列当中存储的数据大于队列容量时,再唤醒消费者线程进行消费;当阻塞队列当中存储的数据小于队列容器时,再唤醒生产者线程进行生产。
    //生产接口
    void push(const T &date) 
    {
    
        lockQueue();
        while (isFull())
        {
    
            proBlockWait(); //阻塞等待,等待被唤醒
        }
        // 条件满足,可以生产
        pushCore(date); //生产完成
        if(_q.size() <= NUM){
    
            wakeupCon(); // 唤醒消费者
        }
        unlockQueue();//解锁
    }
    //消费接口
    T pop()
    {
    
        lockQueue();
        while (isEmpty())
        {
    
            conBlockwait(); //阻塞等待,等待被唤醒
        }
        // 条件满足,可以消费
        T tmp = popCore();
        if(_q.size() <= NUM){
    
            wakeupPro(); // 唤醒生产者
        }
        unlockQueue();//解锁
        return tmp;
    }

  
  • 判断队列为空或为满
  • 唤醒生产者和唤醒消费者
  • 生产者阻塞等待和消费者阻塞等待
  • 加锁和解锁
  • 生产和消费
    //加锁
    void lockQueue()
    {
    
        pthread_mutex_lock(&_lock);
    }
    //解锁
    void unlockQueue()
    {
    
        pthread_mutex_unlock(&_lock);
    }
    //判断空
    bool isEmpty()
    {
    
        return _q.empty();
    }
    //判断满
    bool isFull()
    {
    
        return _q.size() == _capacity;
    }
    //生产者阻塞等待
    void proBlockWait()
    {
    
        // 在阻塞线程的时候,会自动释放锁
        pthread_cond_wait(&_proCond, &_lock);
    }
    //消费者阻塞等待
    void conBlockwait()
    {
    
        // 在阻塞线程的时候,会自动释放锁
        pthread_cond_wait(&_conCond, &_lock);
        // 当阻塞结束,返回的时候,pthread_cond_wait,会自动帮你重新获得锁,然后才返回
    }
    // 唤醒生产者
    void wakeupPro()
    {
    
        cout << "wake up consumer...." << endl;
        pthread_cond_signal(&_proCond);
    }
    // 唤醒消费者
    void wakeupCon() 
    {
    
        cout << "wake up productor...." << endl;
        pthread_cond_signal(&_conCond);
    }
    //生产
    void pushCore(const T &date)
    {
    
        _q.push(date);
    }
    //消费
    T popCore()
    {
    
        T tmp = _q.front();
        _q.pop();
        return tmp;
    }

  

封装一个任务

实现一个任务类,生产者把这个任务放进阻塞队列中,消费者取出并进行处理。其中还有一个run的任务执行方法

#pragma once
#include <iostream>
#include <string>

using namespace std;

class Task
{
    
public:
 Task() 
     : _x(0)
     , _y(0)
     , _op('?')
 {
    }

 Task(int x, int y, char op) 
     : _x(x)
     , _y(y)
     , _op(op)
 {
    }

 ~Task()
 {
    }

 int operator()()
 {
    
     return run();
 }
 int run()
 {
    
     int result = 0;
     switch (_op)
     {
    
     case '+':
         result = _x + _y;
         break;
     case '-':
         result = _x - _y;
         break;
     case '*':
         result = _x * _y;
         break;
     case '/':
         {
    
             if (_y == 0)
             {
    
                 cout << "div zero, abort" << endl;
                 result = -1;
             }
             else
             {
    
                 result = _x / _y;
             }
         }
         break;
     case '%':
         {
    
             if (_y == 0)
             {
    
                 cout << "mod zero, abort" << endl;
                 result = -1;
             }
             else
             {
    
                 result = _x % _y;
             }
         }
         break;
     default:
         cout << "非法操作: " << _op << endl;
         break;
     }
     return result;
 }
 int get(int *e1, int *e2, char *op)
 {
    
     *e1 = _x;
     *e2 = _y;
     *op = _op;
 }
private:
 int _x;
 int _y;
 char _op;
};

  

单生产者线程和单消费者线程模型分析

  • 阻塞队列要让生产者线程向队列中Push数据,让消费者线程从队列中Pop数据,因此这个阻塞队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将该阻塞队列作为线程执行例程的参数进行传入。
  • 代码中生产者生产数据就是将获取到的随机数Push到阻塞队列,而消费者消费数据就是从阻塞队列Pop数据,为了便于观察,我们可以将生产者生产的数据和消费者消费的数据进行打印输出。
  • 其中用sleep控制生产消费的速度
#include "test.hpp"
#include "blockqueue.hpp"
#include <ctime>
using namespace std;

const string ops = "+-*/%";

void *consumer(void *args)
{
    
    BlockQueue<Task> *bqp = (BlockQueue<Task> *)args;
    while (true)
    {
    
        Task t = bqp->pop(); // 消费任务
        int result = t();    //处理任务
        int x, y;
        char op;
        t.get(&x, &y, &op);
        cout << "consumer[" << pthread_self() << "] " << (unsigned int)time(nullptr) << " 消费了一个任务: " << x << op << y << "=" << result << endl;
        sleep(1);
    }
}
void *productor(void *args)
{
    
    BlockQueue<Task> *bqp = (BlockQueue<Task> *)args;
    while (true)
    {
    
        //制作任务
        int x = rand() % 20;
        int y = rand() % 20;
        char op = ops[rand() % ops.size()];
        Task t(x, y, op);
        //生产任务
        bqp->push(t);
        cout << "producter[" << pthread_self() << "] " << (unsigned int)time(nullptr) << " 生产了一个任务: " << x << op << y << "=?" << endl;
        sleep(1);
    }
}

int main()
{
    
    srand((unsigned int)time(nullptr));
    BlockQueue<Task>* bq = new BlockQueue<Task>;

    pthread_t con, pro;
    pthread_create(&con, nullptr, consumer, bq);
    pthread_create(&pro, nullptr, productor, bq);

    pthread_join(con, nullptr);
    pthread_join(pro, nullptr);
    delete bq;
    return 0;
}

  

结果:生产一个消费一个

[Jungle@VM-20-8-centos:~/lesson36/blockqueue]$ ./main
wake up productor....
producter[wake up consumer....
consumer[140114744878848] 1669102520 消费了一个任务: 16-15=1
140114736486144] 1669102520 生产了一个任务: 16-15=?
wake up productor....
producter[140114736486144] 1669102522 生产了一个任务: 14*1=?
wake up consumer....
consumer[140114744878848] 1669102522 消费了一个任务: 14*1=14
wake up productor....
producter[140114736486144] 1669102524 生产了一个任务: 1+12=?
wake up consumer....
consumer[140114744878848] 1669102524 消费了一个任务: 1+12=13

💎三、POSIX信号量

🏆1.信号量概念

信号量本质是一个计数器,是描述临界资源中资源数目的计数器,申请一个资源就对信号量减1(P操作,当申请成功时临界资源中资源的数目应该减一),释放一个资源就对信号量加1(V操作,当释放成功时临界资源中资源的数目就应该加一)

PV操作必须是原子操作,信号量是会被多个执行流同时访问的,也就是说信号量本质也是临界资源

🏆2.信号量原理

当我们用互斥锁对临界资源进行保护的时候,我们相当于把临界资源看成一个整体,同一时间只能允许一个执行流对临界资源访问,但是我们可以把临界资源分割程多个区域,此时执行流可以同时访问临界资源的不同区域

🏆3.信号量函数

sem_init

#include<semaphore.h> 
int sem_init(sem_t *sem, int pshared, unsigned int value); 

初始化信号量

参数:

  • sem:信号量
  • pshared:0表示线程间共享,非零表示进程间共享
  • value:信号量初始值

返回值: 成功返回0,失败返回-1

注意: POSIX信号量和System V信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX信号量可以用于线程间同步。

sem_destroy

#include<semaphore.h> 
int sem_destroy(sem_t *sem);

销毁信号量

参数:

  • sem:信号量

返回值: 成功返回0,失败返回-1

sem_wait

#include<semaphore.h> 
int sem_wait(sem_t *sem);

等待信号量

功能: 等待信号量,会将信号量的值减1
参数:

  • sem:信号量

返回值: 成功返回0,失败返回-1

sem_post

#include<semaphore.h> 
int sem_post(sem_t *sem);

发布信号量

功能: 发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1
参数:

  • sem:信号量

返回值: 成功返回0,失败返回-1

实例:主线程当中创建四个新线程,让这四个新线程执行抢票逻辑,并且每次抢完票后打印输出此时剩余的票数

#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>

using namespace std;

int tickets = 2000;
sem_t sem;//二元信号量

void* TicketGrabbing(void* arg)
{
    
	string name = (char*)arg;
	while (true){
    
     sem_wait(&sem);//等待信号量
		if (tickets > 0){
    
			usleep(1000);
			cout << name << " 剩余票数: " << --tickets << endl;
         sem_post(&sem);//发布信号量
		}
		else{
    
         sem_post(&sem);//发布信号量
			break;
		}
	}
	cout << name << "抢完了" << endl;
	pthread_exit(nullptr);
}

int main()
{
    
 sem_init(&sem, 0, 1);
	pthread_t tid1, tid2, tid3, tid4;
	pthread_create(&tid1, nullptr, TicketGrabbing, (void*)"thread 1");
	pthread_create(&tid2, nullptr, TicketGrabbing, (void*)"thread 2");
	pthread_create(&tid3, nullptr, TicketGrabbing, (void*)"thread 3");
	pthread_create(&tid4, nullptr, TicketGrabbing, (void*)"thread 4");

 sem_destroy(&sem);
	pthread_join(tid1, nullptr);
	pthread_join(tid2, nullptr);
	pthread_join(tid3, nullptr);
	pthread_join(tid4, nullptr);
	return 0;
}

  

结果:没有添加二元信号量的话会出现票数为负数的情况

thread 3 剩余票数: 3
thread 3 剩余票数: 2
thread 3 剩余票数: 1
thread 3 剩余票数: 0
thread 3抢完了
thread 4抢完了
thread 1抢完了
thread 2抢完了

说明: 二元信号量(value=1,一个资源)等价于互斥锁

💎四、基于环形队列的生产消费模型

🏆1.环形队列介绍

环形队列采用数组模拟

🏆2.空间资源和数据资源

  • 生产者关注的是环形队列当中是否有空间,只要有空间生产者就可以进行生产。
  • 消费者关注的是环形队列当中是否有数据,只要有数据消费者就可以进行消费。
  • 空间资源的初始值我们应该设置为环形队列的容量,因为刚开始时环形队列当中全是空间。
  • 数据资源的初始值我们应该设置为0,因为刚开始时环形队列当中没有数据。

🏆3.生产者申请和释放资源

对于生产者来说,每次生产数据前都要先申请空间资源

  • 如果空间资源不为0,则可以进行生产操作
  • 如果空间资源为0,则生产者需要在空间资源的等待队列下进行阻塞等待,直到有空间资源后才被唤醒

当生产者生产完数据后,因该释放数据资源

  • 生产者生产前对空间资源进行P(申请)操作,生产完后对数据资源进行V(释放)操作

🏆4.消费者申请和释放资源

对于消费者来说,每次消费数据前都需要先申请数据资源

  • 如果数据资源不为0,则可以进行消费操作
  • 如果数据资源为0,则消费者需要在数据资源的等待队列下进行阻塞等待,直到有数据资源后才被唤醒

当消费者消费完数据后,因该释放空间资源

  • 消费者消费前对数据资源进行P(申请)操作,消费完后对空间资源进行V(释放)操作

🏆5.信号量在环形队列中的作用

当生产者和消费者指向同一位置的情况:

  • 环形队列为空时。

  • 环形队列为满时。

  • 当环形队列为空的时,消费者一定不能进行消费,因为此时数据资源为0。

  • 当环形队列为满的时,生产者一定不能进行生产,因为此时空间资源为0。

信号量保证了生产者和消费者不会指向同一个位置

🏆6.代码

概述

概述:

  • 队列:数组模拟
  • 容量:由用户给定
  • 空间资源信号量:队列的容量大小
  • 数据资源信号量:开始为0
  • 生产者的下标位置:开始为0
  • 消费者的下标位置:开始为0
  • 生产者:需要申请空间资源(P操作),然后释放数据资源(V操作)
  • 消费者:需要申请数据资源(P操作),然后释放空间资源(V操作)

框架:

#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <semaphore.h>

using namespace std;

#define NUM 10

template <class T>
class RingQueue
{
    
public:
    RingQueue(int capacity = NUM)
        :_ringqueue(capacity)
        , _ppos(0)
        , _cpos(0)
    {
    
        sem_init(&_blank_sem, 0, _ringqueue.size()); //blank_sem初始值设置为环形队列的容量
        sem_init(&_data_sem, 0, 0);                  //data_sem初始值设置为0
    }
    ~RingQueue()
    {
    
        sem_destroy(&_blank_sem);
        sem_destroy(&_data_sem);
    }
private:
    vector<T> _ringqueue; 	// 环形队列
    sem_t _blank_sem;       // 描述空间资源,productor
    sem_t _data_sem;        // 描述数据资源,consumer
    int _ppos;    	        // 当前生产者写入的位置, 如果是多线程,_ppos也是临界资源
    int _cpos;    	        // 当前消费者读取的位置,如果是多线程,_cpos也是临界资源
};

  

插入数据和获取数据

  • 生产者生成数据前需要申请空间资源信号量blank_sem,申请不成功就挂起等待,等待信号量来了继续获得信号量,然后释放数据资源信号量data_sem
  • 消费者消费数据前需要申请数据资源信号量 data_sem ,申请不成功就挂起等待,等待信号量来了继续获得信号量,然后释放空间资源信号量blank_sem
     // 生产
    void push(const T &data)
    {
    
        sem_wait(&_blank_sem); //生产者关注空间资源
        _ringqueue[_ppos] = data; //生产的过程
        sem_post(&_data_sem);//生产
        _ppos++;   // 写入位置后移
        _ppos %= _ringqueue.size(); // 更新下标,保证环形特征
    }
    // 消费
    T pop()
    {
    
        sem_wait(&_data_sem);//消费者关注数据资源
        T temp = _ringqueue[_cpos];//消费过程
        sem_post(&_blank_sem);//消费
        _cpos++;// 读取位置后移
        _cpos %= _ringqueue.size();// 更新下标,保证环形特征

        return temp;
    }

  

单生产者线程和单消费者线程模型分析

  • 默认环形队列大小是10
  • 生产者每次生产到下标为_ppos,消费者每次消费到下标为 _cpos
  • 生产者消费者每次对数据修改后,要标记下一个数据的位置,同时对下标进行取模运算实现环形
  • 主函数我们就只需要创建一个生产者线程和一个消费者线程,生产者线程不断生产数据放入环形队列,消费者线程不断从环形队列里取出数据进行消费
#include "ringqueue.hpp"
#include <ctime>
#include <unistd.h>


void *productor(void *args)
{
    
    RingQueue<int> *rqp = (RingQueue<int> *)args;
    while(true)
    {
    
        sleep(1);
        int data = rand()%10;
        rqp->push(data);
        cout << "pthread[" << pthread_self() << "]" << " 生产了一个数据: " << data << endl;
    }
}

void *consumer(void *args)
{
    
    RingQueue<int> *rqp = (RingQueue<int> *)args;
    while(true)
    {
    
        //sleep(1);
        int data = rqp->pop();
        cout << "pthread[" << pthread_self() << "]" << " 消费了一个数据: " << data << endl;
    }
}

int main()
{
    
    srand((unsigned int)time(nullptr));

    RingQueue<int>* rq = new RingQueue<int>;

    pthread_t c1,p1;
    pthread_create(&p1, nullptr, productor, rq);
    pthread_create(&c1, nullptr, consumer, rq);

    pthread_join(c1, nullptr);
    pthread_join(p1, nullptr);
    delete rq;
    return 0;
}

  

结果:生产者每隔一秒进行生产,而消费者不停的进行消费

[Jungle@VM-20-8-centos:~/lesson36/ringqueue]$ ./main
pthread[140715924961024] 生产了一个数据: 8
pthread[140715916568320] 消费了一个数据: 8
pthread[140715924961024] 生产了一个数据: 3
pthread[140715916568320] 消费了一个数据: 3
pthread[140715924961024] 生产了一个数据: 4
pthread[140715916568320] 消费了一个数据: 4


转载:https://blog.csdn.net/YQ20210216/article/details/128005338
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场