飞道的博客

项目(百万并发网络通信架构)12.2---内存管理之(为服务端添加智能指针)

558人阅读  评论(0)

一、C++智能指针概述

二、为服务端添加智能指针

  • 在前一篇文章中我们为服务端添加了内存池,重写了new和delete运算符。在本文中,我们将服务端程序中的普通类型指针大部分更改为智能指针shared_ptr
  • shared_ptr的内部也会使用new和delete,因此shared_ptr的内部也会使用到我们的内存池,因为new和delete被我们重写了

三、代码如下

  • 在之前文章中的所有代码中,我们只需要更改EasyTcpServer.hpp,其余不需要更改

EasyTcpServer.hpp代码如下

  • 代码如下,不能保证完全没有错误

   
  1. #ifndef _EasyTcpClient_hpp_
  2. #define _EasyTcpClient_hpp_
  3. #ifdef _WIN32
  4. #define FD_SETSIZE 10240
  5. #define WIN32_LEAN_AND_MEAN
  6. #define _WINSOCK_DEPRECATED_NO_WARNINGS //for inet_pton()
  7. #define _CRT_SECURE_NO_WARNINGS
  8. #include <windows.h>
  9. #include <WinSock2.h>
  10. #pragma comment(lib, "ws2_32.lib")
  11. #else
  12. #include <unistd.h>
  13. #include <sys/socket.h>
  14. #include <sys/types.h>
  15. #include <arpa/inet.h>
  16. #include <netinet/in.h>
  17. #include <sys/select.h>
  18. //在Unix下没有这些宏,为了兼容,自己定义
  19. #define SOCKET int
  20. #define INVALID_SOCKET (SOCKET)(~0)
  21. #define SOCKET_ERROR (-1)
  22. #endif
  23. #ifndef RECV_BUFF_SIZE
  24. #define RECV_BUFF_SIZE 10240*5 //接收缓冲区的大小
  25. #endif // !RECV_BUFF_SIZE
  26. #ifndef SEND_BUFF_SIZE
  27. #define SEND_BUFF_SIZE RECV_BUFF_SIZE //发送缓冲区的大小
  28. #endif // !SEND_BUFF_SIZE
  29. #include <iostream>
  30. #include <string.h>
  31. #include <stdio.h>
  32. #include <vector>
  33. #include <mutex>
  34. #include <atomic>
  35. #include <functional>
  36. #include <map>
  37. #include <memory>
  38. #include "MessageHeader.hpp"
  39. #include "CELLTimestamp.hpp"
  40. #include "CELLTask.hpp"
  41. //using namespace std;
  42. class CellServer;
  43. //客户端数据类型,用来封装一个客户端
  44. class ClientSocket
  45. {
  46. public:
  47. ClientSocket(SOCKET sockfd = INVALID_SOCKET) :_sock(sockfd), _lastRecvPos( 0), _lastSendPos( 0) {
  48. memset(_recvMsgBuff, 0, sizeof(_recvMsgBuff));
  49. memset(_sendMsgBuff, 0, sizeof(_sendMsgBuff));
  50. }
  51. SOCKET sockfd() { return _sock; }
  52. char *recvMsgBuff() { return _recvMsgBuff; }
  53. int getRecvLastPos() { return _lastRecvPos; }
  54. void setRecvLastPos(int pos) { _lastRecvPos = pos; }
  55. char *sendMsgBuff() { return _sendMsgBuff; }
  56. int getSendLastPos() { return _lastSendPos; }
  57. void setSendLastPos(int pos) { _lastSendPos = pos; }
  58. int SendData(std::shared_ptr<DataHeader>& header);
  59. private:
  60. SOCKET _sock; //客户端socket
  61. char _recvMsgBuff[RECV_BUFF_SIZE]; //消息接收缓冲区
  62. int _lastRecvPos; //消息接收缓冲区的数据尾部位置
  63. char _sendMsgBuff[SEND_BUFF_SIZE]; //消息发送缓冲区
  64. int _lastSendPos; //消息发送缓冲区的数据尾部位置
  65. };
  66. //网络事件接口
  67. class INetEvent {
  68. public:
  69. virtual void OnClientJoin(std::shared_ptr<ClientSocket>& pClient) = 0; //客户端加入事件
  70. virtual void OnClientLeave(std::shared_ptr<ClientSocket>& pClient) = 0; //客户端离开事件
  71. virtual void OnNetMsg(CellServer* pCellServer, std::shared_ptr<ClientSocket>& pClient, DataHeader* header) = 0; //接收到客户端消息事件
  72. virtual void OnNetRecv(std::shared_ptr<ClientSocket>& pClient) = 0; //recv函数执行事件
  73. };
  74. //发送消息的任务
  75. class CellSendMsg2ClientTask : public CellTask
  76. {
  77. public:
  78. CellSendMsg2ClientTask( std:: shared_ptr<ClientSocket>& pClient, std:: shared_ptr<DataHeader>& header)
  79. :_pClient(pClient), _pHeader(header) {}
  80. void doTask()
  81. {
  82. _pClient->SendData(_pHeader);
  83. }
  84. private:
  85. std:: shared_ptr<ClientSocket> _pClient; //发送给哪个客户端
  86. std:: shared_ptr<DataHeader> _pHeader; //要发送的数据的头指针
  87. };
  88. //处理客户的从服务器类
  89. class CellServer
  90. {
  91. public:
  92. CellServer(SOCKET sock = INVALID_SOCKET) :_sock(sock), maxSock(_sock), _pthread( nullptr), _pNetEvent( nullptr), _clients_change( false) {
  93. memset(&_fdRead_bak, 0, sizeof(_fdRead_bak));
  94. }
  95. ~CellServer() { CloseSocket(); }
  96. public:
  97. bool isRun() { return _sock != INVALID_SOCKET; }
  98. void CloseSocket();
  99. public:
  100. size_t getClientCount() { return _clients.size() + _clientsBuff.size(); } //返回当前客户端的数量
  101. void setEventObj(INetEvent* event) { _pNetEvent = event; } //设置事件对象,此处绑定的是EasyTcpServer
  102. public:
  103. bool Onrun();
  104. void AddClient(std::shared_ptr<ClientSocket>& pClient) //讲客户端加入到客户端连接缓冲队列中
  105. {
  106. //自解锁
  107. std::lock_guard< std::mutex> lock(_mutex);
  108. //_mutex.lock(); 当然也可以使用互斥锁
  109. _clientsBuff.push_back(pClient);
  110. //_mutex.unlock();
  111. }
  112. int RecvData(std::shared_ptr<ClientSocket>& pClient); //接收数据
  113. void OnNetMessage(std::shared_ptr<ClientSocket>& pClient, DataHeader* header); //处理网络消息
  114. void Start() {
  115. //启动当前服务线程
  116. //创建一个线程,线程执行函数为Onrun(),其实可以不传递this,但是为了更安全,可以传递this给Onrun()
  117. _pthread = new std::thread( std::mem_fn(&CellServer::Onrun), this);
  118. //启动任务管理
  119. _taskServer.Start();
  120. }
  121. public:
  122. void addSendTask(std::shared_ptr<ClientSocket>& pClient, std::shared_ptr<DataHeader>& header)
  123. {
  124. auto task = std::make_shared<CellSendMsg2ClientTask>(pClient, header);
  125. _taskServer.addTask((CellTaskPtr&)task);
  126. }
  127. private:
  128. SOCKET _sock; //服务端的套接字
  129. std:: map<SOCKET, std:: shared_ptr<ClientSocket>> _clients; //真正存储客户端
  130. std:: vector< std:: shared_ptr<ClientSocket>> _clientsBuff; //存储客户端连接缓冲队列,之后会被加入到_clients中去
  131. SOCKET maxSock; //当前最大的文件描述符值,select的参数1要使用
  132. //char _recvBuff[RECV_BUFF_SIZE]; //接收缓冲区
  133. std::mutex _mutex; //互斥锁
  134. std::thread* _pthread; //当前子服务端执行的线程
  135. INetEvent* _pNetEvent;
  136. fd_set _fdRead_bak; //用来保存当前的fd_set
  137. bool _clients_change; //当前是否有新客户端加入进来
  138. private:
  139. CellTaskServer _taskServer;
  140. };
  141. //服务器主类
  142. class EasyTcpServer : public INetEvent
  143. {
  144. public:
  145. EasyTcpServer() :_sock(INVALID_SOCKET), _msgCount( 0), _recvCount( 0), _clientCount( 0) {}
  146. virtual ~EasyTcpServer() { CloseSocket(); }
  147. public:
  148. void InitSocket(); //初始化socket
  149. int Bind(const char* ip, unsigned short port); //绑定端口号
  150. int Listen(int n); //监听端口号
  151. SOCKET Accept(); //接收客户端连接
  152. void addClientToCellServer(std::shared_ptr<ClientSocket>& pClient); //将新客户加入到CellServer的客户端连接缓冲队列中
  153. void Start(int nCellServer); //创建从服务器,并运行所有的从服务器。(参数为从服务器的数量)
  154. void CloseSocket(); //关闭socket
  155. bool isRun() { return _sock != INVALID_SOCKET; } //判断当前服务端是否在运行
  156. bool Onrun(); //处理网络消息
  157. void time4msg(); //每1秒统计一次收到的数据包的数量
  158. public:
  159. //客户端加入事件(这个是线程安全的,因为其只会被主服务器(自己)调用)
  160. virtual void OnClientJoin(std::shared_ptr<ClientSocket>& pClient)override { _clientCount++; }
  161. //客户端离开事件,这个里面做的事情比较简单,只是将当前客户端的数量--(如果从服务器不止一个,那么此函数不是线程安全的,因为这个函数会被多个从服务器调用的)
  162. virtual void OnClientLeave(std::shared_ptr<ClientSocket>& pClient)override { _clientCount--; }
  163. //接收到客户端消息事件,将数据包的数量增加(如果从服务器不止一个,那么此函数不是线程安全的,因为这个函数会被多个从服务器调用的)
  164. //参数1,代表哪一个CellServer来处理这个消息的
  165. virtual void OnNetMsg(CellServer* pCellServer, std::shared_ptr<ClientSocket>& pClient, DataHeader* header)override { _msgCount++; }
  166. //recv事件
  167. virtual void OnNetRecv(std::shared_ptr<ClientSocket>& pClient)override { _recvCount++; }
  168. private:
  169. SOCKET _sock; //服务端套接字
  170. std:: vector< std:: shared_ptr<CellServer>> _cellServers; //存放从服务端对象
  171. CELLTimestamp _tTime; //计时器
  172. std:: atomic_int _clientCount; //客户端的数量(这里采用一个原子操作,没什么特殊原因,使用玩玩罢了,下同)
  173. std:: atomic_int _msgCount; //表示服务端接收到客户端数据包的数量
  174. std:: atomic_int _recvCount; //recv()函数执行的次数
  175. };
  176. int ClientSocket::SendData( std:: shared_ptr<DataHeader>& header) {
  177. int ret = SOCKET_ERROR;
  178. //要发送的数据的长度
  179. int nSendLen = header->dataLength;
  180. const char* pSendData = ( const char*)header.get();
  181. //在下面第一个if之后,如果nSendLen仍然大于SEND_BUFF_SIZE,那么就需要继续执行if,继续发送数据
  182. while ( true)
  183. {
  184. //如果当前"要发送的数据的长度+缓冲区数据结尾位置"之和总的缓冲区大小,说明缓冲区满了,那么将整个缓冲区都发送出去
  185. if (_lastSendPos + nSendLen >= SEND_BUFF_SIZE)
  186. {
  187. //计算缓冲区还剩多少空间
  188. int nCopyLen = SEND_BUFF_SIZE - _lastSendPos;
  189. //然后将nCopyLen长度的pSendData数据拷贝到缓冲区中
  190. memcpy(_sendMsgBuff + _lastSendPos, pSendData, nCopyLen);
  191. //计算剩余数据位置
  192. pSendData += nCopyLen;
  193. //计算剩余数据长度
  194. nSendLen -= nCopyLen;
  195. ret = send(_sock, _sendMsgBuff, SEND_BUFF_SIZE, 0);
  196. //缓冲区数据尾部置为0
  197. _lastSendPos = 0;
  198. if (ret = SOCKET_ERROR)
  199. return -1;
  200. }
  201. //如果发送缓冲区还没满,那么将这条消息放到缓冲区中,而不直接发送
  202. else
  203. {
  204. memcpy(_sendMsgBuff + _lastSendPos, pSendData, nSendLen);
  205. //更新缓冲区数据尾部
  206. _lastSendPos += nSendLen;
  207. //直接退出,不进行while
  208. break;
  209. }
  210. }
  211. return ret;
  212. }
  213. void EasyTcpServer::InitSocket()
  214. {
  215. #ifdef _WIN32
  216. WORD ver = MAKEWORD( 2, 2);
  217. WSADATA dat;
  218. WSAStartup(ver, &dat);
  219. #endif
  220. //建立socket
  221. _sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  222. if (INVALID_SOCKET == _sock) {
  223. std:: cout << "Server:创建socket成功" << std:: endl;
  224. }
  225. else {
  226. std:: cout << "Server:创建socket成功" << std:: endl;
  227. }
  228. }
  229. int EasyTcpServer::Bind( const char* ip, unsigned short port)
  230. {
  231. if (!isRun())
  232. InitSocket();
  233. //初始化服务端地址
  234. struct sockaddr_in _sin = {};
  235. #ifdef _WIN32
  236. if (ip)
  237. _sin.sin_addr.S_un.S_addr = inet_addr(ip);
  238. else
  239. _sin.sin_addr.S_un.S_addr = INADDR_ANY;
  240. #else
  241. if (ip)
  242. _sin.sin_addr.s_addr = inet_addr(ip);
  243. else
  244. _sin.sin_addr.s_addr = INADDR_ANY;
  245. #endif
  246. _sin.sin_family = AF_INET;
  247. _sin.sin_port = htons(port);
  248. //绑定服务端地址
  249. int ret = ::bind(_sock, (struct sockaddr*)&_sin, sizeof(_sin));
  250. if (SOCKET_ERROR == ret) {
  251. if (ip)
  252. std:: cout << "Server:绑定地址(" << ip << "," << port << ")失败!" << std:: endl;
  253. else
  254. std:: cout << "Server:绑定地址(INADDR_ANY," << port << ")失败!" << std:: endl;
  255. }
  256. else {
  257. if (ip)
  258. std:: cout << "Server:绑定地址(" << ip << "," << port << ")成功!" << std:: endl;
  259. else
  260. std:: cout << "Server:绑定地址(INADDR_ANY," << port << ")成功!" << std:: endl;
  261. }
  262. return ret;
  263. }
  264. void EasyTcpServer::CloseSocket()
  265. {
  266. if (_sock != INVALID_SOCKET)
  267. {
  268. #ifdef _WIN32
  269. closesocket(_sock);
  270. WSACleanup();
  271. #else
  272. close(_sock);
  273. #endif
  274. _sock = INVALID_SOCKET;
  275. }
  276. }
  277. int EasyTcpServer::Listen( int n)
  278. {
  279. //监听网络端口
  280. int ret = listen(_sock, n);
  281. if (SOCKET_ERROR == ret)
  282. std:: cout << "Server:监听网络端口失败!" << std:: endl;
  283. else
  284. std:: cout << "Server:监听网络端口成功!" << std:: endl;
  285. return ret;
  286. }
  287. SOCKET EasyTcpServer::Accept()
  288. {
  289. //用来保存客户端地址
  290. struct sockaddr_in _clientAddr = {};
  291. int nAddrLen = sizeof(_clientAddr);
  292. SOCKET _cSock = INVALID_SOCKET;
  293. //接收客户端连接
  294. #ifdef _WIN32
  295. _cSock = accept(_sock, (struct sockaddr*)&_clientAddr, &nAddrLen);
  296. #else
  297. _cSock = accept(_sock, (struct sockaddr*)&_clientAddr, ( socklen_t*)&nAddrLen);
  298. #endif
  299. if (INVALID_SOCKET == _cSock) {
  300. std:: cout << "Server:接收到无效客户端!" << std:: endl;
  301. }
  302. else {
  303. //通知其他已存在的所有客户端,有新的客户端加入
  304. //NewUserJoin newUserInfo(static_cast<int>(_cSock));
  305. //SendDataToAll(&newUserInfo);
  306. //将新连接的客户端加入到从服务器的客户端缓冲队列中
  307. std:: shared_ptr<ClientSocket> newClient = std::make_shared<ClientSocket>(_cSock);
  308. addClientToCellServer(newClient);
  309. OnClientJoin(newClient); //相应客户端加入事件,其函数会将客户端的数量++
  310. //std::cout << "Server:接受到新的客户端(" << _clients.size() << ")连接,IP=" << inet_ntoa(_clientAddr.sin_addr)
  311. // << ",Socket=" << static_cast<int>(_cSock) << std::endl;
  312. }
  313. return _cSock;
  314. }
  315. void EasyTcpServer::addClientToCellServer( std:: shared_ptr<ClientSocket>& pClient)
  316. {
  317. //在_cellServers中寻找,哪一个CellServer其处理的客户数量最少,那么就将新客户加入到这个CellServer对象中去
  318. auto pMinServer = _cellServers[ 0];
  319. for ( auto pCellServer : _cellServers)
  320. {
  321. if (pMinServer->getClientCount() > pCellServer->getClientCount())
  322. {
  323. pMinServer = pCellServer;
  324. }
  325. }
  326. pMinServer->AddClient(pClient);
  327. }
  328. void EasyTcpServer::Start( int nCellServer)
  329. {
  330. for ( int i = 0; i < nCellServer; ++i)
  331. {
  332. auto ser = std::make_shared<CellServer>(_sock);
  333. _cellServers.push_back(ser);
  334. ser->setEventObj( this);
  335. ser->Start();
  336. }
  337. }
  338. bool EasyTcpServer::Onrun()
  339. {
  340. if (isRun())
  341. {
  342. time4msg(); //统计当前接收到的数据包的数量
  343. fd_set fdRead;
  344. //fd_set fdWrite;
  345. //fd_set fdExp;
  346. FD_ZERO(&fdRead);
  347. //FD_ZERO(&fdWrite);
  348. //FD_ZERO(&fdExp);
  349. FD_SET(_sock, &fdRead);
  350. //FD_SET(_sock, &fdWrite);
  351. //FD_SET(_sock, &fdExp);
  352. struct timeval t = { 0, 0 };
  353. int ret = select(_sock + 1, &fdRead, nullptr, nullptr, &t);
  354. if (ret < 0)
  355. {
  356. std:: cout << "Server:select出错!" << std:: endl;
  357. //select出错,那么就不能再继续运行select,出错之后,调用CloseSocket(),
  358. //关闭所有服务端、及客所有户端套接字,那么isRun()就会返回false,从而终止server.cpp程序运行
  359. CloseSocket();
  360. return false;
  361. }
  362. if (FD_ISSET(_sock, &fdRead)) //如果一个客户端连接进来,那么服务端的socket就会变为可读的,此时我们使用accept来接收这个客户端
  363. {
  364. FD_CLR(_sock, &fdRead);
  365. Accept();
  366. return true;
  367. }
  368. return true;
  369. }
  370. return false;
  371. }
  372. void EasyTcpServer::time4msg()
  373. {
  374. auto t = _tTime.getElapsedSecond();
  375. if (t >= 1.0)
  376. {
  377. //msgCount,_recvCount为什么要除以t:
  378. //因为我们要每1秒钟打印一次接收到的数据包,如果这个函数调用的时候时间差大于1秒,那么可以将recvCount/t,获得比较平均的数据包数量/recv执行次数
  379. printf( "time<%lf>,thread numer<%d>,client number<%d>,msgCount<%d>,recvCount<%d>\n",
  380. t, _cellServers.size(), static_cast< int>(_clientCount), static_cast< int>(_msgCount / t), static_cast< int>(_recvCount / t));
  381. _recvCount = 0;
  382. _msgCount = 0;
  383. _tTime.update();
  384. }
  385. }
  386. void CellServer::CloseSocket()
  387. {
  388. if (_sock != INVALID_SOCKET)
  389. {
  390. #ifdef _WIN32
  391. //将所有的客户端套接字关闭
  392. for ( auto iter : _clients)
  393. {
  394. closesocket(iter.second->sockfd());
  395. }
  396. //关闭服务端套接字
  397. closesocket(_sock);
  398. //因为这是从服务器,所以就不要清理套接字环境了,放置主服务器的套接字环境被清除
  399. //WSACleanup();
  400. #else
  401. for ( auto iter : _clients)
  402. {
  403. close(iter.second->sockfd());
  404. }
  405. close(_sock);
  406. #endif
  407. _clients.clear();
  408. _sock = INVALID_SOCKET;
  409. delete _pthread;
  410. }
  411. }
  412. bool CellServer::Onrun()
  413. {
  414. while (isRun())
  415. {
  416. //如果客户端缓冲队列_clientsBuff中有新客户,那么就将其加入到_clients中
  417. if (_clientsBuff.size() > 0)
  418. {
  419. //自解锁lock_guard,作用域结束后自动释放锁,因此if执行结束之后,_mutex就被释放了
  420. std::lock_guard< std::mutex> lock(_mutex);
  421. for ( auto pClient : _clientsBuff)
  422. {
  423. _clients[pClient->sockfd()] = pClient;
  424. }
  425. _clientsBuff.clear();
  426. _clients_change = true;
  427. }
  428. //如果没有客户,那么休眠一秒然后继续下一次循环
  429. if (_clients.empty())
  430. {
  431. std::chrono:: milliseconds t(1);
  432. std::this_thread::sleep_for(t);
  433. continue;
  434. }
  435. fd_set fdRead;
  436. FD_ZERO(&fdRead);
  437. //在主线程的select中已经对主服务端的_sock进行查询了,所以从服务器就不需要再将_sock加入到fd_set中了,否则两个地方同时操作会出错
  438. //FD_SET(_sock, &fdRead);
  439. //根据_fdRead_change判断是否有新客户端加入,如果有那么就进行新的FD_SET
  440. if (_clients_change)
  441. {
  442. _clients_change = false;
  443. for ( auto iter : _clients)
  444. {
  445. FD_SET(iter.second->sockfd(), &fdRead);
  446. if (maxSock < iter.second->sockfd())
  447. maxSock = iter.second->sockfd();
  448. }
  449. //将更新后的fd_set保存到_fdRead_bak中
  450. memcpy(&_fdRead_bak, &fdRead, sizeof(_fdRead_bak));
  451. }
  452. //否则直接拷贝,不用再循环FD_SET了
  453. else
  454. memcpy(&fdRead, &_fdRead_bak, sizeof(_fdRead_bak));
  455. //从服务器一般只用来收取数据,所以这里设置为阻塞的也可以
  456. //struct timeval t = { 1,0 };
  457. int ret = select(maxSock + 1, &fdRead, nullptr, nullptr, nullptr);
  458. if (ret < 0)
  459. {
  460. std:: cout << "Server:select出错!" << std:: endl;
  461. CloseSocket();
  462. return false;
  463. }
  464. #ifdef _WIN32
  465. //如果是WIN下运行,fd_set拥有fd_count与fd_array成员
  466. //我们可以遍历fd_set,然后从中获取数据,不需要使用FD_ISSET了
  467. for ( int n = 0; n < fdRead.fd_count; n++)
  468. {
  469. auto iter = _clients.find(fdRead.fd_array[n]);
  470. //如果RecvData出错,那么就将该客户端从_client中移除
  471. if ( -1 == RecvData(iter->second))
  472. {
  473. if (_pNetEvent)
  474. _pNetEvent->OnClientLeave(iter->second); //通知主服务器有客户端退出
  475. //delete iter->second;
  476. _clients.erase(iter->first);
  477. _clients_change = true; //这个要设置为true,因为有客户端退出了,需要重新进行FD_SET
  478. }
  479. }
  480. #else
  481. //如果在UNIX下,fd_set无fd_count与fd_array成员,我们只能遍历_clients数组
  482. //遍历_clients map容器中所有的客户端,然后从中获取数据
  483. for ( auto iter : _clients)
  484. {
  485. //因为_clients是一个map,因此每次iter返回一个pair,其first成员为key(SOCKET),value成员为value(ClientSocket)
  486. if (FD_ISSET(iter.second->sockfd(), &fdRead))
  487. {
  488. //如果RecvData出错,那么就将该客户端从_client中移除
  489. if ( -1 == RecvData(iter.second))
  490. {
  491. if (_pNetEvent)
  492. _pNetEvent->OnClientLeave(iter.second); //通知主服务器有客户端退出
  493. _clients.erase(iter.first);
  494. _clients_change = true; //原因同上
  495. }
  496. }
  497. }
  498. #endif // _WIN32
  499. }
  500. return false;
  501. }
  502. int CellServer::RecvData( std:: shared_ptr<ClientSocket>& pClient)
  503. {
  504. char *_recvBuff = pClient->recvMsgBuff() + pClient->getRecvLastPos();
  505. //先将数据接收到_recvBuff缓冲区中
  506. int _nLen = recv(pClient->sockfd(), _recvBuff, RECV_BUFF_SIZE - pClient->getRecvLastPos(), 0);
  507. _pNetEvent->OnNetRecv(pClient);
  508. if (_nLen < 0) {
  509. //std::cout << "recv函数出错!" << std::endl;
  510. return -1;
  511. }
  512. else if (_nLen == 0) {
  513. //std::cout << "客户端<Socket=" << pClient->sockfd() << ">:已退出!" << std::endl;
  514. return -1;
  515. }
  516. //(不需要这一步了)将数据从_recvBuff中拷贝到客户端的缓冲区中
  517. //memcpy(pClient->msgBuff() + pClient->getLastPos(), _recvBuff, _nLen);
  518. pClient->setRecvLastPos(pClient->getRecvLastPos() + _nLen);
  519. //判断客户端的缓冲区中数据结尾的位置,如果有一个DataHeader的大小那么就可以对数据进行处理
  520. while (pClient->getRecvLastPos() >= sizeof(DataHeader))
  521. {
  522. //获取缓冲区的首指针
  523. DataHeader* header = (DataHeader*)pClient->recvMsgBuff();
  524. //如果当前缓冲区中数据结尾的位置大于等于一个数据包的大小,那么就对数据进行处理
  525. if (pClient->getRecvLastPos() >= header->dataLength)
  526. {
  527. //先保存剩余未处理消息缓冲区的长度
  528. int nSize = pClient->getRecvLastPos() - header->dataLength;
  529. //处理网络消息
  530. OnNetMessage(pClient, header);
  531. //处理完成之后,将_recvMsgBuff中剩余未处理部分的数据前移
  532. memcpy(pClient->recvMsgBuff(), pClient->recvMsgBuff() + header->dataLength, nSize);
  533. pClient->setRecvLastPos(nSize);
  534. }
  535. else {
  536. //消息缓冲区剩余数据不够一条完整消息
  537. break;
  538. }
  539. }
  540. return 0;
  541. }
  542. void CellServer::OnNetMessage( std:: shared_ptr<ClientSocket>& pClient, DataHeader* header)
  543. {
  544. //调用主服务OnNetMsg事件
  545. _pNetEvent->OnNetMsg( this, pClient, header);
  546. switch (header->cmd)
  547. {
  548. case CMD_LOGIN: //如果是登录
  549. {
  550. //Login *login = (Login*)header;
  551. //std::cout << "服务端:收到客户端<Socket=" << pClient->sockfd() << ">的消息CMD_LOGIN,用户名:" << login->userName << ",密码:" << login->PassWord << std::endl;
  552. //此处可以判断用户账户和密码是否正确等等(省略)
  553. //返回登录的结果给客户端
  554. auto ret = std::make_shared<LoginResult>();
  555. //在_taskServer()内部封装一个发送消息任务,然后执行该发送任务
  556. this->addSendTask(pClient, ( std:: shared_ptr<DataHeader>)ret);
  557. }
  558. break;
  559. case CMD_LOGOUT: //如果是退出
  560. {
  561. //Logout *logout = (Logout*)header;
  562. //std::cout << "服务端:收到客户端<Socket=" << pClient->sockfd() << ">的消息CMD_LOGOUT,用户名:" << logout->userName << std::endl;
  563. //返回退出的结果给客户端
  564. auto ret = std::make_shared<LogoutResult>();
  565. this->addSendTask(pClient, ( std:: shared_ptr<DataHeader>)ret);
  566. }
  567. break;
  568. default: //如果有错误
  569. {
  570. //std::cout << "服务端:收到客户端<Socket=" << pClient->sockfd() << ">的未知消息消息" << std::endl;
  571. //返回错误给客户端,DataHeader默认为错误消息
  572. auto ret = std::make_shared<DataHeader>();
  573. this->addSendTask(pClient, ret);
  574. }
  575. break;
  576. }
  577. }
  578. #endif

