小言_互联网的博客

Python爬虫实战-官方API怎么用?结合Socket实现斗鱼实时弹幕抓取-2020最新API

406人阅读  评论(0)

目录

 

一、原理概述及结果展示

1.1 计算机网络基础知识

1.2 结果展示

二、开发环境

2.1 基础环境及相关文档

2.2 Windows环境下nc的配置(因为windows不支持nc,linux用户可以直接跳过这一步)

2.3 测试工具可用性(开启两个cmd窗口,可以实现nc窗口与telnet窗口互相通信即可)

三、斗鱼弹幕抓取文档及源码

3.1 Socket编程-asyncore模块基础模板的使用

3.2 斗鱼弹幕抓取实战


一、原理概述及结果展示

1.1 计算机网络基础知识

Socket:在计算机通信领域,socket 被翻译为“套接字”,它是计算机之间进行通信的一种约定或一种方式。通过 socket 这种约定,一台计算机可以接收其他计算机的数据,也可以向其他计算机发送数据。

nc命令:nc是netcat的简写,有着网络界的瑞士军刀美誉。因为它短小精悍、功能实用,被设计为一个简单、可靠的网络工具。

(1)实现任意TCP/UDP端口的侦听,nc可以作为server以TCP或UDP方式侦听指定端口

(2)端口的扫描,nc可以作为client发起TCP或UDP连接

(3)机器之间传输文件

(4)机器之间网络测速      

telnet命令:telnet就是查看某个端口是否可访问。我们在搞开发的时候,经常要用的端口就是 8080。那么你可以启动服务器,用telnet 去查看这个端口是否可用。

1.2 结果展示

后期可以自行尝试存入数据库,以:进行正则即可

二、开发环境

2.1 基础环境及相关文档

软件环境:Windows10 + Pycharm2019

requirements.txt内容:(先创建同名文档,然后直接终端使用pip install -r requirements.txt)


  
  1. certifi==2020.4.5.1
  2. wincertstore==0.2

斗鱼官方API文档(暂时未针对个人开发者开放):弹幕相关API

项目本质:使用Python作为telnet客户端,与斗鱼服务器进行通信。

2.2 Windows环境下nc的配置(因为windows不支持nc,linux用户可以直接跳过这一步)

下载netcat:netcat下载地址

配置环境变量(将解压后的文件中的exe文件加入Path路径)

开启telnet服务(Windows默认关闭)

2.3 测试工具可用性(开启两个cmd窗口,可以实现nc窗口与telnet窗口互相通信即可)

三、斗鱼弹幕抓取文档及源码

3.1 Socket编程-asyncore模块基础模板的使用


  
  1. # socket.py
  2. #!/usr/bin/env python
  3. # -*- encoding: utf-8 -*-
  4. '''
  5. @Author : {Jack Zhao}
  6. @Time : 2020/5/12 9:52
  7. @Contact : {zc_dlmu@163.com}
  8. @Desc : Socket客户端的标准实现
  9. '''
  10. import asyncore
  11. import sys
  12. # 定义类,并继承自 asyncore.dispatcher
  13. class SocketClient(asyncore.dispatcher):
  14. # 实现类中的回调函数代码
  15. def __init__(self,host,port):
  16. # 调用父类方法
  17. asyncore.dispatcher.__init__(self)
  18. # 创建Socket对象
  19. self.create_socket()
  20. # 链接服务器
  21. address = (host,port)
  22. self.connect(address)
  23. def handle_connect(self):
  24. '''
  25. 连接成功回调该函数
  26. :return:
  27. '''
  28. print( '连接成功')
  29. def writable(self):
  30. '''
  31. 描述是否有数据要被发送到服务器
  32. :return: True表示可写,不实现默认返回True,为True时调用handle.write函数
  33. '''
  34. return False
  35. def handle_write(self):
  36. '''
  37. writable函数返回值为True时触发,编写send方法发送数据
  38. :return:
  39. '''
  40. # 内部实现对服务器发送数据的代码
  41. # send方法发送数据,参数是字节数据
  42. self.send( 'hello world\n'.encode( 'utf-8'))
  43. def readable(self):
  44. '''
  45. 描述是否有数据要从服务器中被读取
  46. :return: True表示可读,不实现默认返回True,为True时调用handle.read函数
  47. '''
  48. return True
  49. def handle_read(self):
  50. '''
  51. readable为True时触发,实际接收,通常编写recv方法
  52. :return:
  53. '''
  54. # 主动接收数据,参数是需要接收数据的长度
  55. # 返回数据是字节数据
  56. result = self.recv( 1024)
  57. print(result)
  58. def handle_error(self):
  59. '''运行出错回调函数'''
  60. # t错误类型,e错误说明,trace错误追踪
  61. t,e,trace = sys.exc_info()
  62. # print(t,e,trace)
  63. self.close()
  64. def handle_close(self):
  65. '''连接关闭时触发'''
  66. print( '连接关闭')
  67. self.close()
  68. # 创建对象,并且执行asyncore.loop进入运行循环
  69. if __name__ == '__main__':
  70. client = SocketClient( '127.0.0.1', 9000)
  71. # 开始进行运行循环,5s触发一次
  72. asyncore.loop(timeout= 5)

