前言
进程间通信的必要性:
之前写的代码都是单进程的。是无法使用并发能力,并且无法实现多进程协同
传输数据,同步执行流,消息通知等
进程间通信不是目的,而是一种手段。
进程间通信的技术背景
进程是具有独立性的。虚拟地址空间+页表。保证进程运行的独立性(进程内核数据结构+进程的代码和数据)
通信成本比较高。
如何理解进程间通信?
进程运行具有独立性!——进程想通信,难度比较大——进程间通信的本质:需要中间媒介,先让不同的进程看到同一份资源(内存空间)
所谓的进程看到同一块“内存”,属于哪一个进程?
不能隶属于任何一个进程,而应该强调共享。
为什么要进行进程间通信?
——需要交互数据、控制、通知等目标
1. 进程间通信方式的一些标准:
进程间通信发展
- 管道——linux原生能提供
- System V进程间通信——多进程——单机通信
- POSIX进程间通信——多线程——网络通信
进程间通信分类
-
管道
匿名管道pipe 命名管道 -
System V IPC
System V 消息队列
System V 共享内存
System V 信号量 -
POSIX IPC
消息队列
共享内存
信号量
互斥量
条件变量
读写锁
标准更多是在我们使用者看来,在接口上具有一定的规律
2. 管道
2.1 什么是管道
管道是Unix中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
竖画线就是管道“|”
只能单向通信,传输的都是资源(数据)
有入口,有出口
计算机通信领域的设计者,设计了一种单向通信的方式——管道
2.2 管道的原理
管道通信背后是进程之间通过管道进行通信
纯内存级通信方式——没必须要写到磁盘中
管道的底层原理就是通过文件实现的
文件是属于内核的(OS)
1.分别溢读写方式打开同一个文件
2.fork()创建子进程——进程具有独立性,此时子进程也应自己创建一个文件描述符表,然后父进程相关的数据会拷贝给子进程——拷贝只是第一次拷贝,之后不会存在父子进程相互影响的情况
3.双方进程各自关闭自己不需要的文件描述符
让不同的进程看到了同一份资源
2.3 匿名管道
#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码
2.3.1 实例代码
1. demo代码
如何做到让不同的进程看到同一份资源呢?
fork让子进程继承的——能够让具有血缘关系的进程进行进程间通信——常用父子进程
int pipe(int pipefd[2]);
int pipefd[2]是输出型参数,期望通过调用它,得到被打开的文件fd
int pipe创建成功返回值为0,失败就是-1
Makefile
mypipe:mypipe.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f mypipe
条件编译:
#ifdef DEBUG
//…
#endif
取消注释,宏定义就开始编译。
g++ -o $@ $^ -std=c++11 #-DDEBUG#调试
mypipe.cc
#include <iostream>
#include <unistd.h>
#include <assert.h>
#include <string>
#include <cstdio>
#include <cstring>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;
int main()
{
//1.创建管道
int pipefd[2] = {
0};//保存管道的一个数组,这个数组父子进程都能看到
//pipefd[0]:读端, pipefd[1]:写端
int n = pipe(pipefd);
assert(n != -1);//debug下断言才有效
(void)n;//release下要加这句代码才有效,有使用有定义
#ifdef DEBUG
cout << "pipefd[0]: " << pipefd[0] << endl;//3
cout << "pipefd[1]: " << pipefd[1] << endl;//4
#endif
//2.创建子进程
pid_t id = fork();
assert(id != -1);
if(id ==0)
{
//子进程——读
//3.构建单向通信的信道,父进程写入,子进程读取
//3.1关闭子进程不需要的fd
close(pipefd[1]);
char buffer[1024];
while(true)
{
ssize_t s = read(pipefd[0],buffer,sizeof(buffer)-1);
if(s>0)
{
buffer[s] = 0;//通信时传的是字符串
cout << "Father# " << buffer << endl;
}
}
exit(0);//终止
}
//父进程——写
//3.构建单向通信的信道,父进程写入,子进程读取
//3.1关闭父进程不需要的fd
close(pipefd[0]);
string message = "我是父进程,我正在给你发消息";
int count = 0;
char send_buffer[1024];
while(true)
{
//3.2构建一个变化的字符串
//snprintf:安全的进行格式化显示
snprintf(send_buffer,sizeof(send_buffer),"%s[%d] : %d",
message.c_str(), getpid(), count++);
//3.3写入
write(pipefd[1],send_buffer,strlen(send_buffer));//文件不需要+1
//3.4故意sleep
sleep(1);
}
pid_t ret = waitpid(id,nullptr,0);
assert(ret>0);
(void)ret;
return 0;
}
这就叫做管道。
为什么不定义全局buffer来进行通信呢?
因为有写时拷贝的存在,无法更改通信!
#include <iostream>
#include <unistd.h>
#include <assert.h>
#include <string>
#include <cstdio>
#include <cstring>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;
int main()
{
//1.创建管道
int pipefd[2] = {
0};//保存管道的一个数组,这个数组父子进程都能看到
//pipefd[0]:读端, pipefd[1]:写端
int n = pipe(pipefd);
assert(n != -1);//debug下断言才有效
(void)n;//release下要加这句代码才有效,有使用有定义
#ifdef DEBUG
cout << "pipefd[0]: " << pipefd[0] << endl;//3 默认从3开始,因为012被占用,标准输出...
cout << "pipefd[1]: " << pipefd[1] << endl;//4
#endif
//2.创建子进程
pid_t id = fork();
assert(id != -1);
if(id ==0)
{
//子进程——读
//3.构建单向通信的信道,父进程写入,子进程读取
//3.1关闭子进程不需要的fd
close(pipefd[1]);
char buffer[1024 * 8];
while(true)
{
//写入的一方,fd没有关闭,如果有数据就读,没有数据就等
//写入的一方,fd关闭,读取的一方read会返回0,表示读到了文件的结尾
ssize_t s = read(pipefd[0],buffer,sizeof(buffer)-1);
if(s>0)
{
buffer[s] = 0;//通信时传的是字符串
cout <<"child get a message["<<getpid()<< "] Father# " << buffer << endl;
}
else if(s == 0)
{
cout << "write quit(father), me quit!!!" << endl;
break;
}
}
//close(pipefd[0]);
exit(0);//终止
}
//父进程——写
//3.构建单向通信的信道,父进程写入,子进程读取
//3.1关闭父进程不需要的fd
close(pipefd[0]);
string message = "我是父进程,我正在给你发消息";
int count = 0;
char send_buffer[1024*8];
while(true)
{
//3.2构建一个变化的字符串
//snprintf:安全的进行格式化显示
snprintf(send_buffer,sizeof(send_buffer),"%s[%d] : %d",
message.c_str(), getpid(), count++);
//3.3写入
write(pipefd[1],send_buffer,strlen(send_buffer));//文件不需要+1
//3.4故意sleep
sleep(1);
cout<<count<<endl;
if(count == 5)
{
cout<<"write quit(father)"<<endl;
break;
}
}
close(pipefd[1]);
pid_t ret = waitpid(id,nullptr,0);
cout << "id : " << id << " ret: " << ret <<endl;
assert(ret>0);
(void)ret;
return 0;
}
2. 总结管道的特点,理解以前的管道 |
- 管道是用来进行具有血缘关系的进程进行进程间通信——常用语父子通信
- 管道具有通过让进程间协同实现访问控制
管道是一个文件——读取——具有访问控制
显示器也是一个文件——父子同时往显示器写入的时候——没有说一个会等另一个的情况——缺乏访问控制- 管道提供的是面向流式的通信服务——面向字节流——协议
- 管道是基于文件的,文件的生命周期是随进程的,管道的生命周期是随进程的
- 管道是单向通信的,本质上就是半双工通信的一种特殊情况
访问控制+特殊情况(前两种+后两种):
1.写快,读慢,写满就不能再写了
2.写慢,读快,管道没有数据的时候,读必须等待
3.写关,读0,表示读到了文件结尾
4.读关,写继续写,OS终止写进程
3. 扩展——进程池
代码拷贝:
cp ../pipe/MakefileMakefile
Makefile
ProcessPool:ProcessPool.cc
g++ -o $@ $^ -std=c++11 #-DDEBUG#调试
.PHONY:clean
clean:
rm -f ProcessPool
ProcessPool.cc
#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <cassert>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"
#define PROCESS_NUM 5
using namespace std;
int waitCommand(int waitFd, bool &quit) //如果对方不发,我们就阻塞
{
uint32_t command = 0;
ssize_t s = read(waitFd, &command, sizeof(command));
if (s == 0)
{
quit = true;
return -1;
}
assert(s == sizeof(uint32_t));
return command;
}
void sendAndWakeup(pid_t who, int fd, uint32_t command)
{
write(fd, &command, sizeof(command));
cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << endl;
}
int main()
{
// 代码中关于fd的处理,有一个小问题,不影响我们使用,但是你能找到吗??
load();
// pid: pipefd
vector<pair<pid_t, int>> slots;
// 先创建多个进程
for (int i = 0; i < PROCESS_NUM; i++)
{
// 创建管道
int pipefd[2] = {
0};
int n = pipe(pipefd);
assert(n == 0);
(void)n;
pid_t id = fork();
assert(id != -1);
// 子进程我们让他进行读取
if (id == 0)
{
// 关闭写端
close(pipefd[1]);
// child
while (true)
{
// pipefd[0]
// 等命令
bool quit = false;
int command = waitCommand(pipefd[0], quit); //如果对方不发,我们就阻塞
if (quit)
break;
// 执行对应的命令
if (command >= 0 && command < handlerSize())
{
callbacks[command]();
}
else
{
cout << "非法command: " << command << endl;
}
}
exit(1);
}
// father,进行写入,关闭读端
close(pipefd[0]); // pipefd[1]
slots.push_back(pair<pid_t, int>(id, pipefd[1]));
}
// 父进程派发任务
srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机
while (true)
{
// 选择一个任务, 如果任务是从网络里面来的?
int command = rand() % handlerSize();
//自动选择
// 选择一个进程 ,采用随机数的方式,选择进程来完成任务,随机数方式的负载均衡
int choice = rand() % slots.size();
// 把任务给指定的进程
sendAndWakeup(slots[choice].first, slots[choice].second, command);
sleep(1);
//自己写选择太过麻烦
// int select;
// int command;
// cout << "############################################" << endl;
// cout << "# 1. show funcitons 2.send command #" << endl;
// cout << "############################################" << endl;
// cout << "Please Select> ";
// cin >> select;
// if (select == 1)
// showHandler();
// else if (select == 2)
// {
// cout << "Enter Your Command> ";
// // 选择任务
// cin >> command;
// // 选择进程
// int choice = rand() % slots.size();
// // 把任务给指定的进程
// sendAndWakeup(slots[choice].first, slots[choice].second, command);
// }
// else
// {
// }
}
// 关闭fd, 所有的子进程都会退出
for (const auto &slot : slots)
{
close(slot.second);
}
// 回收所有的子进程信息
for (const auto &slot : slots)
{
waitpid(slot.first, nullptr, 0);
}
}
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>
#include <unistd.h>
#include <functional>
typedef std::function<void()> func;
std::vector<func> callbacks;
std::unordered_map<int, std::string> desc;
void readMySQL()
{
std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl;
}
void execuleUrl()
{
std::cout << "sub process[" << getpid() << " ] 执行url解析\n" << std::endl;
}
void cal()
{
std::cout << "sub process[" << getpid() << " ] 执行加密任务\n" << std::endl;
}
void save()
{
std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n" << std::endl;
}
void load()
{
desc.insert({
callbacks.size(), "readMySQL: 读取数据库"});
callbacks.push_back(readMySQL);
desc.insert({
callbacks.size(), "execuleUrl: 进行url解析"});
callbacks.push_back(execuleUrl);
desc.insert({
callbacks.size(), "cal: 进行加密计算"});
callbacks.push_back(cal);
desc.insert({
callbacks.size(), "save: 进行数据的文件保存"});
callbacks.push_back(save);
}
void showHandler()
{
for(const auto &iter : desc )
{
std::cout << iter.first << "\t" << iter.second << std::endl;
}
}
int handlerSize()
{
return callbacks.size();
}
2.4 管道读写规则
当没有数据可读时
O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。
O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。
当管道满的时候
O_NONBLOCK disable: write调用阻塞,直到有进程读走数据
O_NONBLOCK enable:调用返回-1,errno值为EAGAIN
如果所有管道写端对应的文件描述符被关闭,则read返回0
如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出
当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。
当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。
2.5 命名管道
管道文件
:可以被不同进程打开,但是不会将内存数据刷新到磁盘上
该文件一定在系统路径中,因为路径具有唯一性
双方进程就可以通过管道文件的路径看到同一份资源
与匿名管道本质上是一样的,都是文件。
区别:看到资源的方式不一样
1.匿名管道——子进程继承的方式
2.管道文件——打开同一目录下的文件的方式
3.匿名管道由pipe函数创建并打开
4.命名管道由mkfifo函数创建,打开用open
- 管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
- 如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。
- 命名管道是一种特殊类型的文件
2.5.1 创建一个命名管道
命名管道可以从命令行上创建,命令行方法是使用下面这个命令:
mkfifo filename
命名管道也可以从程序里创建,相关函数有:
int mkfifo(const char *filename,mode_t mode);
创建命名管道:
int main(int argc, char *argv[])
{
mkfifo("p2", 0644);
return 0;
}
2.5.2 命名管道的打开规则
如果当前打开操作是为读而打开FIFO时
O_NONBLOCK disable:阻塞直到有相应进程为写而打开该FIFO
O_NONBLOCK enable:立刻返回成功
如果当前打开操作是为写而打开FIFO时
O_NONBLOCK disable:阻塞直到有相应进程为读而打开该FIFO
O_NONBLOCK enable:立刻返回失败,错误码为ENXIO
2.5.3 实例代码
例子1-用命名管道实现server&client通信
Log.hpp
#ifndef _LOG_H_
#define _LOG_H_
#include <iostream>
#include <ctime>
#define Debug 0
#define Notice 1
#define Warning 2
#define Error 3
const std::string msg[] = {
"Debug",
"Notice",
"Warning",
"Error"
};
std::ostream &Log(std::string message, int level)
{
std::cout << " | " << (unsigned)time(nullptr) << " | " << msg[level] << " | " << message;
return std::cout;
}
#endif
Makefile
.PHONY:all
all:client mutiServer
client:client.cc
g++ -o $@ $^ -std=c++11
mutiServer:server.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f client mutiServer
client.cc
#include "comm.hpp"
int main()
{
// 1. 获取管道文件
int fd = open(ipcPath.c_str(), O_WRONLY);
if(fd < 0)
{
perror("open");
exit(1);
}
// 2. ipc过程
string buffer;
while(true)
{
cout << "Please Enter Message Line :> ";
std::getline(std::cin, buffer);
write(fd, buffer.c_str(), buffer.size());
}
// 3. 关闭
close(fd);
return 0;
}
comm.hpp
#ifndef _COMM_H_
#define _COMM_H_
#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "Log.hpp"
using namespace std;
#define MODE 0666
#define SIZE 128
string ipcPath = "./fifo.ipc";
#endif
server.cc
#include "comm.hpp"
#include <sys/wait.h>
static void getMessage(int fd)
{
char buffer[SIZE];
while (true)
{
memset(buffer, '\0', sizeof(buffer));
ssize_t s = read(fd, buffer, sizeof(buffer) - 1);
if (s > 0)
{
cout <<"[" << getpid() << "] "<< "client say> " << buffer << endl;
}
else if (s == 0)
{
// end of file
cerr <<"[" << getpid() << "] " << "read end of file, clien quit, server quit too!" << endl;
break;
}
else
{
// read error
perror("read");
break;
}
}
}
int main()
{
// 1. 创建管道文件
if (mkfifo(ipcPath.c_str(), MODE) < 0)
{
perror("mkfifo");
exit(1);
}
Log("创建管道文件成功", Debug) << " step 1" << endl;
// 2. 正常的文件操作
int fd = open(ipcPath.c_str(), O_RDONLY);
if (fd < 0)
{
perror("open");
exit(2);
}
Log("打开管道文件成功", Debug) << " step 2" << endl;
int nums = 3;
for (int i = 0; i < nums; i++)
{
pid_t id = fork();
if (id == 0)
{
// 3. 编写正常的通信代码了
getMessage(fd);
exit(1);
}
}
for(int i = 0; i < nums; i++)
{
waitpid(-1, nullptr, 0);
}
// 4. 关闭文件
close(fd);
Log("关闭管道文件成功", Debug) << " step 3" << endl;
unlink(ipcPath.c_str()); // 通信完毕,就删除文件
Log("删除管道文件成功", Debug) << " step 4" << endl;
return 0;
}
例子2-用命名管道实现文件拷贝
读取文件,写入命名管道:
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{
\
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
int main(int argc, char *argv[]) { mkfifo(“tp”, 0644); int infd; infd = open(“abc”, O_RDONLY); if (infd == -1)ERR_EXIT(“open”);
int outfd;
outfd = open("tp", O_WRONLY);
if (outfd == -1) ERR_EXIT("open");
char buf[1024];
int n;
while ((n=read(infd, buf, 1024))>0)
{
write(outfd, buf, n);
}
close(infd);
close(outfd);
return 0;
}
读取管道,写入目标文件:
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{
\
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
int main(int argc, char *argv[]) { int outfd; outfd = open(“abc.bak”,O_WRONLY | O_CREAT | O_TRUNC,0644); if (outfd == -1) ERR_EXIT(“open”);
int infd;
infd = open("tp", O_RDONLY);
if (outfd == -1)
ERR_EXIT("open");
char buf[1024];
int n;
while ((n=read(infd, buf, 1024))>0)
{
write(outfd, buf, n);
}
close(infd);
close(outfd);
unlink("tp");
return 0;
}
3. system V 共享内存
共享内存区是最快的IPC形式。一旦这样的内存映射到共享它的进程的地址空间,这些进程间数据传递不再涉及到内核,换句话说是进程不再通过执行进入内核的系统调用来传递彼此的数据
3.1 共享内存数据结构
struct shmid_ds {
struct ipc_perm shm_perm; /* operation perms */
int shm_segsz; /* size of segment (bytes) */
__kernel_time_t shm_atime; /* last attach time */
__kernel_time_t shm_dtime; /* last detach time */
__kernel_time_t shm_ctime; /* last change time */
__kernel_ipc_pid_t shm_cpid; /* pid of creator */
__kernel_ipc_pid_t shm_lpid; /* pid of last operator */
unsigned short shm_nattch; /* no. of current attaches */
unsigned short shm_unused; /* compatibility */
void *shm_unused2; /* ditto - used by DIPC */
void *shm_unused3; /* unused */
};
3.2 共享内存函数
shmget函数
功能:用来创建共享内存
原型
int shmget(key_t key, size_t size, int shmflg);
参数
key:这个共享内存段名字
size:共享内存大小
shmflg:由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的
返回值:成功返回一个非负整数,即该共享内存段的标识码;失败返回-1
shmat函数
功能:将共享内存段连接到进程地址空间
原型
**void shmat(int shmid, const void shmaddr, int shmflg);
参数
shmid: 共享内存标识
shmaddr:指定连接的地址
shmflg:它的两个可能取值是SHM_RND和SHM_RDONLY
返回值:成功返回一个指针,指向共享内存第一个节;失败返回-1
说明:
shmaddr为NULL,核心自动选择一个地址
shmaddr不为NULL且shmflg无SHM_RND标记,则以shmaddr为连接地址。
shmaddr不为NULL且shmflg设置了SHM_RND标记,则连接的地址会自动向下调整为SHMLBA的整数倍。公式:shmaddr -
(shmaddr % SHMLBA)
shmflg=SHM_RDONLY,表示连接操作用来只读共享内存
shmadt函数
功能:将共享内存段与当前进程脱离
原型
int shmdt(const void *shmaddr);
参数
shmaddr: 由shmat所返回的指针
返回值:成功返回0;失败返回-1
注意:将共享内存段与当前进程脱离不等于删除共享内存段
shmctl函数
功能:用于控制共享内存
原型
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
参数
shmid:由shmget返回的共享内存标识码
cmd:将要采取的动作(有三个可取值)
buf:指向一个保存着共享内存的模式状态和访问权限的数据结构
返回值:成功返回0;失败返回-1
命令 | 说明 |
---|---|
IPC_STAT | 把shmid_ds结构中的数据设置为共享内存的当前关联值 |
IPC_SET | 在进程有足够权限的前提下,把共享内存的当前关联值设置为shmid_ds数据结构中给出的值 |
IPC_RMID | 删除共享内存段 |
共享内存的建立:
共享内存的提供者是操作系统
操作系统要不要管理共享内存?——要——先描述再组织——共享内存 = 共享内存块+对应的共享内存的内核数据结构
创建代码:
使用统一的算法规则形成唯一值
#include <sys/ipc.h>
#include <sys/shm.h>
IPC_CREAT and IPC_EXCL
IPC_CREAT(单独):
如果创建共享内存,如果底层已经存在,获取之并且返回,如果不存在,创建之,并返回。
如果底层不存在,创建之,并返回,如果底层存在,出错返回;返回成功一定是一个全新的shm
单独使用 IPC_EXCL没有意义
int shmget(key_t key, size_t size,int shmflg);
int 共享内存的用户标识符,类似曾经的fd
key_t key
要通信的对方进程时,怎么保证对方能看到并且看到的就是我创建的共享内存呢?
通过key,数据是多少不重要,只要数据能在系统中唯一即可——server && client——使用同一个key——只要key值相同就是看到了同一个共享内存
shmid vs key
只有创建的时候使用key,大部分情况用户访问共享内存,都用的是shmid!
当进程运行结束是我们的共享内存还存在
system V IPC
资源,生命周期随内核
手动删除
代码删除
堆栈直接的共享区域是属于用户空间——不用经过系统调用,直接可以访问!——双方进程要通信,直接进行内存级的读和写即可,不需要调用接口。
那么我们之前说的pipe,fifo都要通过read、write(管道)来进行通信?为什么?
read、write——系统调用接口(基础IO)
管道属于文件——文件是系统内核中的一种特定数据结构——操作系统自己维护——无权直接访问——内核空间
3.3 实例代码
Log.hpp
#ifndef _LOG_H_
#define _LOG_H_
#include <iostream>
#include <ctime>
#define Debug 0
#define Notice 1
#define Warning 2
#define Error 3
const std::string msg[] = {
"Debug",
"Notice",
"Warning",
"Error"
};
std::ostream &Log(std::string message, int level)
{
std::cout << " | " << (unsigned)time(nullptr) << " | " << msg[level] << " | " << message;
return std::cout;
}
#endif
Makefile
.PHONY:all
all:shmClient shmServer
shmClient:shmClient.cc
g++ -o $@ $^ -std=c++11
shmServer:shmServer.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f shmClient shmServer
comm.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <cassert>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "Log.hpp"
using namespace std; //不推荐
#define PATH_NAME "/home/whb"
#define PROJ_ID 0x66
#define SHM_SIZE 4096 //共享内存的大小,最好是页(PAGE: 4096)的整数倍
#define FIFO_NAME "./fifo"
class Init
{
public:
Init()
{
umask(0);
int n = mkfifo(FIFO_NAME, 0666);
assert(n == 0);
(void)n;
Log("create fifo success",Notice) << "\n";
}
~Init()
{
unlink(FIFO_NAME);
Log("remove fifo success",Notice) << "\n";
}
};
#define READ O_RDONLY
#define WRITE O_WRONLY
int OpenFIFO(std::string pathname, int flags)
{
int fd = open(pathname.c_str(), flags);
assert(fd >= 0);
return fd;
}
void Wait(int fd)
{
Log("等待中....", Notice) << "\n";
uint32_t temp = 0;
ssize_t s = read(fd, &temp, sizeof(uint32_t));
assert(s == sizeof(uint32_t));
(void)s;
}
void Signal(int fd)
{
uint32_t temp = 1;
ssize_t s = write(fd, &temp, sizeof(uint32_t));
assert(s == sizeof(uint32_t));
(void)s;
Log("唤醒中....", Notice) << "\n";
}
void CloseFifo(int fd)
{
close(fd);
}
shmClient.cc
#include "comm.hpp"
int main()
{
Log("child pid is : ", Debug) << getpid() << endl;
key_t k = ftok(PATH_NAME, PROJ_ID);
if (k < 0)
{
Log("create key failed", Error) << " client key : " << k << endl;
exit(1);
}
Log("create key done", Debug) << " client key : " << k << endl;
// 获取共享内存
int shmid = shmget(k, SHM_SIZE, 0);
if(shmid < 0)
{
Log("create shm failed", Error) << " client key : " << k << endl;
exit(2);
}
Log("create shm success", Error) << " client key : " << k << endl;
// sleep(10);
char *shmaddr = (char *)shmat(shmid, nullptr, 0);
if(shmaddr == nullptr)
{
Log("attach shm failed", Error) << " client key : " << k << endl;
exit(3);
}
Log("attach shm success", Error) << " client key : " << k << endl;
// sleep(10);
int fd = OpenFIFO(FIFO_NAME, WRITE);
// 使用
// client将共享内存看做一个char 类型的buffer
while(true)
{
ssize_t s = read(0, shmaddr, SHM_SIZE-1);
if(s > 0)
{
shmaddr[s-1] = 0;
Signal(fd);
if(strcmp(shmaddr,"quit") == 0) break;
}
}
CloseFifo(fd);
// char a = 'a';
// for(; a <= 'z'; a++)
// {
// shmaddr[a-'a'] = a;
// // 我们是每一次都向shmaddr[共享内存的起始地址]写入
// // snprintf(shmaddr, SHM_SIZE - 1,\
// // "hello server, 我是其他进程,我的pid: %d, inc: %c\n",\
// // getpid(), a);
// sleep(5);
// }
// strcpy(shmaddr, "quit");
// 去关联
int n = shmdt(shmaddr);
assert(n != -1);
Log("detach shm success", Error) << " client key : " << k << endl;
// sleep(10);
// client 要不要chmctl删除呢?不需要!!
return 0;
}
shmServer.cc
#include "comm.hpp"
// 是不是对应的程序,在加载的时候,会自动构建全局变量,就要调用该类的构造函数 -- 创建管道文件
// 程序退出的时候,全局变量会被析构,自动调用析构函数,会自动删除管道文件
Init init;
string TransToHex(key_t k)
{
char buffer[32];
snprintf(buffer, sizeof buffer, "0x%x", k);
return buffer;
}
int main()
{
// 我们之前为了通信,所做的所有的工作,属于什么工作呢:让不同的进程看到了同一份资源(内存)
// 1. 创建公共的Key值
key_t k = ftok(PATH_NAME, PROJ_ID);
assert(k != -1);
Log("create key done", Debug) << " server key : " << TransToHex(k) << endl;
// 2. 创建共享内存 -- 建议要创建一个全新的共享内存 -- 通信的发起者
int shmid = shmget(k, SHM_SIZE, IPC_CREAT | IPC_EXCL | 0666); //
if (shmid == -1)
{
perror("shmget");
exit(1);
}
Log("create shm done", Debug) << " shmid : " << shmid << endl;
// sleep(10);
// 3. 将指定的共享内存,挂接到自己的地址空间
char *shmaddr = (char *)shmat(shmid, nullptr, 0);
Log("attach shm done", Debug) << " shmid : " << shmid << endl;
// sleep(10);
// 这里就是通信的逻辑了
// 将共享内存当成一个大字符串
// char buffer[SHM_SIZE];
// 结论1: 只要是通信双方使用shm,一方直接向共享内存中写入数据,另一方,就可以立马看到对方写入的数据。
// 共享内存是所有进程间通信(IPC),速度最快的!不需要过多的拷贝!!(不需要将数据给操作系统)
// 结论2: 共享内存缺乏访问控制!会带来并发问题 【如果我想一定程度的访问控制呢? 能】
int fd = OpenFIFO(FIFO_NAME, READ);
for(;;)
{
Wait(fd);
// 临界区
printf("%s\n", shmaddr);
if(strcmp(shmaddr, "quit") == 0) break;
// sleep(1);
}
// 4. 将指定的共享内存,从自己的地址空间中去关联
int n = shmdt(shmaddr);
assert(n != -1);
(void)n;
Log("detach shm done", Debug) << " shmid : " << shmid << endl;
// sleep(10);
// 5. 删除共享内存,IPC_RMID即便是有进程和当下的shm挂接,依旧删除共享内存
n = shmctl(shmid, IPC_RMID, nullptr);
assert(n != -1);
(void)n;
Log("delete shm done", Debug) << " shmid : " << shmid << endl;
CloseFifo(fd);
return 0;
}
注意:共享内存没有进行同步与互斥
共享内存是速度最快的
4. 信号量
基于对共享内存的理解:
为了让进程间通信——让不同的进程之间,看到同一份资源——之前学习的所有通信方式,本质都是优先解决一个问题:让不同的进程看到同一份资源
问题:
让不同的进程看到了同一份资源,比如共享内存,也带来了一些时序问题——造成数据不一致的问题!
- 我们把多个进程(执行流)看到的公共的一份资源–临界资源
- 我们把自己的进程,访问临界资源的代码–临界区
所以,多个执行流,互相运行的时候互相干扰,主要是我们不加保护的访问了同样的资源(临界资源),在非临界区多个执行流互相是不影响的! - 为了更好地进行临界区的保护,可以让多执行流
在任何时刻,都只能有一个进程进入临界区——互斥 - 原子性:要么不做,要么做完,没有中间状态,就称之为原子性
每一个进程想进入临界资源,访问临界资源中的一部分,不能让进程直接去使用临界资源(不能让用户直接去电影院内部占座位),你的先申请信号量(你的先买票!! )
本质是一个计数器
,类似 int count = n;(不准确)
先申请信号量
1.申请信号量的本质:让信号量计数器–
2.主要申请信号量成功,临界资源内部,一定给你预留了你想要的资源——申请信号量本质其实是对临界资源的一种预定机制
申请信号量–
访问临界资源——进程执行自己的临界区代码
释放信号量++
信号量是一个计数器,int n = 10;用一个整数,能不能标识信号量呢?
假设让多个进程(整数n在共享内存里),看到同一个全局变量,大家都进行申请信号量n --—–也是不可以的!!
inr n = 10;
子进程
父进程
client:
n–
写入数据到共享内存
n++
sever:
n–
读取共享内存
n++
计算要在CPU内,数据在内存的n变量里
CPU执行指令的时候
1.将内存中的数据加载到cpu内的寄存器中(读指令)
2.n–(分析&&执行指令)
n–:因为时序问题,而导致n有中间状态,可能导致数据不一致!是不安全的!如果一个n–操作只有一行汇编,该操作是原子的!
3.将CPU修改完毕的n写回内存(写回结果)
执行流在执行的时候,在任何时刻都可能被切换!!
寄存器只有一套,被所有的执行流共享,但是寄存器里面的数据,属于每一个执行流,属于该执行流的上下文数据!!
上下文保护&&上下文恢复的!
信号量计数器:是对临界资源的预定机制
申请信号量——计数器–——P操作——必须是原子的
释放信号量——计数器++——V操作——必须是原子
5. 进程互斥
由于各进程要求共享资源,而且有些资源需要互斥使用,因此各进程间竞争使用这些资源,进程的这种
关系为进程的互斥
系统中某些资源一次只允许一个进程使用,称这样的资源为临界资源或互斥资源。
在进程中涉及到互斥资源的程序段叫临界区
特性方面
:
IPC资源必须删除,否则不会自动清除,除非重启,所以system V IPC资源的生命周期随内核
转载:https://blog.csdn.net/Ll_R_lL/article/details/128754517