四、测试

server.cpp测试程序如下


   
  1. #include "EasyTcpServer.hpp"
  2. #include "MessageHeader.hpp"
  3. #include "Alloctor.h"
  4. int main()
  5. {
  6. EasyTcpServer server;
  7. server.Bind( "192.168.0.105", 4567);
  8. server.Listen( 5);
  9. server.Start( 4);
  10. while (server.isRun())
  11. {
  12. server.Onrun();
  13. }
  14. server.CloseSocket();
  15. std:: cout << "服务端停止工作!" << std:: endl;
  16. getchar(); //防止程序一闪而过
  17. return 0;
  18. }

client.cpp测试程序如下

  • 代码中1000个客户端连接服务器,每次发送10条数据包

   
  1. #include "EasyTcpClient.hpp"
  2. #include "CELLTimestamp.hpp"
  3. #include <thread>
  4. #include <atomic>
  5. bool g_bRun = false;
  6. const int cCount = 1000; //客户端的数量
  7. const int tCount = 4; //线程的数量
  8. std:: atomic_int sendCount = 0; //send()函数执行的次数
  9. std:: atomic_int readyCount = 0; //代表已经准备就绪的线程数量
  10. EasyTcpClient* client[cCount]; //客户端的数组
  11. void cmdThread();
  12. void sendThread(int id);
  13. int main()
  14. {
  15. g_bRun = true;
  16. //UI线程,可以输入命令
  17. std:: thread t(cmdThread);
  18. t.detach();
  19. //启动发送线程
  20. for ( int n = 0; n < tCount; ++n)
  21. {
  22. std:: thread t(sendThread, n + 1);
  23. t.detach();
  24. }
  25. //每1秒中打印一次信息(其中包括send()函数的执行次数)
  26. CELLTimestamp tTime;
  27. while ( true)
  28. {
  29. auto t = tTime.getElapsedSecond();
  30. if (t >= 1.0)
  31. {
  32. printf( "time<%lf>,thread numer<%d>,client number<%d>,sendCount<%d>\n",
  33. t, tCount, cCount, static_cast< int>(sendCount / t));
  34. sendCount = 0;
  35. tTime.update();
  36. }
  37. Sleep( 1);
  38. }
  39. return 0;
  40. }
  41. void cmdThread()
  42. {
  43. char cmdBuf[ 256] = {};
  44. while ( true)
  45. {
  46. std:: cin >> cmdBuf;
  47. if ( 0 == strcmp(cmdBuf, "exit"))
  48. {
  49. g_bRun = false;
  50. break;
  51. }
  52. else {
  53. std:: cout << "命令不识别,请重新输入" << std:: endl;
  54. }
  55. }
  56. }
  57. void sendThread(int id)
  58. {
  59. /*
  60. 下面这几个变量是为了平均每个线程创建的客户端的数量:
  61. 例如,本次测试时客户端数量为1000,线程数量为4,那么每个线程应该创建250个客户端
  62. 线程1:c=250,begin=0,end=250
  63. 线程2:c=250,begin=250,end=500
  64. 线程3:c=250,begin=500,end=750
  65. 线程4:c=250,begin=750,end=1000
  66. */
  67. int c = cCount / tCount;
  68. int begin = (id - 1)*c;
  69. int end = id*c;
  70. for ( int n = begin; n < end; ++n) //创建客户端
  71. {
  72. client[n] = new EasyTcpClient;
  73. }
  74. for ( int n = begin; n < end; ++n) //让每个客户端连接服务器
  75. {
  76. client[n]->ConnectServer( "192.168.0.105", 4567);
  77. }
  78. printf( "Thread<%d>,Connect=<begin=%d, end=%d>\n", id, (begin + 1), end);
  79. //将readyCount,然后判断readyCount是否达到了tCount
  80. //如果没有,说明所有的线程还没有准备好,那么就等待所有线程都准备好一起返回发送数据
  81. readyCount++;
  82. while (readyCount < tCount)
  83. {
  84. std::chrono:: microseconds t(10);
  85. std::this_thread::sleep_for(t);
  86. }
  87. //这里定义为数组,可以随根据需求修改客户端单次发送给服务端的数据包数量
  88. const int nNum = 10;
  89. Login login[nNum];
  90. for ( int n = 0; n < nNum; ++n)
  91. {
  92. strcpy(login[n].userName, "dongshao");
  93. strcpy(login[n].PassWord, "123456");
  94. }
  95. //在外面定义nLen,就不用每次在for循环中SendData时还要去sizeof计算一下login的大小
  96. int nLen = sizeof(login);
  97. //循环向服务端发送消息
  98. while (g_bRun)
  99. {
  100. for ( int n = begin; n < end; ++n)
  101. {
  102. if (client[n]->SendData(login, nLen) != SOCKET_ERROR)
  103. {
  104. sendCount++;
  105. }
  106. client[n]->Onrun();
  107. }
  108. }
  109. //关闭客户端
  110. for ( int n = begin; n < end; ++n)
  111. {
  112. client[n]->CloseSocket();
  113. delete client[n];
  114. }
  115. printf( "thread:all clients close the connection!\n");
  116. }
  • 结果如下所示:与之前未更改之前的差不多一样

