# Python爬虫系列之某辉超市小程序商品数据爬取
# 技术交流请点击这里
# 微信请扫描下方二维码
# 代码仅供学习交流,请勿用于非法用途
# 直接上代码
# -*- coding:utf-8 -*-
import json
import logging
import os
import threading
import time
import urllib.parse
from queue import Empty, Queue

import requests
import xlrd
import xlwt
from xlutils.copy import copy

from sign import getSign
# Number of attempts made for each HTTP request (used as range(retry)).
retry = 3
# Per-request timeout, in seconds, handed to requests.get.
timeout = 30
# Coordinates sent with every API call and in the X-YH-Biz-Params header.
lat = "40.695483661585205"
lng = "117.20431182115476"
# Credentials appended to every API URL; empty here and must be filled in.
wechatunionid = ""
accessToken = ""
headers = {
    "Content-Type": "application/json",
    "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/7.0.17(0x17001126) NetType/WIFI Language/zh_CN",
    "Referer": "https://servicewechat.com/wxc9cf7c95499ee604/450/page-frame.html",
    # Built from lat/lng so the header cannot drift from the query-string values.
    "X-YH-Biz-Params": "lat=" + lat + "&lng=" + lng + "&appid=wxc9cf7c95499ee604&cityid=2&shopid=9541&sellerid=3",
}
# Upper bound on worker threads; main() lowers it to the number of categories.
threadNums = 1
# Header row of every output workbook.
excelTitle = ["一级类目", "二级类目", "商品名称", "售价", "市场价", "产地", "规格", "保质期", "库存", "描述", "轮播图", "详情图"]
# Despite the name, this is an allow-list: only first-level categories whose
# name appears here are crawled.
exceptFs = ["乳品烘培"]
# Second-level category names that are skipped.
unexceptSec = ["全部", "成人奶粉", "吐司面包", "蛋糕烘焙", "传统糕点"]
dataPath = os.getcwd() + "/data/"
imgBasePwd = dataPath + "imgs/"
# exist_ok avoids the check-then-create race when several processes start at once.
for _directory in (dataPath, imgBasePwd):
    os.makedirs(_directory, exist_ok=True)
def urldecode(s):
    """Return *s* with percent-escapes decoded.

    The original referenced ``urllib`` without importing it, so the resulting
    NameError was swallowed and the input was always returned undecoded.

    Args:
        s: The percent-encoded string.

    Returns:
        The decoded string, or *s* itself when it is not something
        ``urllib.parse.unquote`` can handle (e.g. ``None`` or a number).
    """
    try:
        return urllib.parse.unquote(s)
    except (TypeError, AttributeError):
        # Best-effort helper: hand non-string input back unchanged.
        return s
def generalUrl(url):
    """Append the ``timestamp`` and ``sign`` query parameters to *url*.

    Args:
        url: A URL that already contains a query string.

    Returns:
        *url* followed by ``&timestamp=<ms>`` and ``&sign=<signature>``, where
        the signature is computed by ``getSign`` over the URL including the
        timestamp.
    """
    # Millisecond epoch. Slicing str(time.time()) is unreliable because the
    # number of printed decimals varies, which can yield a shorter value.
    timestamp = str(int(time.time() * 1000))
    # The parameter name had been mangled to "×tamp" ("&times" rendered as
    # an HTML entity); the intended text is "&timestamp=".
    url += "&timestamp=" + timestamp
    sign = getSign(url)
    return url + "&sign=" + sign
def getHtml(url):
    """GET *url* and return the decoded JSON body.

    The request is attempted up to ``retry`` times with the module-level
    ``headers`` and ``timeout``.

    Args:
        url: Fully built (signed) request URL.

    Returns:
        The parsed JSON payload, or ``None`` when every attempt failed.
    """
    # Log only the endpoint: the query string carries the access token.
    endpoint = url.split("?", 1)[0]
    for attempt in range(1, retry + 1):
        try:
            resp = requests.get(url, headers=headers, timeout=timeout)
            return json.loads(resp.content.decode("utf-8"))
        except (requests.RequestException, ValueError) as e:
            # ValueError covers both invalid UTF-8 and invalid JSON.
            logging.warning("GET %s failed (attempt %d/%d): %s", endpoint, attempt, retry, e)
    return None
