目录
2.2 Windows环境下nc的配置(因为windows不支持nc,linux用户可以直接跳过这一步)
2.3 测试工具可用性(开启两个cmd窗口,可以实现nc窗口与telnet窗口互相通信即可)
3.1 Socket编程-asyncore模块基础模板的使用
一、原理概述及结果展示
1.1 计算机网络基础知识
Socket:在计算机通信领域,socket 被翻译为“套接字”,它是计算机之间进行通信的一种约定或一种方式。通过 socket 这种约定,一台计算机可以接收其他计算机的数据,也可以向其他计算机发送数据。
nc命令:nc是netcat的简写,有着网络界的瑞士军刀美誉。因为它短小精悍、功能实用,被设计为一个简单、可靠的网络工具。
(1)实现任意TCP/UDP端口的侦听,nc可以作为server以TCP或UDP方式侦听指定端口
(2)端口的扫描,nc可以作为client发起TCP或UDP连接
(3)机器之间传输文件
(4)机器之间网络测速
telnet命令:telnet就是查看某个端口是否可访问。我们在搞开发的时候,经常要用的端口就是 8080。那么你可以启动服务器,用telnet 去查看这个端口是否可用。
1.2 结果展示
后期可以自行尝试存入数据库,以:进行正则即可
二、开发环境
2.1 基础环境及相关文档
软件环境:Windows10 + Pycharm2019
requirements.txt内容:(先创建同名文档,然后直接终端使用pip install -r requirements.txt)
-
certifi==2020.4.5.1
-
wincertstore==0.2
斗鱼官方API文档(暂时未针对个人开发者开放):弹幕相关API
项目本质:使用Python作为telnet客户端,与斗鱼服务器进行通信。
2.2 Windows环境下nc的配置(因为windows不支持nc,linux用户可以直接跳过这一步)
下载netcat:netcat下载地址
配置环境变量(将解压后的文件中的exe文件加入Path路径)
开启telnet服务(Windows默认关闭)
2.3 测试工具可用性(开启两个cmd窗口,可以实现nc窗口与telnet窗口互相通信即可)
三、斗鱼弹幕抓取文档及源码
3.1 Socket编程-asyncore模块基础模板的使用
-
# socket.py
-
#!/usr/bin/env python
-
-
# -*- encoding: utf-8 -*-
-
-
'''
-
@Author : {Jack Zhao}
-
-
@Time : 2020/5/12 9:52
-
-
@Contact : {zc_dlmu@163.com}
-
-
@Desc : Socket客户端的标准实现
-
'''
-
-
import asyncore
-
import sys
-
-
# 定义类,并继承自 asyncore.dispatcher
-
class SocketClient(asyncore.dispatcher):
-
# 实现类中的回调函数代码
-
def __init__(self,host,port):
-
# 调用父类方法
-
asyncore.dispatcher.__init__(self)
-
# 创建Socket对象
-
self.create_socket()
-
# 链接服务器
-
address = (host,port)
-
self.connect(address)
-
-
def handle_connect(self):
-
'''
-
连接成功回调该函数
-
:return:
-
'''
-
print(
'连接成功')
-
-
def writable(self):
-
'''
-
描述是否有数据要被发送到服务器
-
:return: True表示可写,不实现默认返回True,为True时调用handle.write函数
-
'''
-
return
False
-
-
def handle_write(self):
-
'''
-
writable函数返回值为True时触发,编写send方法发送数据
-
:return:
-
'''
-
# 内部实现对服务器发送数据的代码
-
# send方法发送数据,参数是字节数据
-
self.send(
'hello world\n'.encode(
'utf-8'))
-
-
def readable(self):
-
'''
-
描述是否有数据要从服务器中被读取
-
:return: True表示可读,不实现默认返回True,为True时调用handle.read函数
-
'''
-
return
True
-
-
-
def handle_read(self):
-
'''
-
readable为True时触发,实际接收,通常编写recv方法
-
:return:
-
'''
-
# 主动接收数据,参数是需要接收数据的长度
-
# 返回数据是字节数据
-
result = self.recv(
1024)
-
print(result)
-
-
-
def handle_error(self):
-
'''运行出错回调函数'''
-
# t错误类型,e错误说明,trace错误追踪
-
t,e,trace = sys.exc_info()
-
# print(t,e,trace)
-
self.close()
-
-
def handle_close(self):
-
'''连接关闭时触发'''
-
print(
'连接关闭')
-
self.close()
-
-
# 创建对象,并且执行asyncore.loop进入运行循环
-
-
if __name__ ==
'__main__':
-
client = SocketClient(
'127.0.0.1',
9000)
-
-
# 开始进行运行循环,5s触发一次
-
asyncore.loop(timeout=
5)
3.2 斗鱼弹幕抓取实战
3.2.1 重要概念理解
数据发送接收流程
均为先发送数据包长度,再发送数据包本身;
数据包相关计算
心跳机制:斗鱼为了防止客户端假死,需要每45s向服务器端发送一次心跳
弹幕分组:为了防止客户端弹幕流量过载而对其进行分组(新版API文档未找到“海量分组”),不清楚是不是默认海量分组。
3.2.2 弹幕抓取实战代码
-
#!/usr/bin/env python
-
-
# -*- encoding: utf-8 -*-
-
-
'''
-
@Author : {Jack Zhao}
-
-
@Time : 2020/5/12 10:42
-
-
@Contact : {zc_dlmu@163.com}
-
-
@Desc : 实现斗鱼弹幕抓取
-
'''
-
-
import asyncore
-
import sys
-
import threading
-
import time
-
import hashlib
-
from queue
import Queue
-
-
DATA_PACKET_TYPE_SEND =
689
# 发送数据包的消息类型
-
DATA_PACKET_TYPE_RECV =
690
# 接收数据包的消息类型
-
-
-
def encode_content(content):
-
'''
-
序列化弹幕内容函数
-
:param content:需要序列化的内容
-
:return:
-
'''
-
if isinstance(content,str):
-
return content.replace(
r'@',
r'@A').replace(
r'/',
r'@S')
-
elif isinstance(content,dict):
-
return
r'/'.join([
'{}@={}'.format(encode_content(k),encode_content(v))
for k,v
in content.items()])+
r'/'
-
elif isinstance(content,list):
-
return
r'/'.join([encode_content(data)
for data
in content]) +
r'/'
-
return
""
-
-
-
def decode_to_str(content):
-
'''
-
反序列化
-
:param content:字符串数据
-
:return:
-
'''
-
if isinstance(content,str):
-
return content.replace(
r'@S',
r'/').replace(
r'@A',
r'@')
-
return
''
-
-
def decode_to_dict(content):
-
'''反序列化字典数据'''
-
ret_dict = dict()
-
if isinstance(content,str):
-
item_strings = content.split(
r'/')
-
for item_string
in item_strings:
-
k_v_list = item_string.split(
'@=')
-
if k_v_list
is
not
None
and len(k_v_list)>
1:
-
k = k_v_list[
0]
-
v = k_v_list[
1]
-
ret_dict[decode_to_str(k)] = decode_to_str(v)
-
return ret_dict
-
-
def decode_to_list(content):
-
'''反序列列表数据'''
-
ret_list = []
-
-
if isinstance(content,str):
-
# 排除最后一个空/
-
items = content.split(
r'/')
-
for idx,item
in enumerate(items):
-
if idx<len(items)
-1:
-
ret_list.append(decode_to_str(item))
-
return ret_list
-
-
class DataPacket():
-
'''
-
数据包封装
-
'''
-
def __init__(self,type=DATA_PACKET_TYPE_SEND,content="",data_bytes = None):
-
# 数据包的类型,数据内容,加密字段,保留字段
-
if data_bytes
is
None:
-
self.type = type
-
self.content = content
-
self.encrypt_flag =
0
-
self.preserve_flag =
0
-
else:
-
# 根据消息传递中的消息类型进行定义
-
self.type = int.from_bytes(data_bytes[
4:
6],byteorder=
'little',signed=
False)
-
self.encrypt_flag = int.from_bytes(data_bytes[
6:
7],byteorder=
'little',signed=
False)
-
self.preserve_flag = int.from_bytes(data_bytes[
7:
8],byteorder=
'little',signed=
False)
-
# 去掉最后一位\0
-
self.content = str(data_bytes[
8:
-1],encoding=
'utf-8')
-
-
-
-
def get_length(self):
-
'''
-
获取当前数据包的长度(与消息请求中的一致),为以后需要发送数据包做准备
-
:return:
-
'''
-
# 汇总+\0的长度
-
return
4+
2+
1+
1+len(self.content.encode(
'utf-8'))+
1
-
-
def get_bytes(self):
-
'''将数据包转换为二进制数据'''
-
data = bytes()
-
# 构建4个字节的消息长度数据
-
data_packet_length = self.get_length()
-
# 把一个整型数据转换成二进制数据
-
# 第一个参数表示需要转换的二进制数据占几个字节
-
# 第二个参数描述字节序
-
# 第三个参数设置是否有符号
-
# 处理消息长度
-
data += data_packet_length.to_bytes(
4,byteorder=
'little',signed=
False)
-
# 处理消息类型
-
data += self.type.to_bytes(
2, byteorder=
'little', signed=
False)
-
# 处理加密字段
-
data += self.encrypt_flag.to_bytes(
1, byteorder=
'little', signed=
False)
-
# 处理保留字段
-
data += self.preserve_flag.to_bytes(
1, byteorder=
'little', signed=
False)
-
# 处理数据内容
-
data += self.content.encode(
'utf-8')
-
# 添加\0数据
-
data +=
b'\0'
-
return data
-
-
-
class DouyuClient(asyncore.dispatcher):
-
def __init__(self,host,port,callback=None):
-
# 构建发送数据包的队列容器
-
# 存放数据包对象
-
self.send_queue = Queue()
-
# 接收数据包对象
-
self.recv_queue = Queue()
-
asyncore.dispatcher.__init__(self)
-
self.create_socket()
-
address = (host,port)
-
self.connect(address)
-
self.callback = callback
# 外部传入的自定义回调函数
-
-
-
# 构建一个专门处理接收数据包容器中数据包的线程
-
self.callback_thread = threading.Thread(target=self.do_callback)
-
# 设置守护进程,当进程结束时让该线程结束
-
self.callback_thread.setDaemon(
True)
-
self.callback_thread.start()
-
-
# 构建心跳线程
-
self.heart_thread =threading.Thread(target=self.do_ping)
-
self.heart_thread.setDaemon(
True)
-
self.ping_running =
False
-
-
def handle_connect(self):
-
print(
'连接成功')
-
self.start_ping()
# 链接成功则开启心跳
-
-
def writable(self):
-
# 有数据为True
-
return self.send_queue.qsize() >
0
-
-
def handle_write(self):
-
'''先发送长度,后发送数据,且均为二进制'''
-
# 从发送数据包队列中获取数据包对象
-
dp = self.send_queue.get()
-
# 获取数据包长度,并发送给服务器
-
dp_length = dp.get_length()
-
dp_length_data = dp_length.to_bytes(
4,byteorder=
'little',signed=
False)
-
self.send(dp_length_data)
-
# 发送数据包二进制数据
-
self.send(dp.get_bytes())
-
-
def readable(self):
-
return
True
-
-
def handle_read(self):
-
# 读取数据包长度,二进制数据
-
data_length_data = self.recv(
4)
-
# 通过二进制获取length具体数据
-
data_length = int.from_bytes(data_length_data,byteorder=
'little',signed=
False)
-
# 通过数据包长度获取数据
-
data = self.recv(data_length)
-
# 通过二进制数据构建数据包对象
-
dp = DataPacket(data_bytes=data)
-
# 把数据包放入接收数据包容器中
-
self.recv_queue.put(dp)
-
-
def handle_error(self):
-
t,e,trace = sys.exc_info()
-
print(e)
-
self.close()
-
-
def handle_close(self):
-
# 程序关闭时结束心跳
-
self.stop_ping()
-
print(
'连接关闭')
-
self.close()
-
-
def login_room_id(self,room_id=1,aid=1,token=1,secret=3,timenow=round(time.time())):
-
'''实现登录函数'''
-
# 构建登录数据包
-
# 斗鱼修改了api,仅针对企业用户开放,所以可以写死
-
# aid = 1 # yourapplicaitonID
-
# token = 2 # 请求http://openapi.douyu.com/api/thirdPart/token,其中data值为token
-
# timenow = round(time.time())
-
# secret = 3 # 开发者密钥
-
# 保存传入的部分信息以便弹幕分组使用
-
self.room_id = room_id
-
self.token = token
-
self.time =timenow
-
-
# 检验码生成
-
auth = hashlib.md5((
'{}_{}_{}_{}'.format(secret,aid,time,token)).encode(
'utf-8')).hexdigest()
-
self.auth = auth
-
#content = "type@=loginreq/roomid@={}/aid@={}/token@={}/time={}/auth={}/".format(room_id,aid,token,timenow,auth)
-
send_data = {
-
"type":
"loginreq",
-
"roomid": str(room_id),
-
"aid": str(aid),
-
"token":str(token),
-
"time":str(timenow),
-
"auth":str(auth)
-
}
-
content = encode_content(send_data)
-
-
login_dp = DataPacket(DATA_PACKET_TYPE_SEND,content)
-
# 将数据包添加到发送数据包容器中
-
self.send_queue.put(login_dp)
-
-
def join_room_group(self):
-
'''加入弹幕分组,防止用户端过载'''
-
pass
-
send_data = {
-
"type":
"joingroup",
-
"rid":str(self.room_id),
-
"token":str(self.token),
-
"time":str(self.time),
-
"auth":str(self.auth)
-
}
-
content = encode_content(send_data)
-
dp = DataPacket(type=DATA_PACKET_TYPE_SEND,content = content)
-
self.send_queue.put(dp)
-
-
-
-
def send_heart_data_packet(self):
-
'''心跳机制,防止客户端假死,每45s向后台发送数据即可'''
-
send_data = {
-
"type":
"mrkl"
-
}
-
content = encode_content(send_data)
-
dp = DataPacket(type=DATA_PACKET_TYPE_SEND,content=content)
-
self.send_queue.put(dp)
-
-
def start_ping(self):
-
'''开启心跳'''
-
self.ping_running =
True
-
-
def stop_ping(self):
-
'''结束心跳'''
-
self.ping_running =
False
-
-
def do_ping(self):
-
'''执行心跳'''
-
while
True:
-
if self.ping_running:
-
self.send_heart_data_packet()
-
time.sleep(
40)
-
-
def do_callback(self):
-
'''
-
专门负责处理接收数据包容器中的数据
-
:return:
-
'''
-
# 从接收容器中获取数据包
-
while
True:
-
dp = self.recv_queue.get()
-
# 处理数据
-
if self.callback
is
not
None:
-
self.callback(self,dp)
-
self.recv_queue.task_done()
-
-
def data_callback(client,dp):
-
'''
-
自定义回调函数
-
:param dp: 数据包对象
-
:return:
-
'''
-
resp_data = decode_to_dict(dp.content)
-
# print(resp_data)
-
if resp_data[
"type"] ==
"loginres":
-
# 调用加入分组请求
-
print(
"登录成功",resp_data)
-
client.join_room_group()
-
elif resp_data[
"type"] ==
"chatmsg":
-
print(
"{}:{}".format(resp_data[
'nn'],resp_data[
'txt']))
-
elif resp_data[
"type"] ==
"al":
-
print(
"主播离开啦!")
-
elif resp_data[
"type"]==
"ab":
-
print(
"主播回来啦!")
-
-
-
if __name__ ==
'__main__':
-
# 斗鱼第三方接入协议,https://open.douyu.com/source/api/63
-
client = DouyuClient(
'openapi-danmu.douyu.com',
80,callback=data_callback)
-
# 这里需要输入相关的所有信息!!!# room_id aid token secret
-
# room_id 房间号
-
# aid 你这次任务的applicaitonID
-
# token 登录令牌 请求http://openapi.douyu.com/api/thirdPart/token,其中data值为token
-
# secret = 开发者密钥
-
client.login_room_id(room_id=
'',aid=
'',token=
''.secret=
'')
-
asyncore.loop(timeout=
10)
-
# data = ['1,3,5']
-
# data1 = 'a@=@Ax/b@=y/'
-
# data2 = '1,3,5/'
-
# print(decode_to_list(data2))
-
pass
转载:https://blog.csdn.net/weixin_40539952/article/details/106078624