五、备注

  • 因为程序中的shared_ptr内部申请与释放的内存就来自于我们的内存池,所以我们如果想要程序申请的内存都来自于我们的内存可以更改MemoryAlloctor的模板参数2来更改内存池的大小

  
  1. //内存池管理工具(单例模式)
  2. class MemoryMgr
  3. {
  4. private:
  5. //可以随意更改MemoryAlloctor的模板参数2来更改内存池的大小
  6. MemoryAlloctor< 64, 4000000> _mem64;
  7. MemoryAlloctor< 128, 1000000> _mem128;
  8. MemoryAlloc* _szAlloc[MAX_MEMORY_SIZE + 1];
  9. };
  • 如果想要知道程序是否使用了系统的内存,那么可以在MemoryAlloc的allocMemory()函数中添加一个打印信息,如果使用了那么就会打印这句话,说明我们某块内存池用完了,开始向系统申请内存了

  
  1. //内存池
  2. class MemoryAlloc
  3. {
  4. void *allocMemory(size_t nSize) //申请内存
  5. {
  6. //如果内存池使用完了,那么_pHeader应该指向某个内存块的最后nullptr尾指针
  7. //此时调用malloc向系统申请内存,不向我们的内存池进行内存申请
  8. if (_pHeader == nullptr) {
  9. //打印这条消息,可以知道我们的程序使用了
  10. printf( "MemoryAlloc::allocMem: %llx,id=%d, size=%d\n", pReturn, pReturn->nID, nSize);
  11. }
  12. }
  13. };

 


转载:https://blog.csdn.net/qq_41453285/article/details/105975568
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场