def getCategoryQueue():
    """Fetch the seller category tree and queue the sub-categories to crawl.

    Only first-level categories named in ``exceptFs`` are considered, and of
    their sub-categories those named in ``unexceptSec`` are skipped. Each
    queued sub-category dict gets an extra ``'pname'`` key holding the name of
    its first-level parent.

    Returns:
        A ``Queue`` of sub-category dicts; empty when the request failed or
        the payload did not have the expected shape.
    """
    categoryQueue = Queue(0)
    url = (
        "https://activity.yonghuivip.com/api/app/v4/search/sellercategory?cityid=2&sellerid=3&shopid=9541&lat=" + str(lat)
        + "&lng=" + str(lng)
        + "&deviceid=e990d440-9726-4969-84de-83a442814af5&access_token=" + accessToken
        + "&wechatunionid=" + wechatunionid
        + "&appid=wxc9cf7c95499ee604&sdk_version=2.13.1&wechat_version=7.0.17&app_version=6.11.3.9&distinctId=468672646018352405&platform=wechatminiprogram&channel=512&v=6.11.3.9"
    )
    res = getHtml(generalUrl(url))
    try:
        categorys = res['data']['categorys']
    except (KeyError, TypeError):
        # Surface the failure instead of silently returning an empty queue.
        logging.warning("category response has no data.categorys; nothing to crawl")
        return categoryQueue
    if not isinstance(categorys, (list, tuple)):
        logging.warning("data.categorys is not a list; nothing to crawl")
        return categoryQueue

    for category in categorys:
        try:
            parentName = category['categoryname']
            if parentName not in exceptFs:
                continue
            subcategories = category['subcategory']
        except (KeyError, TypeError):
            continue
        if not isinstance(subcategories, (list, tuple)):
            continue
        for subcategory in subcategories:
            try:
                if subcategory['categoryname'] in unexceptSec:
                    continue
                subcategory['pname'] = parentName
            except (KeyError, TypeError):
                # Malformed entry: skip it, keep the rest.
                continue
            categoryQueue.put(subcategory)
    return categoryQueue
