目录
补充问题二:send(字节流)和recv(1024)及sendall
一、客户端/服务端架构
服务器-客户机,即Client-Server(C/S)结构。C/S结构通常采取两层结构。服务器负责数据的管理,客户机负责完成与用户的交互任务。
二、OSI七层 和 TCP/IP五层模型
七层模型,亦称OSI(Open System Interconnection)。参考模型是国际标准化组织(ISO)制定的一个用于计算机或通信系统间互联的标准体系,一般称为OSI参考模型或七层模型。
(物理层、数据链路层、网络层、传输层、会话层、表示层、应用层)
<1> 应用层
OSI参考模型中最靠近用户的一层,是为计算机用户提供应用接口,也为用户直接提供各种网络服务。我们常见应用层的网络服务协议有:HTTP,HTTPS,FTP,POP3、SMTP等。
<2> 表示层
表示层提供各种用于应用层数据的编码和转换功能,确保一个系统的应用层发送的数据能被另一个系统的应用层识别。如果必要,该层可提供一种标准表示形式,用于将计算机内部的多种数据格式转换成通信中采用的标准表示形式。数据压缩和加密也是表示层可提供的转换功能之一。
<3> 会话层
会话层就是负责建立、管理和终止表示层实体之间的通信会话。该层的通信由不同设备中的应用程序之间的服务请求和响应组成。
<4> 传输层
传输层建立了主机端到端的链接,传输层的作用是为上层协议提供端到端的可靠和透明的数据传输服务,包括处理差错控制和流量控制等问题。该层向高层屏蔽了下层数据通信的细节,使高层用户看到的只是在两个传输实体间的一条主机到主机的、可由用户控制和设定的、可靠的数据通路。我们通常说的,TCP UDP就是在这一层。端口号既是这里的“端”。
<5> 网络层
本层通过IP寻址来建立两个节点之间的连接,为源端的运输层送来的分组,选择合适的路由和交换节点,正确无误地按照地址传送给目的端的运输层。就是通常说的IP层。这一层就是我们经常说的IP协议层。IP协议是Internet的基础。
<6> 数据链路层
将比特组合成字节,再将字节组合成帧,使用链路层地址 (以太网使用MAC地址)来访问介质,并进行差错检测。
数据链路层又分为2个子层:逻辑链路控制子层(LLC)和媒体访问控制子层(MAC)。
MAC子层处理CSMA/CD算法、数据出错校验、成帧等;LLC子层定义了一些字段使上次协议能共享数据链路层。 在实际使用中,LLC子层并非必需的。
<7> 物理层
实际最终信号的传输是通过物理层实现的。通过物理介质传输比特流。规定了电平、速度和电缆针脚。常用设备有(各种物理设备)集线器、中继器、调制解调器、网线、双绞线、同轴电缆。这些都是物理层的传输介质。
TCP/IP五层协议和OSI的七层协议对应关系
在每一层实现的协议也各不同,即每一层的服务也不同.下图列出了每层主要的协议。其中每层中具体的协议
三、socket 层
四、socket 是什么
Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket接口后面,对用户来说,一组简单的接口就是全部,让Socket去组织数据,以符合指定的协议。
所以,我们无需深入理解tcp/udp协议,socket已经为我们封装好了,我们只需要遵循socket的规定去编程,写出的程序自然就是遵循tcp/udp标准的。
有人将socket说成ip+port,ip是用来标识互联网中的一台主机的位置,而port是用来标识这台机器上的一个应用程序,ip地址是配置到网卡上的,而port是应用程序开启的,ip与port的绑定就标识了互联网中独一无二的一个应用程序
五、套接字发展史以及分类
套接字起源于 20 世纪 70 年代加利福尼亚大学伯克利分校版本的 Unix,即人们所说的 BSD Unix。 因此,有时人们也把套接字称为“伯克利套接字”或“BSD 套接字”。一开始,套接字被设计用在同 一台主机上多个应用程序之间的通讯。这也被称进程间通讯,或 IPC。套接字有两种(或者称为有两个种族),分别是基于文件型的和基于网络型的。
基于文件类型的套接字家族
套接字家族的名字:AF_UNIX
unix一切皆文件,基于文件的套接字调用的就是底层的文件系统来取数据,两个套接字进程运行在同一机器,可以通过访问同一个文件系统间接完成通信
基于网络类型的套接字家族
套接字家族的名字:AF_INET
(还有AF_INET6被用于ipv6,还有一些其他的地址家族,不过,他们要么是只用于某个平台,要么就是已经被废弃,或者是很少被使用,或者是根本没有实现,所有地址家族中,AF_INET是使用最广泛的一个,python支持很多种地址家族,但是由于我们只关心网络编程,所以大部分时候我么只使用AF_INET)
六、套接字工作流程
先从服务器端说起。服务器端先初始化Socket,然后与端口绑定(bind),对端口进行监听(listen),调用accept阻塞,等待客户端连接。在这时如果有个客户端初始化一个Socket,然后连接服务器(connect),如果连接成功,这时客户端与服务器端的连接就建立了。客户端发送数据请求,服务器端接收请求并处理请求,然后把回应数据发送给客户端,客户端读取数据,最后关闭连接,一次交互结束。
socket()模块函数用法
-
import socket
-
socket.socket(socket_family,socket_type,protocal=
0)
-
socket_family 可以是 AF_UNIX 或 AF_INET。socket_type 可以是 SOCK_STREAM 或 SOCK_DGRAM。protocol 一般不填,默认值为
0。
-
-
获取tcp/ip套接字
-
tcpSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
-
获取udp/ip套接字
-
udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
服务端套接字函数
s.bind() 绑定(主机,端口号)到套接字
s.listen() 开始TCP监听
s.accept() 被动接受TCP客户的连接,(阻塞式)等待连接的到来
客户端套接字函数
s.connect() 主动初始化TCP服务器连接
s.connect_ex() connect()函数的扩展版本,出错时返回出错码,而不是抛出异常
公共用途的套接字函数
s.recv() 接收TCP数据
s.send() 发送TCP数据(send在待发送数据量大于己端缓存区剩余空间时,数据丢失,不会发完)
s.sendall() 发送完整的TCP数据(本质就是循环调用send,sendall在待发送数据量大于己端缓存区剩余空间时,数据不丢失,循环调用send直到发完)
s.recvfrom() 接收UDP数据
s.sendto() 发送UDP数据
s.getpeername() 连接到当前套接字的远端的地址
s.getsockname() 当前套接字的地址
s.getsockopt() 返回指定套接字的参数
s.setsockopt() 设置指定套接字的参数
s.close() 关闭套接字
面向锁的套接字方法
s.setblocking() 设置套接字的阻塞与非阻塞模式
s.settimeout() 设置阻塞套接字操作的超时时间
s.gettimeout() 得到阻塞套接字操作的超时时间
面向文件的套接字的函数
s.fileno() 套接字的文件描述符
s.makefile() 创建一个与该套接字相关的文件
七、基于 TCP 的套接字
tcp是基于链接的,必须先启动服务端,然后再启动客户端去链接服务端
服务端
-
import socket
-
-
server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
# 创建服务器套接字
-
server.bind((
"127.0.0.1",
8000))
# 给套接字绑定地址信息
-
server.listen(backlog=
3)
# backlog 处理创建连接的最大等待数
-
conn, add = server.accept()
# 接受客户端链接, add: 客户端的ip+port
-
print(
"conn = {}, add = {}".format(conn, add))
-
-
msg = conn.recv(
1024)
# 接收数据
-
print(
"服务端接收到的消息:{}".format(msg))
-
conn.send(msg.upper())
# 发送数据
-
-
conn.close()
# 关闭已经创建的连接
-
server.close()
# 关闭服务器套接字
客户端
-
import socket
-
-
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 创建客户端套接字
-
-
client.connect((
"127.0.0.1",
8000))
# 尝试连接服务器
-
-
client.send(
"hello".encode(
"utf-8"))
# 发送数据
-
-
data = client.recv(
1024)
# 接收数据
-
print(
"客户端接收到的消息:{}".format(data))
-
-
client.close()
# 关闭客户端套接字
服务端循环链接请求收发消息
服务端
-
from socket
import *
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BACKLOG =
3
-
KEY_BUFFER_SIZE =
1024
-
KEY_UTF8 =
"utf-8"
-
-
tcp_server = socket(AF_INET, SOCK_STREAM)
-
tcp_server.bind(KEY_IP_PORT)
-
tcp_server.listen(KEY_BACKLOG)
-
-
while
True:
-
print(
"服务端开始运行>>>")
-
conn, addr = tcp_server.accept()
-
while
True:
-
try:
-
data = conn.recv(KEY_BUFFER_SIZE)
-
print(
"服务接收到数据:{}".format(data.decode(KEY_UTF8)))
-
conn.send(
"server return: <{}>".format(data.decode(KEY_UTF8)).encode(KEY_UTF8))
-
except Exception:
-
break
-
conn.close()
-
-
tcp_server.close()
-
客户端
-
from socket
import *
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BUFFER_SIZE =
1024
-
KEY_UTF8 =
"utf-8"
-
-
tcp_client = socket(AF_INET, SOCK_STREAM)
-
tcp_client.connect(KEY_IP_PORT)
-
-
while
True:
-
msg = input(
">>>").strip()
-
if
not msg:
continue
-
print(
"客户端发送数据:{}".format(msg))
-
tcp_client.send(msg.encode(KEY_UTF8))
-
ret_msg = tcp_client.recv(KEY_BUFFER_SIZE)
-
print(
"客户端接收到的数据:{}".format(ret_msg.decode(KEY_UTF8)))
-
-
tcp_client.close()
基于TCP实现远超执行命令
服务端
-
from socket
import *
-
import subprocess
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BACKLOG =
3
-
KEY_UTF8 =
"utf-8"
-
KEY_BUFFER_SIZE =
1024
-
-
tcp_server = socket(AF_INET, SOCK_STREAM)
-
tcp_server.bind(KEY_IP_PORT)
-
tcp_server.listen(KEY_BACKLOG)
-
-
while
True:
-
conn, addr = tcp_server.accept()
-
print(
"服务端接收到的请求的addr = {}".format(addr))
-
-
while
True:
-
try:
-
cmd = conn.recv(KEY_BUFFER_SIZE)
-
if
not cmd:
break
-
print(
"执行命令:{}".format(cmd.decode(KEY_UTF8)))
-
-
# 执行命令,得到命令执行结果
-
res = subprocess.Popen(cmd.decode(KEY_UTF8), shell=
True,
-
stdin=subprocess.PIPE,
-
stdout=subprocess.PIPE,
-
stderr=subprocess.PIPE)
-
err_res = res.stderr.read()
-
if err_res:
-
cmd_res = err_res
-
else:
-
cmd_res = res.stdout.read()
-
-
# 返回命令执行结果
-
conn.send(cmd_res)
-
except Exception
as e:
-
print(
"出现异常: {}".format(e))
-
break
-
conn.close()
-
print(
"addr = {} 退出链接".format(addr))
客户端
-
from socket
import *
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BACKLOG =
3
-
KEY_UTF8 =
"utf-8"
-
KEY_GBK =
"gbk"
-
KEY_BUFFER_SIZE =
1024
-
-
tcp_client = socket(AF_INET, SOCK_STREAM)
-
tcp_client.connect(KEY_IP_PORT)
-
-
while
True:
-
cmd = input(
">>>").strip()
-
if
not cmd:
break
-
if cmd ==
"quit":
-
break
-
tcp_client.send(cmd.encode(KEY_UTF8))
-
res = tcp_client.recv(KEY_BUFFER_SIZE)
-
print(res.decode(KEY_UTF8))
-
tcp_client.close()
重启服务端时可能会遇到
这个是由于你的服务端仍然存在四次挥手的time_wait状态在占用地址(如果不懂,请深入研究1.tcp三次握手,四次挥手 2.syn洪水攻击 3.服务器高并发情况下会有大量的time_wait状态的优化方法)
解决办法:
-
#加入一条socket配置,重用ip和端口
-
-
phone=socket(AF_INET,SOCK_STREAM)
-
phone.setsockopt(SOL_SOCKET,SO_REUSEADDR,
1)
#就是它,在bind前加
-
phone.bind((
'127.0.0.1',
8080))
-
发现系统存在大量TIME_WAIT状态的连接,通过调整linux内核参数解决,
-
vi
/etc/sysctl.conf
-
-
编辑文件,加入以下内容:
-
net.ipv4.tcp_syncookies
=
1
-
net.ipv4.tcp_tw_reuse
=
1
-
net.ipv4.tcp_tw_recycle
=
1
-
net.ipv4.tcp_fin_timeout
=
30
-
-
然后执行
/sbin/sysctl
-p
让参数生效。
-
-
net.ipv4.tcp_syncookies
=
1
表示开启SYN
Cookies。当出现SYN等待队列溢出时,启用cookies来处理,可防范少量SYN攻击,默认为0,表示关闭;
-
-
net.ipv4.tcp_tw_reuse
=
1
表示开启重用。允许将TIME-WAIT
sockets重新用于新的TCP连接,默认为0,表示关闭;
-
-
net.ipv4.tcp_tw_recycle
=
1
表示开启TCP连接中TIME-WAIT
sockets的快速回收,默认为0,表示关闭。
-
-
net.ipv4.tcp_fin_timeout
修改系統默认的
TIMEOUT
时间
八、基于 UDP 的套接字
udp是无链接的,先启动哪一端都不会报错
udp服务端
-
ss = socket()
#创建一个服务器的套接字
-
ss.bind()
#绑定服务器套接字
-
inf_loop:
#服务器无限循环
-
cs = ss.recvfrom()/ss.sendto()
# 对话(接收与发送)
-
ss.close()
# 关闭服务器套接字
udp客户端
-
cs = socket()
# 创建客户套接字
-
comm_loop:
# 通讯循环
-
cs.sendto()/cs.recvfrom()
# 对话(发送/接收)
-
cs.close()
# 关闭客户套接字
udp套接字简单示例
udp服务端
-
from socket
import *
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_UTF8 =
"utf-8"
-
KEY_BUFFER_SIZE =
1024
-
-
udp_server = socket(AF_INET, SOCK_DGRAM)
# 创建一个服务器的套接字
-
udp_server.bind(KEY_IP_PORT)
# 绑定ip和地址
-
-
print(
"UDP SERVER ...")
-
while
True:
-
data, addr = udp_server.recvfrom(KEY_BUFFER_SIZE)
# 接收消息
-
print(
"data = {}, addr = {}".format(data.decode(KEY_UTF8), addr))
-
res_msg =
"UDP SERVER return:<{}>".format(data.decode(KEY_UTF8)).encode(KEY_UTF8)
-
udp_server.sendto(res_msg, addr)
# 发送消息
-
-
udp_server.close()
# 关闭服务器套接字
-
udp客户端
-
from socket
import *
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KYE_UTF8 =
"utf-8"
-
KYE_BUFFER_SIZE =
1024
-
-
udp_client = socket(AF_INET, SOCK_DGRAM)
# 创建客户端套接字
-
-
while
True:
-
msg = input(
">>>").strip()
-
if
not msg:
continue
-
udp_client.sendto(msg.encode(KYE_UTF8), KEY_IP_PORT)
# 发送消息
-
data, addr = udp_client.recvfrom(KYE_BUFFER_SIZE)
# 接收消息
-
print(
"data = {}, addr = {}".format(data.decode(KYE_UTF8), addr))
-
-
udp_client.close()
# 关闭套接字
-
由于 UDP 是不需要建立链接,上面的服务端代码,同时可以处理多个客户端的请求。
简单时间服务器
服务端
-
from socket
import *
-
import time
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_UTF8 =
"utf-8"
-
KEY_BUFFER_SIZE =
1024
-
-
udp_server = socket(AF_INET, SOCK_DGRAM)
# 创建一个服务器的套接字
-
udp_server.bind(KEY_IP_PORT)
# 绑定ip和地址
-
-
print(
"UDP SERVER ...")
-
while
True:
-
data, addr = udp_server.recvfrom(KEY_BUFFER_SIZE)
# 接收消息
-
if
not data:
-
fmt =
"%Y-%m-%d %X"
-
else:
-
fmt = data.decode(KEY_UTF8)
-
time_str = time.strftime(fmt)
-
udp_server.sendto(time_str.encode(KEY_UTF8), addr)
# 发送消息
-
-
udp_server.close()
# 关闭服务器套接字
-
客户端
-
from socket
import *
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KYE_UTF8 =
"utf-8"
-
KYE_BUFFER_SIZE =
1024
-
-
udp_client = socket(AF_INET, SOCK_DGRAM)
# 创建客户端套接字
-
-
while
True:
-
msg = input(
">>>").strip()
-
udp_client.sendto(msg.encode(KYE_UTF8), KEY_IP_PORT)
# 发送消息
-
data, addr = udp_client.recvfrom(KYE_BUFFER_SIZE)
# 接收消息
-
print(
"NTP 返回时间:{}".format(data.decode(KYE_UTF8)))
-
-
udp_client.close()
# 关闭套接字
-
九、粘包现象
让我们基于tcp先制作一个远程执行命令的程序(1:执行错误命令 2:执行cd 3:执行 ipconifig /all)
注意:
res=subprocess.Popen(cmd.decode('utf-8'),
shell=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
的结果的编码是以当前所在的系统为准的,如果是windows,那么res.stdout.read()读出的就是GBK编码的,在接收端需要用GBK解码
1、基于TCP进行测试
服务端
-
from socket
import *
-
import subprocess
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BACKLOG =
3
-
KEY_UTF8 =
"utf-8"
-
KEY_BUFFER_SIZE =
1024
-
-
tcp_server = socket(AF_INET, SOCK_STREAM)
-
tcp_server.bind(KEY_IP_PORT)
-
tcp_server.listen(KEY_BACKLOG)
-
-
while
True:
-
conn, addr = tcp_server.accept()
-
print(
"服务端接收到的请求的addr = {}".format(addr))
-
-
while
True:
-
try:
-
cmd = conn.recv(KEY_BUFFER_SIZE)
-
if
not cmd:
break
-
print(
"执行命令:{}".format(cmd.decode(KEY_UTF8)))
-
-
# 执行命令,得到命令执行结果
-
res = subprocess.Popen(cmd.decode(KEY_UTF8), shell=
True,
-
stdin=subprocess.PIPE,
-
stdout=subprocess.PIPE,
-
stderr=subprocess.PIPE)
-
err_res = res.stderr.read()
-
if err_res:
-
cmd_res = err_res
-
else:
-
cmd_res = res.stdout.read()
-
-
# 返回命令执行结果
-
if
not cmd_res:
-
cmd_res =
"执行成功".encode(KEY_UTF8)
-
conn.send(cmd_res)
-
except Exception
as e:
-
print(
"出现异常: {}".format(e))
-
break
-
conn.close()
-
print(
"addr = {} 退出链接".format(addr))
客户端
-
from socket
import *
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BACKLOG =
3
-
KEY_UTF8 =
"utf-8"
-
KEY_GBK =
"gbk"
-
KEY_BUFFER_SIZE =
1024
-
-
tcp_client = socket(AF_INET, SOCK_STREAM)
-
tcp_client.connect(KEY_IP_PORT)
-
-
while
True:
-
cmd = input(
">>>").strip()
-
if
not cmd:
break
-
if cmd ==
"quit":
-
break
-
tcp_client.send(cmd.encode(KEY_UTF8))
-
res = tcp_client.recv(KEY_BUFFER_SIZE)
-
print(res.decode(
"gbk"))
-
tcp_client.close()
上述程序是基于tcp的socket,在运行时会发生粘包
测试:tcp 接收端 出现粘包现象 执行 cd : 返回路径 执行 ipconfig /all : 返回所有网络配置 结果:执行 cd 时,返回的结果小于 1024 字节,没有出现粘包现象; 执行 ipconfig /all 时,返回的网络配置大于1024,出现粘包现象(在执行完 ipconfig /all 后再次执行 cd , 返回的内容是网络配置的内容) |
2、基于UDP进行测试
服务端
-
from socket
import *
-
import subprocess
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BUFFER_SIZE =
1024
-
KEY_UTF8 =
"utf-8"
-
KEY_GBK =
"gbk"
-
KEY_BACKLOG_SIZE =
3
-
-
udp_server = socket(AF_INET, SOCK_DGRAM)
-
udp_server.bind(KEY_IP_PORT)
-
-
while
True:
-
try:
-
cmd, addr = udp_server.recvfrom(KEY_BUFFER_SIZE)
-
-
res = subprocess.Popen(cmd.decode(KEY_UTF8), shell=
True,
-
stderr=subprocess.PIPE,
-
stdout=subprocess.PIPE,
-
stdin=subprocess.PIPE)
-
err_res = res.stderr.read()
-
if err_res:
-
cmd_res = err_res
-
else:
-
cmd_res = res.stdout.read()
-
-
if
not cmd_res:
-
cmd_res =
"执行成功".encode(
"gbk")
-
print(
"length = {}".format(len(cmd_res)))
-
udp_server.sendto(cmd_res, addr)
-
except Exception
as e:
-
print(e)
-
break
-
-
udp_server.close()
客户端
-
from socket
import *
-
import subprocess
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BUFFER_SIZE =
1024
-
KEY_UTF8 =
"utf-8"
-
KEY_GBK =
"gbk"
-
KEY_BACKLOG_SIZE =
3
-
-
udp_client = socket(AF_INET, SOCK_DGRAM)
-
-
while
True:
-
cmd = input(
">>>").strip()
-
if
not cmd:
break
-
if cmd ==
"quit":
-
break
-
udp_client.sendto(cmd.encode(KEY_UTF8), KEY_IP_PORT)
-
data, addr = udp_client.recvfrom(KEY_BUFFER_SIZE)
-
print(data.decode(KEY_GBK))
-
-
udp_client.close()
上述程序是基于udp的socket,在运行时永远不会发生粘包
|
十、什么是粘包
须知:只有TCP有粘包现象,UDP永远不会粘包,首先需要掌握一个socket收发消息的原理
发送端可以是一K一K地发送数据,而接收端的应用程序可以两K两K地提走数据,当然也有可能一次提走3K或6K数据,或者一次只提走几个字节的数据,也就是说,应用程序所看到的数据是一个整体,或说是一个流(stream),一条消息有多少字节对应用程序是不可见的,因此TCP协议是面向流的协议,这也是容易出现粘包问题的原因。而UDP是面向消息的协议,每个UDP段都是一条消息,应用程序必须以消息为单位提取数据,不能一次提取任意字节的数据,这一点和TCP是很不同的。怎样定义消息呢?可以认为对方一次性write/send的数据为一个消息,需要明白的是当对方send一条信息的时候,无论底层怎样分段分片,TCP协议层会把构成整条消息的数据段排序完成后才呈现在内核缓冲区。
例如基于tcp的套接字客户端往服务端上传文件,发送时文件内容是按照一段一段的字节流发送的,在接收方看了,根本不知道该文件的字节流从何处开始,在何处结束
所谓粘包问题主要还是因为接收方不知道消息之间的界限,不知道一次性提取多少字节的数据所造成的。
此外,发送方引起的粘包是由TCP协议本身造成的,TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一个TCP段。若连续几次需要send的数据都很少,通常TCP会根据优化算法把这些数据合成一个TCP段后一次发送出去,这样接收方就收到了粘包数据。
- TCP(transport control protocol,传输控制协议)是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个发往接收端的包,更有效的发到对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样,接收端,就难于分辨出来了,必须提供科学的拆包机制。 即面向流的通信是无消息保护边界的。
- UDP(user datagram protocol,用户数据报协议)是无连接的,面向消息的,提供高效率服务。不会使用块的合并优化算法,, 由于UDP支持的是一对多的模式,所以接收端的skbuff(套接字缓冲区)采用了链式结构来记录每一个到达的UDP包,在每个UDP包中就有了消息头(消息来源地址,端口等信息),这样,对于接收端来说,就容易进行区分处理了。 即面向消息的通信是有消息保护边界的。
- tcp是基于数据流的,于是收发的消息不能为空,这就需要在客户端和服务端都添加空消息的处理机制,防止程序卡住,而udp是基于数据报的,即便是你输入的是空内容(直接回车),那也不是空消息,udp协议会帮你封装上消息头,实验略
udp的recvfrom是阻塞的,一个recvfrom(x)必须对唯一一个sendinto(y),收完了x个字节的数据就算完成,若是y>x数据就丢失,这意味着udp根本不会粘包,但是会丢数据,不可靠
tcp的协议数据不会丢,没有收完包,下次接收,会继续上次继续接收,己端总是在收到ack时才会清除缓冲区内容。数据是可靠的,但是会粘包。
两种情况下会发生粘包
1、发送端造成粘包
发送端需要等缓冲区满才发送出去,造成粘包(发送数据时间间隔很短,数据了很小,会合到一起,产生粘包)
服务端
-
from socket
import *
-
import subprocess
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BACKLOG =
3
-
KEY_UTF8 =
"utf-8"
-
KEY_BUFFER_SIZE =
1024
-
-
tcp_server = socket(AF_INET, SOCK_STREAM)
-
tcp_server.bind(KEY_IP_PORT)
-
tcp_server.listen(KEY_BACKLOG)
-
-
conn, addr = tcp_server.accept()
-
-
data1 = conn.recv(
15)
-
print(
"data1 = {}".format(data1))
-
data2 = conn.recv(
15)
-
print(
"data2 = {}".format(data2))
-
data3 = conn.recv(
15)
-
print(
"data3 = {}".format(data3))
客户端
-
from socket
import *
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BACKLOG =
3
-
KEY_UTF8 =
"utf-8"
-
KEY_GBK =
"gbk"
-
KEY_BUFFER_SIZE =
1024
-
-
tcp_client = socket(AF_INET, SOCK_STREAM)
-
tcp_client.connect(KEY_IP_PORT)
-
-
tcp_client.send(
"123".encode(KEY_UTF8))
-
tcp_client.send(
"abc".encode(KEY_UTF8))
-
tcp_client.send(
"ABC".encode(KEY_UTF8))
出现粘包打印结果:
-
理想接收
-
data1 = b
'123'
-
data2 = b
'abc'
-
data3 = b
'ABC'
-
-
服务端每次接收5个字节的情况(之一):
-
data1 = b
'123ab'
-
data2 = b
'cABC'
-
data3 = b
''
-
-
服务端每次接收15个字节的情况(之一):
-
data1 = b
'123abc'
-
data2 = b
'ABC'
-
data3 = b
''
-
-
data1 = b
'123abcABC'
-
data2 = b
''
-
data3 = b
''
2、接收端造成粘包
接收方不及时接收缓冲区的包,造成多个包接收(客户端发送了一段数据,服务端只收了一小部分,服务端下次再收的时候还是从缓冲区拿上次遗留的数据,产生粘包)
服务端
-
from socket
import *
-
import subprocess
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BACKLOG =
3
-
KEY_UTF8 =
"utf-8"
-
KEY_BUFFER_SIZE =
1024
-
-
tcp_server = socket(AF_INET, SOCK_STREAM)
-
tcp_server.bind(KEY_IP_PORT)
-
tcp_server.listen(KEY_BACKLOG)
-
-
conn, addr = tcp_server.accept()
-
-
data1 = conn.recv(
1)
-
print(
"data1 = {}".format(data1))
-
data2 = conn.recv(
2)
-
print(
"data2 = {}".format(data2))
-
data3 = conn.recv(
3)
-
print(
"data3 = {}".format(data3))
客户端
-
from socket
import *
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BACKLOG =
3
-
KEY_UTF8 =
"utf-8"
-
KEY_GBK =
"gbk"
-
KEY_BUFFER_SIZE =
1024
-
-
tcp_client = socket(AF_INET, SOCK_STREAM)
-
tcp_client.connect(KEY_IP_PORT)
-
-
tcp_client.send(
"123abcABC".encode(KEY_UTF8))
出现粘包结果:
-
服务端获取结果(由于客户端第一次发送的数据大于第一次接收的数据,导致数据残留):
-
data1 =
b'1'
-
data2 =
b'23'
-
data3 =
b'abc'
拆包的发生情况
当发送端缓冲区的长度大于网卡的MTU时,tcp会将这次发送的数据拆成几个数据包发送出去。
补充问题一:为何tcp是可靠传输,udp是不可靠传输
基于tcp的数据传输请参考【linhaifeng】的另一篇文章http://www.cnblogs.com/linhaifeng/articles/5937962.html,tcp在数据传输时,发送端先把数据发送到自己的缓存中,然后协议控制将缓存中的数据发往对端,对端返回一个ack=1,发送端则清理缓存中的数据,对端返回ack=0,则重新发送数据,所以tcp是可靠的
而udp发送数据,对端是不会返回确认信息的,因此不可靠
补充问题二:send(字节流)和recv(1024)及sendall
recv里指定的1024意思是从缓存里一次拿出1024个字节的数据
send的字节流是先放入己端缓存,然后由协议控制将缓存内容发往对端,如果待发送的字节流大小大于缓存剩余空间,那么数据丢失,用sendall就会循环调用send,数据不会丢失
十一、解决粘包的办法
为字节流加上自定义固定长度报头,报头中包含字节流长度,然后一次send到对端,对端在接收时,先从缓存中取出定长的报头,然后再取真实数据
服务端
-
from socket
import *
-
import subprocess
-
import struct
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BACKLOG =
3
-
KEY_UTF8 =
"utf-8"
-
KEY_BUFFER_SIZE =
1024
-
-
tcp_server = socket(AF_INET, SOCK_STREAM)
-
tcp_server.bind(KEY_IP_PORT)
-
tcp_server.listen(KEY_BACKLOG)
-
-
while
True:
-
conn, addr = tcp_server.accept()
-
print(
"服务端接收到的请求的addr = {}".format(addr))
-
-
while
True:
-
try:
-
cmd = conn.recv(KEY_BUFFER_SIZE)
-
if
not cmd:
break
-
print(
"执行命令:{}".format(cmd.decode(KEY_UTF8)))
-
-
# 执行命令,得到命令执行结果
-
res = subprocess.Popen(cmd.decode(KEY_UTF8), shell=
True,
-
stdin=subprocess.PIPE,
-
stdout=subprocess.PIPE,
-
stderr=subprocess.PIPE)
-
err_res = res.stderr.read()
-
if err_res:
-
cmd_res = err_res
-
else:
-
cmd_res = res.stdout.read()
-
-
# 返回命令执行结果
-
if
not cmd_res:
-
cmd_res =
"执行成功".encode(KEY_UTF8)
-
cmd_length = len(cmd_res)
-
cmd_length_b = struct.pack(
"i", cmd_length)
# 将返回内容的长短转换成4个字节的形式
-
print(
"发送长度 = {}".format(cmd_length))
-
conn.send(cmd_length_b)
# 将长度先发送
-
conn.send(cmd_res)
-
except Exception
as e:
-
print(
"出现异常: {}".format(e))
-
break
-
conn.close()
-
print(
"addr = {} 退出链接".format(addr))
客户端
-
from socket
import *
-
import struct
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BACKLOG =
3
-
KEY_UTF8 =
"utf-8"
-
KEY_GBK =
"gbk"
-
KEY_BUFFER_SIZE =
1024
-
-
tcp_client = socket(AF_INET, SOCK_STREAM)
-
tcp_client.connect(KEY_IP_PORT)
-
-
while
True:
-
# 发送
-
cmd = input(
">>>").strip()
-
if
not cmd:
break
-
if cmd ==
"quit":
-
break
-
tcp_client.send(cmd.encode(KEY_UTF8))
-
# 接收
-
length_res = tcp_client.recv(
4)
-
length = struct.unpack(
"i", length_res)[
0]
-
-
recv_size =
0
-
recv_msg =
b''
-
while recv_size < length:
-
# 判断剩下的是否大于 1024,如果小于1024, 按照实际接收
-
if length - recv_size >= KEY_BUFFER_SIZE:
-
recv_msg += tcp_client.recv(KEY_BUFFER_SIZE)
-
else:
-
recv_msg += tcp_client.recv(length - recv_size)
-
recv_size = len(recv_msg)
-
print(recv_msg.decode(
"gbk"))
-
print(
"length = {}".format(recv_size))
-
tcp_client.close()
十二、认证客户端的链接合法性
如果你想在分布式系统中实现一个简单的客户端链接认证功能,又不像SSL那么复杂,那么利用hmac+加盐的方式来实现
服务端
-
from socket
import *
-
import hmac
-
import struct
-
import os
-
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_UTF8 =
"utf-8"
-
KEY_BACKLOG =
5
-
KEY_BUFFER_SIZE =
1024
-
KEY_SECRET =
b'1a2b3c4d5e'
-
-
-
def conn_auth(conn):
-
"""
-
检查链接是否合法
-
:param conn:
-
:return:
-
"""
-
print(
"开始认证客户端链接。。。")
-
msg = os.urandom(
32)
-
conn.sendall(msg)
-
hmac_md5 = hmac.new(KEY_SECRET, msg)
# 第三个参数digestmod默认md5
-
digest = hmac_md5.digest()
-
respone = conn.recv(len(digest))
-
flag = hmac.compare_digest(digest, respone)
-
if flag:
-
result_bytes = struct.pack(
"i",
0)
# 认证成功,发送0
-
else:
-
result_bytes = struct.pack(
"i",
1)
# 认证成功,发送1
-
conn.sendall(result_bytes)
# 发送认证结果给客户端
-
return flag
-
-
-
def data_handle(conn, buffer_size=1024):
-
"""
-
处理链接数据传输
-
:param conn:
-
:param buffer_size:
-
:return:
-
"""
-
if
not conn_auth(conn):
-
print(
"客户端认证失败,关闭链接")
-
conn.close()
-
return
-
while
True:
-
try:
-
recv_msg = conn.recv(buffer_size)
-
if
not recv_msg:
-
break
-
conn.send(
"服务端返回信息:<{}>".format(recv_msg.decode(KEY_UTF8)).encode(KEY_UTF8))
-
except Exception
as e:
-
print(e)
-
break
-
-
-
def server_handle(ip_port, buffer_size, backlog=5):
-
"""
-
处理连接请求
-
:param ip_port:
-
:param buffer_size:
-
:param backlog:
-
:return:
-
"""
-
server = socket(AF_INET, SOCK_STREAM)
-
server.bind(ip_port)
-
server.listen(backlog)
-
-
while
True:
-
conn, addr = server.accept()
-
print(
"新链接客户端:{}".format(addr))
-
data_handle(conn, buffer_size)
-
-
-
if __name__ ==
'__main__':
-
print(
"启动服务端。。。")
-
server_handle(KEY_IP_PORT, KEY_BUFFER_SIZE, KEY_BACKLOG)
客户端
-
from socket
import *
-
import hmac
-
import os
-
import struct
-
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_UTF8 =
"utf-8"
-
KEY_BACKLOG =
5
-
KEY_BUFFER_SIZE =
1024
-
KEY_SECRET =
b'1a2b3c4d5e'
-
-
-
def client_auth(conn):
-
"""
-
认证客户端
-
:param conn:
-
:return:
-
"""
-
recv_msg = conn.recv(
32)
-
hmac_md5 = hmac.new(KEY_SECRET, recv_msg)
-
digest = hmac_md5.digest()
-
conn.sendall(digest)
-
result_bytes = conn.recv(
4)
-
result = struct.unpack(
"i", result_bytes)
-
print(
"认证返回结果: {}(0:成功,1:失败)".format(result[
0]))
-
if result[
0]:
-
return
False
-
else:
-
return
True
-
-
def client_handle(ip_port, buffer_size):
-
"""
-
处理客户端链接
-
:param ip_port:
-
:param buffer_size:
-
:return:
-
"""
-
client = socket(AF_INET, SOCK_STREAM)
-
client.connect(ip_port)
-
# 验证客户端是否认证通过
-
if
not client_auth(client):
-
client.close()
-
print(
"客户端认证失败")
-
return
-
-
while
True:
-
inp = input(
">>>").strip()
-
if
not inp:
break
-
if inp ==
"quit":
break
-
client.sendall(inp.encode(KEY_UTF8))
-
recv_msg = client.recv(KEY_BUFFER_SIZE)
-
print(
"客户端收到消息:{}".format(recv_msg.decode(KEY_UTF8)))
-
client.close()
-
-
-
-
if __name__ ==
'__main__':
-
client_handle(KEY_IP_PORT, KEY_BUFFER_SIZE)
十三、socketserver 实现并发
基于tcp的套接字,关键就是两个循环,一个链接循环,一个通信循环
socketserver模块中分两大类:server类(解决链接问题)和request类(解决通信问题)
socketserver源码分析
server类:
request类:
继承关系:
以下述代码为例,分析socketserver源码:
s=socketserver.ThreadingTCPServer(('127.0.0.1',8000),FtpServer)
s.serve_forever()
查找属性的顺序:ThreadingTCPServer->ThreadingMixIn->TCPServer->BaseServer
1.实例化得到ftpserver,先找类ThreadingTCPServer的__init__,在TCPServer中找到,进而执行server_bind,server_active
2.找ftpserver下的serve_forever,在BaseServer中找到,进而执行self._handle_request_noblock(),该方法同样是在BaseServer中
3.执行self._handle_request_noblock()进而执行request, client_address = self.get_request()(就是TCPServer中的self.socket.accept()),然后执行self.process_request(request, client_address)
4.在ThreadingMixIn中找到process_request,开启多线程应对并发,进而执行process_request_thread,执行self.finish_request(request, client_address)
5.上述四部分完成了链接循环,本部分开始进入处理通讯部分,在BaseServer中找到finish_request,触发我们自己定义的类的实例化,去找__init__方法,而我们自己定义的类没有该方法,则去它的父类也就是BaseRequestHandler中找....
源码分析总结
基于tcp的socketserver我们自己定义的类中的
1. self.server即套接字对象
2. self.request即一个链接
3. self.client_address即客户端地址
基于udp的socketserver我们自己定义的类中的
1. self.request是一个元组(第一个元素是客户端发来的数据,第二部分是服务端的udp套接字对象),如(b'adsf', <socket.socket fd=200, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8080)>)
2. self.client_address即客户端地址
测试代码
服务端
-
import socketserver
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BACKLOG =
3
-
KEY_UTF8 =
"utf-8"
-
KEY_BUFFER_SIZE =
1024
-
-
class MyServer(socketserver.BaseRequestHandler):
-
def handle(self):
-
print(
"conn is :{}".format(self.request))
-
print(
"addr is :{}".format(self.client_address))
-
-
while
True:
-
try:
-
# 接收数据
-
data = self.request.recv(KEY_BUFFER_SIZE)
-
if
not data:
break
# 客户端端口链接有可能一直返回 b""
-
print(
"服务端接收到{}的数据:{}".format(self.client_address, data))
-
# 发送数据
-
self.request.sendall(
"服务返回数据<{}>".format(data).encode(KEY_UTF8))
-
except Exception
as e:
-
print(e)
-
break
-
-
-
if __name__ ==
'__main__':
-
print(
"启动服务。。。")
-
s = socketserver.ThreadingTCPServer(KEY_IP_PORT, MyServer)
# 多线程
-
# s = socketserver.ForkingTCPServer(KEY_IP_PORT, MyServer) # 多进程
-
s.serve_forever()
-
print(
"关闭服务。。。")
客户端
-
from socket
import *
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_BACKLOG =
3
-
KEY_UTF8 =
"utf-8"
-
KEY_GBK =
"gbk"
-
KEY_BUFFER_SIZE =
1024
-
-
tcp_client = socket(AF_INET, SOCK_STREAM)
-
tcp_client.connect(KEY_IP_PORT)
-
-
while
True:
-
# 发送
-
msg = input(
">>>").strip()
-
if
not msg:
break
-
if msg ==
"quit":
-
break
-
tcp_client.send(msg.encode(KEY_UTF8))
-
# 接收
-
recv_msg = tcp_client.recv(KEY_BUFFER_SIZE)
-
print(recv_msg.decode(KEY_UTF8))
-
tcp_client.close()
测试结果:
运行服务端,同时运行两个client,分别发送 "client1", "client2"到服务端,服务端能同时处理两个客户端的链接请求,实现并发。
十四、上传下载文件
服务端
-
import os
-
import socketserver
-
import struct
-
import json
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_UTF8 =
"utf-8"
-
KEY_BACKLOG =
5
-
KEY_BUFFER_SIZE =
1024
-
-
-
class MyHandle(socketserver.BaseRequestHandler):
-
-
SERVER_DIR =
"upload_files"
-
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
-
-
def handle(self):
-
# 接收自定义头长度
-
head_length_bytes = self.request.recv(
4)
-
head_length = struct.unpack(
"i", head_length_bytes)
-
# 接收自定义头并解析
-
head_bytes = self.request.recv(head_length[
0])
-
head_json = head_bytes.decode(KEY_UTF8)
-
print(head_json)
-
head_dic = json.loads(head_json)
-
-
# 判断是否支持该操作
-
cmd = head_dic[
"cmd"]
-
if hasattr(self, cmd):
-
func = getattr(self, cmd)
-
func(head_dic)
-
-
def put(self, head_dic):
-
upload_dir = os.path.normpath(os.path.join(self.BASE_DIR, self.SERVER_DIR))
-
if
not os.path.exists(upload_dir):
-
os.mkdir(upload_dir)
-
print(
"创建文件夹:{}".format(upload_dir))
-
# 获取文件名并拼接全路径
-
filename = head_dic[
'filename']
-
filename = os.path.normpath(os.path.join(self.BASE_DIR,
-
self.SERVER_DIR,
-
filename))
-
# 获取文件大小并保存文件
-
filesize = head_dic[
"filesize"]
-
recv_size =
0
-
with open(filename,
'wb')
as f:
-
while filesize > recv_size:
-
if filesize - recv_size > KEY_BUFFER_SIZE:
-
size = KEY_BUFFER_SIZE
-
else:
-
size = filesize - recv_size
-
recv_msg = self.request.recv(size)
-
# recv_size += size
-
recv_size += len(recv_msg)
-
f.write(recv_msg)
-
print(
"recv_size = {}, realsize = {}".format(recv_size, len(recv_msg)))
-
-
-
if __name__ ==
'__main__':
-
print(
"启动FTP服务。。。")
-
s = socketserver.ThreadingTCPServer(KEY_IP_PORT, MyHandle)
-
s.serve_forever()
客户端
-
import os
-
import socket
-
import struct
-
import json
-
-
KEY_IP_PORT = (
"127.0.0.1",
8000)
-
KEY_UTF8 =
"utf-8"
-
KEY_BACKLOG =
5
-
KEY_BUFFER_SIZE =
1024
-
-
class FTPClient:
-
def __init__(self, address, connet=True):
-
self.server_address = address
-
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
if connet:
-
try:
-
self.client_connet()
-
except Exception
as e:
-
print(
"链接服务出现异常,ftp_client 关闭")
-
self.client_close()
-
raise
-
-
def client_connet(self):
-
self.socket.connect(self.server_address)
-
-
def client_close(self):
-
self.socket.close()
-
-
def run(self):
-
while
True:
-
try:
-
inp = input(
">>>").strip()
-
if
not inp:
break
-
if inp ==
"quit":
break
-
args_list = inp.split()
-
cmd = args_list[
0]
-
if hasattr(self, cmd):
-
func = getattr(self, cmd)
-
func(args_list)
-
else:
-
print(
"不支持操作:{}".format(inp))
-
except Exception
as e:
-
print(
"客户端异常退出")
-
raise
-
break
-
-
def put(self, args):
-
# 校验输入参数是否正确
-
if len(args) <
2:
-
print(
"参数长度不争取:{}".format(str(args)))
-
return
-
filename = args[
1]
-
if
not os.path.isfile(filename):
-
print(
"文件 {} 不存在".format(filename))
-
return
-
filesize = os.path.getsize(filename)
-
-
# 拼接报文头
-
head_dic = {
-
"cmd": args[
0],
-
"filename": os.path.basename(filename),
-
"filesize": filesize}
-
head_json_str = json.dumps(head_dic)
-
print(
"发送的报文头: {}".format(head_json_str))
-
head_bytes = head_json_str.encode(KEY_UTF8)
-
head_len_bytes = struct.pack(
"i", len(head_bytes))
-
-
#发送数据
-
self.socket.send(head_len_bytes)
-
self.socket.send(head_bytes)
-
send_size =
0
-
with open(filename,
'rb')
as f:
-
for line
in f:
-
self.socket.send(line)
-
send_size += len(line)
-
print(
"已经发送大小:{}".format(send_size))
-
else:
-
print(
"文件发送成功")
-
-
-
if __name__ ==
'__main__':
-
print(
"启动客户端。。。")
-
client = FTPClient(KEY_IP_PORT)
-
client.run()
测试遇到问题:
测试的时候出现报错:ConnectionAbortedError: [WinError 10053] 你的主机中的软件中止了一个已建立的连接。
最后发现是服务端接收的时候计算接收长度没有按照实际的接收,而是使用 buffer_size 进行添加,
将服务端中代码 recv_size += size 改为 recv_size += len(recv_msg) 解决
断点续传
十四、练习
开发一个支持多用户在线的 FTP 程序:
1、用户加密认证
2、允许同时多用户登陆
3、每个用户都有自己的家目录,且只能访问自己的家目录
4、对用户进行磁盘配额,每个用户的可用空间不同
5、允许用户在 ftp server 上随意切换目录
6、允许用户查看当前目录下文件
7、允许上传和下载文件,保证文件一致性
8、文件传输过程中显示进度条
9、给服务端添加日志记录
10、附加功能:支持文件的断点续传
代码:我实现的版本
参考
https://www.cnblogs.com/linhaifeng/articles/6129246.html
https://www.cnblogs.com/qishui/p/5428938.html
转载:https://blog.csdn.net/qq_30346413/article/details/116889817