3.2 斗鱼弹幕抓取实战

3.2.1 重要概念理解

数据发送接收流程

均为先发送数据包长度,再发送数据包本身;

数据包相关计算

心跳机制:斗鱼为了防止客户端假死,需要每45s向服务器端发送一次心跳

弹幕分组:为了防止客户端弹幕流量过载而对其进行分组(新版API文档未找到“海量分组”),不清楚是不是默认海量分组。

3.2.2 弹幕抓取实战代码


  
  1. #!/usr/bin/env python
  2. # -*- encoding: utf-8 -*-
  3. '''
  4. @Author : {Jack Zhao}
  5. @Time : 2020/5/12 10:42
  6. @Contact : {zc_dlmu@163.com}
  7. @Desc : 实现斗鱼弹幕抓取
  8. '''
  9. import asyncore
  10. import sys
  11. import threading
  12. import time
  13. import hashlib
  14. from queue import Queue
  15. DATA_PACKET_TYPE_SEND = 689 # 发送数据包的消息类型
  16. DATA_PACKET_TYPE_RECV = 690 # 接收数据包的消息类型
  17. def encode_content(content):
  18. '''
  19. 序列化弹幕内容函数
  20. :param content:需要序列化的内容
  21. :return:
  22. '''
  23. if isinstance(content,str):
  24. return content.replace( r'@', r'@A').replace( r'/', r'@S')
  25. elif isinstance(content,dict):
  26. return r'/'.join([ '{}@={}'.format(encode_content(k),encode_content(v)) for k,v in content.items()])+ r'/'
  27. elif isinstance(content,list):
  28. return r'/'.join([encode_content(data) for data in content]) + r'/'
  29. return ""
  30. def decode_to_str(content):
  31. '''
  32. 反序列化
  33. :param content:字符串数据
  34. :return:
  35. '''
  36. if isinstance(content,str):
  37. return content.replace( r'@S', r'/').replace( r'@A', r'@')
  38. return ''
  39. def decode_to_dict(content):
  40. '''反序列化字典数据'''
  41. ret_dict = dict()
  42. if isinstance(content,str):
  43. item_strings = content.split( r'/')
  44. for item_string in item_strings:
  45. k_v_list = item_string.split( '@=')
  46. if k_v_list is not None and len(k_v_list)> 1:
  47. k = k_v_list[ 0]
  48. v = k_v_list[ 1]
  49. ret_dict[decode_to_str(k)] = decode_to_str(v)
  50. return ret_dict
  51. def decode_to_list(content):
  52. '''反序列列表数据'''
  53. ret_list = []
  54. if isinstance(content,str):
  55. # 排除最后一个空/
  56. items = content.split( r'/')
  57. for idx,item in enumerate(items):
  58. if idx<len(items) -1:
  59. ret_list.append(decode_to_str(item))
  60. return ret_list
  61. class DataPacket():
  62. '''
  63. 数据包封装
  64. '''
  65. def __init__(self,type=DATA_PACKET_TYPE_SEND,content="",data_bytes = None):
  66. # 数据包的类型,数据内容,加密字段,保留字段
  67. if data_bytes is None:
  68. self.type = type
  69. self.content = content
  70. self.encrypt_flag = 0
  71. self.preserve_flag = 0
  72. else:
  73. # 根据消息传递中的消息类型进行定义
  74. self.type = int.from_bytes(data_bytes[ 4: 6],byteorder= 'little',signed= False)
  75. self.encrypt_flag = int.from_bytes(data_bytes[ 6: 7],byteorder= 'little',signed= False)
  76. self.preserve_flag = int.from_bytes(data_bytes[ 7: 8],byteorder= 'little',signed= False)
  77. # 去掉最后一位\0
  78. self.content = str(data_bytes[ 8: -1],encoding= 'utf-8')
  79. def get_length(self):
  80. '''
  81. 获取当前数据包的长度(与消息请求中的一致),为以后需要发送数据包做准备
  82. :return:
  83. '''
  84. # 汇总+\0的长度
  85. return 4+ 2+ 1+ 1+len(self.content.encode( 'utf-8'))+ 1
  86. def get_bytes(self):
  87. '''将数据包转换为二进制数据'''
  88. data = bytes()
  89. # 构建4个字节的消息长度数据
  90. data_packet_length = self.get_length()
  91. # 把一个整型数据转换成二进制数据
  92. # 第一个参数表示需要转换的二进制数据占几个字节
  93. # 第二个参数描述字节序
  94. # 第三个参数设置是否有符号
  95. # 处理消息长度
  96. data += data_packet_length.to_bytes( 4,byteorder= 'little',signed= False)
  97. # 处理消息类型
  98. data += self.type.to_bytes( 2, byteorder= 'little', signed= False)
  99. # 处理加密字段
  100. data += self.encrypt_flag.to_bytes( 1, byteorder= 'little', signed= False)
  101. # 处理保留字段
  102. data += self.preserve_flag.to_bytes( 1, byteorder= 'little', signed= False)
  103. # 处理数据内容
  104. data += self.content.encode( 'utf-8')
  105. # 添加\0数据
  106. data += b'\0'
  107. return data
  108. class DouyuClient(asyncore.dispatcher):
  109. def __init__(self,host,port,callback=None):
  110. # 构建发送数据包的队列容器
  111. # 存放数据包对象
  112. self.send_queue = Queue()
  113. # 接收数据包对象
  114. self.recv_queue = Queue()
  115. asyncore.dispatcher.__init__(self)
  116. self.create_socket()
  117. address = (host,port)
  118. self.connect(address)
  119. self.callback = callback # 外部传入的自定义回调函数
  120. # 构建一个专门处理接收数据包容器中数据包的线程
  121. self.callback_thread = threading.Thread(target=self.do_callback)
  122. # 设置守护进程,当进程结束时让该线程结束
  123. self.callback_thread.setDaemon( True)
  124. self.callback_thread.start()
  125. # 构建心跳线程
  126. self.heart_thread =threading.Thread(target=self.do_ping)
  127. self.heart_thread.setDaemon( True)
  128. self.ping_running = False
  129. def handle_connect(self):
  130. print( '连接成功')
  131. self.start_ping() # 链接成功则开启心跳
  132. def writable(self):
  133. # 有数据为True
  134. return self.send_queue.qsize() > 0
  135. def handle_write(self):
  136. '''先发送长度,后发送数据,且均为二进制'''
  137. # 从发送数据包队列中获取数据包对象
  138. dp = self.send_queue.get()
  139. # 获取数据包长度,并发送给服务器
  140. dp_length = dp.get_length()
  141. dp_length_data = dp_length.to_bytes( 4,byteorder= 'little',signed= False)
  142. self.send(dp_length_data)
  143. # 发送数据包二进制数据
  144. self.send(dp.get_bytes())
  145. def readable(self):
  146. return True
  147. def handle_read(self):
  148. # 读取数据包长度,二进制数据
  149. data_length_data = self.recv( 4)
  150. # 通过二进制获取length具体数据
  151. data_length = int.from_bytes(data_length_data,byteorder= 'little',signed= False)
  152. # 通过数据包长度获取数据
  153. data = self.recv(data_length)
  154. # 通过二进制数据构建数据包对象
  155. dp = DataPacket(data_bytes=data)
  156. # 把数据包放入接收数据包容器中
  157. self.recv_queue.put(dp)
  158. def handle_error(self):
  159. t,e,trace = sys.exc_info()
  160. print(e)
  161. self.close()
  162. def handle_close(self):
  163. # 程序关闭时结束心跳
  164. self.stop_ping()
  165. print( '连接关闭')
  166. self.close()
  167. def login_room_id(self,room_id=1,aid=1,token=1,secret=3,timenow=round(time.time())):
  168. '''实现登录函数'''
  169. # 构建登录数据包
  170. # 斗鱼修改了api,仅针对企业用户开放,所以可以写死
  171. # aid = 1 # yourapplicaitonID
  172. # token = 2 # 请求http://openapi.douyu.com/api/thirdPart/token,其中data值为token
  173. # timenow = round(time.time())
  174. # secret = 3 # 开发者密钥
  175. # 保存传入的部分信息以便弹幕分组使用
  176. self.room_id = room_id
  177. self.token = token
  178. self.time =timenow
  179. # 检验码生成
  180. auth = hashlib.md5(( '{}_{}_{}_{}'.format(secret,aid,time,token)).encode( 'utf-8')).hexdigest()
  181. self.auth = auth
  182. #content = "type@=loginreq/roomid@={}/aid@={}/token@={}/time={}/auth={}/".format(room_id,aid,token,timenow,auth)
  183. send_data = {
  184. "type": "loginreq",
  185. "roomid": str(room_id),
  186. "aid": str(aid),
  187. "token":str(token),
  188. "time":str(timenow),
  189. "auth":str(auth)
  190. }
  191. content = encode_content(send_data)
  192. login_dp = DataPacket(DATA_PACKET_TYPE_SEND,content)
  193. # 将数据包添加到发送数据包容器中
  194. self.send_queue.put(login_dp)
  195. def join_room_group(self):
  196. '''加入弹幕分组,防止用户端过载'''
  197. pass
  198. send_data = {
  199. "type": "joingroup",
  200. "rid":str(self.room_id),
  201. "token":str(self.token),
  202. "time":str(self.time),
  203. "auth":str(self.auth)
  204. }
  205. content = encode_content(send_data)
  206. dp = DataPacket(type=DATA_PACKET_TYPE_SEND,content = content)
  207. self.send_queue.put(dp)
  208. def send_heart_data_packet(self):
  209. '''心跳机制,防止客户端假死,每45s向后台发送数据即可'''
  210. send_data = {
  211. "type": "mrkl"
  212. }
  213. content = encode_content(send_data)
  214. dp = DataPacket(type=DATA_PACKET_TYPE_SEND,content=content)
  215. self.send_queue.put(dp)
  216. def start_ping(self):
  217. '''开启心跳'''
  218. self.ping_running = True
  219. def stop_ping(self):
  220. '''结束心跳'''
  221. self.ping_running = False
  222. def do_ping(self):
  223. '''执行心跳'''
  224. while True:
  225. if self.ping_running:
  226. self.send_heart_data_packet()
  227. time.sleep( 40)
  228. def do_callback(self):
  229. '''
  230. 专门负责处理接收数据包容器中的数据
  231. :return:
  232. '''
  233. # 从接收容器中获取数据包
  234. while True:
  235. dp = self.recv_queue.get()
  236. # 处理数据
  237. if self.callback is not None:
  238. self.callback(self,dp)
  239. self.recv_queue.task_done()
  240. def data_callback(client,dp):
  241. '''
  242. 自定义回调函数
  243. :param dp: 数据包对象
  244. :return:
  245. '''
  246. resp_data = decode_to_dict(dp.content)
  247. # print(resp_data)
  248. if resp_data[ "type"] == "loginres":
  249. # 调用加入分组请求
  250. print( "登录成功",resp_data)
  251. client.join_room_group()
  252. elif resp_data[ "type"] == "chatmsg":
  253. print( "{}:{}".format(resp_data[ 'nn'],resp_data[ 'txt']))
  254. elif resp_data[ "type"] == "al":
  255. print( "主播离开啦!")
  256. elif resp_data[ "type"]== "ab":
  257. print( "主播回来啦!")
  258. if __name__ == '__main__':
  259. # 斗鱼第三方接入协议,https://open.douyu.com/source/api/63
  260. client = DouyuClient( 'openapi-danmu.douyu.com', 80,callback=data_callback)
  261. # 这里需要输入相关的所有信息!!!# room_id aid token secret
  262. # room_id 房间号
  263. # aid 你这次任务的applicaitonID
  264. # token 登录令牌 请求http://openapi.douyu.com/api/thirdPart/token,其中data值为token
  265. # secret = 开发者密钥
  266. client.login_room_id(room_id= '',aid= '',token= ''.secret= '')
  267. asyncore.loop(timeout= 10)
  268. # data = ['1,3,5']
  269. # data1 = 'a@=@Ax/b@=y/'
  270. # data2 = '1,3,5/'
  271. # print(decode_to_list(data2))
  272. pass

 


转载:https://blog.csdn.net/weixin_40539952/article/details/106078624
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场