class yhSpider(threading.Thread):
    """Worker thread that crawls queued categories into its own .xls file.

    Each worker takes sub-category dicts from the shared queue, pages through
    the goods list of that category, fetches every product's detail, downloads
    its images below ``imgBasePwd`` and appends one row per product to
    ``<dataPath>/永辉超市_<index>.xls``.
    """

    def __init__(self, categoryQueue, index, *args, **kwargs):
        """Store the shared queue and derive this worker's workbook path.

        Args:
            categoryQueue: Queue of sub-category dicts (see getCategoryQueue).
            index: Worker number, used in the output file name.
            *args, **kwargs: Forwarded to ``threading.Thread``.
        """
        super(yhSpider, self).__init__(*args, **kwargs)
        self.categoryQueue = categoryQueue
        self.excelPath = dataPath + "永辉超市_" + str(index) + ".xls"

    def initExcel(self):
        """Create the workbook with a single sheet holding the header row."""
        f = xlwt.Workbook()
        sheet1 = f.add_sheet(u'double', cell_overwrite_ok=True)
        for col, caption in enumerate(excelTitle):
            sheet1.write(0, col, caption)
        f.save(self.excelPath)

    def writeExcel(self, data):
        """Append *data* as a new row at the end of the first sheet.

        Every value is written as ``str(value)``. A cell that cannot be
        written is skipped; a failure to read or save the workbook is logged
        and the row is dropped (best effort, never raises).

        Args:
            data: Sequence of cell values, one per column.
        """
        print("----------------------------------------")
        print(data)
        print("----------------------------------------")
        try:
            workbook = xlrd.open_workbook(self.excelPath)
            rows_old = workbook.sheet_by_index(0).nrows
            # xlrd workbooks are read-only; xlutils.copy gives a writable clone.
            new_workbook = copy(workbook)
            new_worksheet = new_workbook.get_sheet(0)
            for col, value in enumerate(data):
                try:
                    new_worksheet.write(rows_old, col, str(value))
                except Exception as e:
                    logging.warning("could not write column %d of row %d: %s", col, rows_old, e)
            new_workbook.save(self.excelPath)
        except Exception:
            # Boundary of the best-effort export: report, then carry on.
            logging.exception("could not append row to %s", self.excelPath)

    def _skuListUrl(self, categoryid, page):
        """Build the unsigned goods-list URL for *categoryid* and *page*."""
        return (
            "https://activity.yonghuivip.com/api/app/v4/search/sellersku?cityid=2&pickself=0&page=" + str(page)
            + "&sellerid=3&shopid=9541&lat=" + lat
            + "&lng=" + lng
            + "&categoryid=" + str(categoryid)
            + "&aggregation=1&order=0&ordertype=0&categorylevel=2&abdata=%7B%22category_search_top_abt%22%3A%220%22%7D&deviceid=e990d440-9726-4969-84de-83a442814af5&access_token=" + accessToken
            + "&wechatunionid=" + wechatunionid
            + "&appid=wxc9cf7c95499ee604&sdk_version=2.13.1&wechat_version=7.0.17&app_version=6.11.3.9&distinctId=468672646018352405&platform=wechatminiprogram&channel=512&v=6.11.3.9"
        )

    def getTotalPage(self, categoryid):
        """Return the number of goods-list pages for *categoryid*.

        Returns:
            ``data.totalpage`` of the page-0 response as an int, or ``None``
            when the request failed or the field is missing/invalid.
        """
        res = getHtml(generalUrl(self._skuListUrl(categoryid, 0)))
        try:
            return int(res['data']['totalpage'])
        except (KeyError, TypeError, ValueError):
            logging.warning("no page count for category %s", categoryid)
            return None

    def getGoodsList(self, categoryid, page):
        """Return ``data.skus`` of one goods-list page, or ``None`` on failure."""
        res = getHtml(generalUrl(self._skuListUrl(categoryid, page)))
        try:
            return res['data']['skus']
        except (KeyError, TypeError):
            logging.warning("no goods list for category %s page %s", categoryid, page)
            return None

    def getAttr(self, places, key):
        """Return the ``'value'`` of the first entry whose ``'prompt'`` is *key*.

        Args:
            places: List of dicts carrying ``'prompt'`` and ``'value'`` keys.
            key: The prompt to look for.

        Returns:
            The matching value, or ``""`` when *places* is not a list or no
            usable entry matches.
        """
        if not isinstance(places, (list, tuple)):
            return ""
        for place in places:
            try:
                if place['prompt'] == key:
                    return place['value']
            except (KeyError, TypeError, IndexError):
                # Malformed entry: keep looking at the remaining ones.
                continue
        return ""

    def downLoad(self, url, path):
        """Download *url* to *path*, trying up to ``retry`` times.

        The file is only created after a successful response, so a failed
        download no longer leaves an empty file behind that
        ``checkFileExists`` would later mistake for a finished download.
        The parent directory of *path* is created when missing.

        Returns:
            ``True`` when the file was written, ``False`` otherwise.
        """
        for attempt in range(1, retry + 1):
            try:
                resp = requests.get(url, headers=headers, timeout=timeout)
                # Do not store an HTTP error page as an image.
                resp.raise_for_status()
            except requests.RequestException as e:
                logging.warning("download of %s failed (attempt %d/%d): %s", url, attempt, retry, e)
                continue
            try:
                parent = os.path.dirname(path)
                if parent:
                    self.mkdir(parent)
                with open(path, "wb") as f:
                    f.write(resp.content)
            except OSError as e:
                # A local file-system problem will not be cured by retrying.
                logging.warning("could not save %s: %s", path, e)
                return False
            return True
        return False

    @staticmethod
    def _imageUrl(entry):
        """Return the image URL held by *entry*, or ``""`` when there is none.

        Accepts either a dict with an ``'imgurl'`` key (the shape used for
        ``mainimgs``) or a plain URL string.
        """
        if isinstance(entry, str):
            return entry
        if isinstance(entry, dict):
            imgurl = entry.get('imgurl')
            if isinstance(imgurl, str):
                return imgurl
        return ""

    def _saveImages(self, entries, imgpwd, prefix):
        """Download every image in *entries* to ``imgpwd + prefix + <n> + ".jpg"``.

        Returns:
            The list of paths that exist on disk afterwards, or ``""`` when
            *entries* is not a list (so the cell stays empty).
        """
        if not isinstance(entries, (list, tuple)):
            return ""
        saved = []
        for position, entry in enumerate(entries):
            imgurl = self._imageUrl(entry)
            if not imgurl:
                continue
            imgPath = imgpwd + prefix + str(position) + ".jpg"
            if self.checkFileExists(imgPath) or self.downLoad(imgurl, imgPath):
                saved.append(imgPath)
        return saved

    @staticmethod
    def _dig(mapping, outer, inner):
        """Return ``str(mapping[outer][inner])``, or ``""`` when unavailable."""
        try:
            return str(mapping[outer][inner])
        except (KeyError, TypeError, IndexError):
            return ""

    def getGoodsDetail(self, code, fscategoryname, seccategoryname, imgpwd):
        """Fetch one product's detail, download its images and export a row.

        The row is laid out in the order of ``excelTitle``. The original wrote
        only the last few values starting at column 0, so they landed under
        the wrong headers, and the detail-image cell was never filled.

        Args:
            code: Product code passed as the ``code`` query parameter.
            fscategoryname: First-level category name (column 1).
            seccategoryname: Second-level category name (column 2).
            imgpwd: Directory prefix (ending in "/") for this product's images.
        """
        url = (
            "https://activity.yonghuivip.com/api/app/item/get?code=" + str(code)
            + "&shopid=9541&selectedshopid=&scene=&deviceid=e990d440-9726-4969-84de-83a442814af5&access_token=" + str(accessToken)
            + "&wechatunionid=" + wechatunionid
            + "&appid=wxc9cf7c95499ee604&sdk_version=2.13.1&wechat_version=7.0.17&app_version=6.11.3.9&distinctId=468672646018352405&platform=wechatminiprogram&channel=512&v=6.11.3.9"
        )
        res = getHtml(generalUrl(url))
        datas = res.get('data') if isinstance(res, dict) else None
        if not isinstance(datas, dict):
            logging.warning("no detail data for goods %s", code)
            return

        places = datas.get('place')
        # NOTE(review): 'picdetail' is assumed to be a list of image entries
        # like 'mainimgs'; any other shape leaves the cell empty - confirm
        # against a live response.
        mainimgList = self._saveImages(datas.get('mainimgs'), imgpwd, "轮播图_")
        detailimgList = self._saveImages(datas.get('picdetail'), imgpwd, "详情图_")

        data = [
            fscategoryname,
            seccategoryname,
            # NOTE(review): assumes the detail payload carries 'title' like the
            # list payload does - confirm.
            datas.get('title', ""),
            # TODO(review): the price fields of the detail payload are not
            # visible in this file; fill 售价 / 市场价 once the keys are known.
            "",
            "",
            # NOTE(review): presumably listed under 'place' with these prompts,
            # like 保质期; getAttr yields "" when they are absent.
            self.getAttr(places, "产地"),
            self.getAttr(places, "规格"),
            self.getAttr(places, "保质期"),
            self._dig(datas, 'stock', 'count'),
            self._dig(datas, 'skuStatus', 'sellerservicedesc'),
            mainimgList,
            detailimgList,
        ]
        self.writeExcel(data)

    def mkdir(self, path):
        """Create *path* (and missing parents); a no-op when it exists."""
        os.makedirs(path, exist_ok=True)

    def getRightNames(self, s):
        r"""Return *s* without the characters ``\ / : * ? " < > |``."""
        return s.translate(str.maketrans("", "", '\\/:*?"<>|'))

    def checkFileExists(self, path):
        """Return ``True`` when *path* exists, else ``False``."""
        return os.path.exists(path)

    def run(self):
        """Drain the category queue, exporting every product found."""
        self.initExcel()
        while True:
            try:
                # get_nowait avoids the empty()/get() race in which another
                # worker takes the last item and this one blocks forever.
                category = self.categoryQueue.get_nowait()
            except Empty:
                break
            try:
                fscategoryname = category['pname']
                seccategoryname = category['categoryname']
                categoryid = category['categoryid']
            except (KeyError, TypeError):
                logging.warning("skipping malformed category entry: %r", category)
                continue
            totalPage = self.getTotalPage(categoryid)
            if not totalPage:
                continue
            for page in range(totalPage):
                goodsList = self.getGoodsList(categoryid, page)
                if not isinstance(goodsList, (list, tuple)):
                    continue
                for goods in goodsList:
                    try:
                        title = goods['title']
                        # NOTE(review): the original passed the builtin `id`
                        # function here; 'id' is assumed to be the product
                        # code key of the list payload - confirm.
                        code = goods['id']
                    except (KeyError, TypeError):
                        logging.warning("skipping goods entry without title/id")
                        continue
                    imgpwd = imgBasePwd + self.getRightNames(str(title)) + "/"
                    try:
                        self.getGoodsDetail(code, fscategoryname, seccategoryname, imgpwd)
                    except Exception:
                        # One broken product must not stop the crawl.
                        logging.exception("failed to export goods %s", code)
def main():
    """Load the category queue and start one spider thread per worker slot.

    The module-level ``threadNums`` is capped at the number of queued
    categories before the workers are created.
    """
    global threadNums
    pending = getCategoryQueue()
    threadNums = min(pending.qsize(), threadNums)
    for slot in range(threadNums):
        worker = yhSpider(pending, slot)
        worker.start()


if __name__ == "__main__":
    main()
# 转载:https://blog.csdn.net/qq_41287993/article/details/109144055
# 查看评论