一:什么是生产者和消费者?
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
二:生产者消费者模式的工作机制
1、通过容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不直接找生产者要数据,而是从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,解耦了生产者和消费者。
2、体现了面向对象的设计理念:低耦合
这就相当于去包子店吃包子,你要5个包子,老板把5个人包子放在一个盘子中再给你,这个盘子就是一个缓冲区。
3、生产者消费者模式的核心是“阻塞队列”也称消息队列。
三:用生产者与消费者模式爬取王者荣耀壁纸
链接:
https://pvp.qq.com/web201605/wallpaper.shtml
特别详细的URL:
http://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20&totalpage=0&page=10&iOrder=0&iSortNumClose=1&jsoncallback=jQuery17106927574791770883_1525742053044&iAMSActivityId=51991&everyRead=true&iTypeId=2&iFlowId=267733&iActId=2735&iModuleId=2735&=1525742856493
链接有点长,直接看参数表
这个参数也是很好懂,要不同的页面就给page传入不同的数字就行,0 就是第一页。
'''
基于生产消费者实现王者荣耀壁纸下载
version:01
author:jasn
date:2020-05-02
'''
import threading
import urllib
import os, random, re, queue
import requests
# 请求头
headers = {
'user-agent': '/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36',
'referer': 'https://pvp.qq.com/web201605/wallpaper.shtml'}
# 代理ip
all_proxies = [
{'http': '183.166.20.179:9999'}, {'http': '125.108.124.168:9000'},
{'http': '182.92.113.148:8118'}, {'http': '163.204.243.51:9999'},
{'http': '175.42.158.45:9999'}] # 需要自行去找一些免费的代理,参考我其他博客案例
# 生产者
class Producer(threading.Thread):
def __init__(self, page_queue, image_queue, *args, **kwargs):
super(Producer, self).__init__(*args, **kwargs)
self.page_queue = page_queue
self.image_queue = image_queue
self.comp = re.compile('[^A-^a-z^0-9^\u4e00-\u9fa5]') # 匹配中英文
def run(self) -> None:
while not self.page_queue.empty():
try:
page_url = self.page_queue.get()
resp = requests.get(page_url, headers=headers,proxies=random.choice(all_proxies))
resp.raise_for_status() # 主动抛出一个异常
datas = resp.json()['List']
for data in datas:
name = requests.utils.unquote(data['sProdName']).strip()
image_name = self.comp.sub('', name) # 文件名清洗,替换特殊字符
image_path = os.path.join('.\image',image_name) # 拼接存储地址
if not os.path.exists(image_path): # 判断文件夹是否存在,不存在则创建
os.makedirs(image_path)
image_urls = extract_images(data) #获取图片url
for index, image_url in enumerate(image_urls):
self.image_queue.put(
{"image_url": image_url, "image_path": os.path.join(image_path, "%d.jpg" % (index + 1))})
except Exception as e:
print('错误:{}'.format(e))
continue
# 消费者,将王者获取的链接和名称保存到本地
class Consumer(threading.Thread):
def __init__(self, image_queue, *args, **kwargs):
super(Consumer, self).__init__(*args, **kwargs)
self.image_queue = image_queue
def run(self) -> None:
while True:
try:
image_dict = self.image_queue.get(timeout=10)
image_url = image_dict.get("image_url")
image_path = image_dict.get("image_path")
try:
urllib.request.urlretrieve(image_url, image_path)
print(image_path + "下载完成!")
except:
print(image_path + "下载失败!")
except:
break
# 提取图片url
def extract_images(data):
image_urls = []
for i in range(1, 9):
image_url = requests.utils.unquote(data["sProdImgNo_{}".format(i)]).replace('200', '0') # url解码
image_urls.append(image_url)
return image_urls
def main(pages):
page_queue = queue.Queue(22)
image_queue = queue.Queue(1000)
for x in range(0, pages):
page_url = "https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20&totalpage=0&page={page}&iOrder=0&iSortNumClose=1&iAMSActivityId=51991&_everyRead=true&iTypeId=2&iFlowId=267733&iActId=2735&iModuleId=2735&_=1554457680964".format(page=x)
page_queue.put(page_url) # 将url加入队列
for x in range(5):
th = Producer(page_queue, image_queue, name="生产者%d号" % x)
th.start()
for x in range(5):
th = Consumer(image_queue, name="消费者%d号" % x)
th.start()
if __name__ == '__main__':
main(22) # 页数最多22页```
转载:https://blog.csdn.net/weixin_42444693/article/details/105893175