目录
1.前言
大家好 我是每天走在刑的第一线的政胤
今天教大家获取阿里巴巴列表页的商品信息,包括商品标题(title)和商品主图片,并把结果存入 xlsx 文件保存。我是政胤,制作不易,点个免费的关注吧。
2、解决方案
首先给出的方案是:
2.1、通过wxPython框架写出一个可视化界面,
2.2、因为阿里巴巴的反爬比较严格,所以我直接通过selenium模拟用户操作来绕过反爬机制
2.3、编写浏览器池方便实现多线程爬取数据
2.4、编写爬数据业务逻辑
3、现在开始上代码实现
3.1 首先初始化一个浏览器池
-
from multiprocessing
import Manager
-
from time
import sleep
-
-
from tool.open_browser
import open_browser
-
-
-
class DriverPool:
    """Fixed-size pool of browser drivers that workers borrow and return.

    All browsers are opened up front by ``open_browser`` and kept in
    ``self.drivers`` under the names ``driver_1`` .. ``driver_<max_nums>``.
    The names of the idle browsers are held in a ``Manager`` queue: a worker
    takes one with :meth:`getDriver` and gives it back with :meth:`putDriver`.
    """

    def __init__(self, max_nums, driver_path, ui, open_headless=0):
        """Create the pool and immediately open ``max_nums`` browsers.

        :param max_nums: number of browsers to open.
        :param driver_path: passed to ``open_browser`` as ``excute_path``.
        :param ui: object attached to every driver as ``driver.ui``.
        :param open_headless: forwarded unchanged to ``open_browser``.
        """
        self.ui = ui
        self.drivers = {}
        self.manager = Manager()
        self.queue = self.manager.Queue()
        self.max_nums = max_nums
        self.open_headless = open_headless
        self.CreateDriver(driver_path)

    def CreateDriver(self, driver_path):
        """Open every browser of the pool and mark each one as idle.

        :param driver_path: passed to ``open_browser`` as ``excute_path``.
        :return: None
        """
        for index in range(1, self.max_nums + 1):
            name = f'driver_{index}'
            driver = open_browser(
                excute_path=driver_path,
                open_headless=self.open_headless,
            )
            driver.ui = self.ui
            self.drivers[name] = driver
            self.queue.put(name)

    def getDriver(self):
        """Borrow an idle browser, blocking until one is available.

        :return: the driver; its ``pool_name_driver`` attribute holds the
            name that must later be passed to :meth:`putDriver`.
        """
        # A blocking get() replaces the former "empty() -> sleep(1) ->
        # recurse" polling, which raised RecursionError after a long wait
        # and re-checked the queue non-atomically.
        name = self.queue.get()
        driver = self.drivers[name]
        driver.pool_name_driver = name
        return driver

    def putDriver(self, name):
        """Return a borrowed browser to the pool.

        :param name: the ``pool_name_driver`` value of the borrowed driver.
        :return: None
        """
        self.queue.put(name)

    def quit(self):
        """Close every browser of the pool (best effort).

        :return: None
        """
        for driver in self.drivers.values():
            try:
                driver.quit()
            except Exception:
                # A browser that is already gone must not prevent the
                # remaining ones from being closed.
                continue
3.2 编写UI操作界面
-
def intUIRun(self):
    """Build the main "run" page and attach it to ``self.panel_run``.

    Creates the status labels, the editable "current ID" field with its
    refresh button, the three action buttons, and the read-only log box,
    then binds the event handlers.

    :return: None
    """
    page = wx.Panel(self.panel_run)
    page.Sizer = wx.BoxSizer(wx.VERTICAL)
    font = self.font

    # Status line: a fixed caption followed by the status text itself.
    self.text = wx.StaticText(page, -1, '状态栏目:', size=(100, 40), pos=(0, 10))
    self.text_input = wx.StaticText(page, -1, '', size=(900, 40), pos=(100, 0))

    # "Current ID" row: caption, editable field and refresh button.
    id_caption = wx.StaticText(page, -1, '当前执行ID:', size=(100, 30), pos=(0, 65))
    id_caption.SetFont(font)
    self.text_time = wx.TextCtrl(
        page,
        id=self.choices_id_ref,
        value=self.time_str,
        size=(300, 30),
        pos=(150, 60),
        style=wx.TE_AUTO_URL | wx.TE_MULTILINE,
    )
    self.reflush_text_time = wx.Button(page, -1, '刷新ID', size=(100, 50), pos=(480, 50))
    self.text_time.SetFont(font)
    self.reflush_text_time.SetForegroundColour(wx.RED)
    self.reflush_text_time.SetFont(font)

    self.text_input.SetBackgroundColour(wx.WHITE)
    self.text_input.SetLabel(self.in_text)
    self.text_input.SetFont(font)
    self.text.SetFont(font)

    # Action buttons sit side by side, 200 px apart, on the same row.
    action_buttons = (
        (self.get_product, '获取商品保存本地', 0),
        (self.save_mysql, '保存数据库和OSS', 200),
        (self.end_process, '结束执行', 400),
    )
    for button_id, label, left in action_buttons:
        wx.Button(page, button_id, label, size=(200, 100), pos=(left, 100)).SetFont(font)

    self.log_text = wx.TextCtrl(
        page,
        size=(1000, 500),
        pos=(0, 210),
        style=wx.TE_MULTILINE | wx.TE_READONLY,
    )
    wx.LogTextCtrl(self.log_text)

    # Route each action button to its handler through the button id.
    button_handlers = (
        (self.get_product_p, self.get_product),
        (self.save_mysql_p, self.save_mysql),
        (self.end_process_p, self.end_process),
    )
    for handler, button_id in button_handlers:
        self.Bind(wx.EVT_BUTTON, handler, id=button_id)
    self.text_time.Bind(wx.EVT_COMMAND_LEFT_CLICK, self.choices_id, id=self.choices_id_ref)
    self.reflush_text_time.Bind(wx.EVT_BUTTON, self.reflush_time_evt)

    self.panel_run.Sizer.Add(page, flag=wx.ALL | wx.EXPAND, proportion=1)
效果图
3.3编写业务逻辑
获取商品列表页数据
-
# Declare the list-page worker and its result set as global names.
# NOTE(review): this snippet ends by calling getMainProduct_(self), so it
# presumably runs in a scope where `self` is already bound - confirm where
# and how the snippet is executed.
global _getMainProduct, goods_info
-
def _getMainProduct(data_url):
    """Worker: open one list page and collect the product links on it.

    :param data_url: a ``[self, url, driver_pool]`` triple - the crawler
        object, the list-page URL, and the pool to borrow a browser from.
    :return: list of the ``href`` values of the product links found on the
        page; an empty list when the page failed before any were collected.
    """
    self, url, driver_pool = data_url
    c = Common(driver_pool.getDriver())
    goods_urls = []
    try:
        self.ui.print(f'当前获取第{url}页数据')
        c.d.get(url)
        c.wait_page_loaded(url)
        if self.is_load_cache_cookies:
            # NOTE(review): nesting was reconstructed - the reload is assumed
            # to belong to the cookie branch (load cookies, then re-open the
            # page so they take effect). Confirm against the original file.
            self.load_cookies(c.d)
            c.d.get(url)
            c.wait_page_loaded(url)
        ele = c.find_element(By.CSS_SELECTOR, '[class="component-product-list"]')
        link_elements = ele.find_elements(By.CSS_SELECTOR, 'a[class="product-image"]')
        goods_urls = [link.get_attribute('href') for link in link_elements]
    except SystemExit:
        sys.exit(1)
    except Exception:
        # Narrowed from a bare ``except:`` so that only ordinary errors are
        # treated as "page could not be scraped".
        self.print(f'请求页面超出范围: {url} ERROR: {traceback.format_exc()}')
        # The "no data" probe is kept from the original; both outcomes
        # return the links collected so far.
        if c.find_element_true(By.CSS_SELECTOR, '[class="no-data common"]'):
            return goods_urls
    finally:
        # Always give the browser back and report completion, even on error.
        name = c.d.pool_name_driver
        driver_pool.putDriver(name)
        self.queue_print.put(f'请求完成:{url}')
    return goods_urls
-
-
def getMainProduct_(self):
    """Collect the product links of every list page in ``self.pageNums``.

    Builds one URL per page number from ``self.pageNums[0]`` to
    ``self.pageNums[1]`` (inclusive) with ``self.url.format(page)``, hands
    them to ``self.pool.map_async`` with ``_getMainProduct`` as the worker,
    and prints one progress line per message found in ``self.queue_print``.

    :return: set of the distinct, non-empty product links; it is also stored
        in ``self.goods_info``.
    :raises SystemExit: when ``self.ui.is_exit_process`` is set while the URL
        list is being built.
    """
    first_page, last_page = self.pageNums[0], self.pageNums[1]
    urls = []
    for page in range(first_page, last_page + 1):
        if self.ui.is_exit_process:
            # Same effect as the former exit(), without relying on the
            # ``site``-provided helper.
            raise SystemExit
        urls.append([self, self.url.format(page), self.drive_pool])

    products = []
    if urls:
        # The total is the number of pages actually requested; the former
        # ``pageNums[1]`` was only right when the first page was 1.
        total = len(urls)
        done = 0
        result = self.pool.map_async(_getMainProduct, urls)
        finished = False
        while not finished:
            finished = result.ready()
            # Drain after observing ready() as well, so no completion
            # message is left behind in the shared queue.
            while not self.queue_print.empty():
                done += 1
                self.print(self.queue_print.get(), f'完成:{done}/{total}')
            if not finished:
                # Block briefly instead of spinning at full CPU.
                # Assumes the object returned by map_async offers
                # wait(timeout), as multiprocessing's AsyncResult does.
                result.wait(0.1)
        products = result.get()

    goods_info = set()
    for page_links in products:
        goods_info.update(link for link in page_links if link)
    self.goods_info = goods_info
    return goods_info
-
# Run the list-page crawl and keep the resulting set of product links.
# NOTE(review): `self` is not defined in this snippet - presumably it is
# supplied by the scope the snippet is executed in; confirm.
goods_info = getMainProduct_(self)
获取详情页数据
-
# Declare the names used by the detail-page snippet as global names.
# NOTE(review): the snippet ends by calling get_info_(self, data_info), so it
# presumably runs in a scope where `self` and `data_info` are already bound -
# confirm where and how it is executed.
global goods,Common,driver_pool,goods_url,sleep,re,By
-
def get_info_(self, data_info):
    """Worker: scrape one product detail page into a dict.

    :param self: the crawler object (provides ``ui``, ``print``,
        ``load_cookies``, ``getMd5``, ``wz_goods_cat_id``, ``error_page``
        and ``queue_print``).
    :param data_info: a ``(goods_url, driver_pool)`` pair.
    :return: the product dict on success; ``None`` when scraping failed (the
        failure is appended to ``self.error_page``).
    :raises SystemExit: when ``self.ui.is_exit_process`` is already set.
    """
    if self.ui.is_exit_process:
        # Same effect as the former exit(), without relying on the
        # ``site``-provided helper.
        raise SystemExit
    goods_url, driver_pool = data_info
    c = Common(driver_pool.getDriver())
    try:
        c.d.get(goods_url)
        sleep(3)
        if self.is_load_cache_cookies:
            # NOTE(review): nesting was reconstructed - the reload is assumed
            # to belong to the cookie branch. Confirm against the original.
            self.load_cookies(c.d)
            c.d.get(goods_url)
            c.wait_page_loaded(goods_url)

        # Scroll down step by step before reading the page.
        for offset in range(400, 18000, 200):
            sleep(0.1)
            c.d.execute_script(f'document.documentElement.scrollTop={offset};')

        is_all = c.find_element_true(By.CSS_SELECTOR, '[id="J-rich-text-description"]')
        if not is_all:
            self.print(f'没有发现: {is_all}')
        is_video = c.find_elements_true(By.CSS_SELECTOR, '[class="bc-video-player"]>video')
        is_title = c.find_element_true(By.CSS_SELECTOR, '[class="module-pdp-title"]')
        is_description = c.find_element_true(By.CSS_SELECTOR, '[name="description"]')
        is_keywords = c.find_element_true(By.CSS_SELECTOR, '[name="keywords"]')
        is_overview = c.find_element_true(By.CSS_SELECTOR, '[class="do-overview"]')

        # The category ID comes from the crawler's configuration.
        wz_goods_cat_id = self.wz_goods_cat_id

        # NOTE(review): the literal "ssssss" prefix means ordinary
        # ".../<digits>.html" URLs do not match, so the MD5 fallback below is
        # what normally produces the ID. An earlier variant of the pattern
        # was r'(\d+)\.html$' - confirm which one is intended.
        goods_id = re.search(r'(ssssss\d+)\.html$', goods_url)

        goods = {
            "商品分类ID": int(wz_goods_cat_id) if wz_goods_cat_id else 0,
            "商品ID": (
                goods_id.group(1)
                if goods_id
                else self.getMd5(f'{time.time()}') + '其他'
            ),
            "商品链接": goods_url,
            "描述": (
                c.find_element(By.CSS_SELECTOR, '[name="description"]').get_attribute('content')
                if is_description
                else ''
            ),
            "标题": is_title.get_attribute('title') if is_title else '',
            # Missing keywords / overview now fall back to '' like the other
            # text fields, instead of leaking the falsy lookup result (which
            # would be exported as the text "False" or "None").
            "关键字": (
                c.find_element(By.CSS_SELECTOR, '[name="keywords"]').get_attribute('content')
                if is_keywords
                else ''
            ),
            "视频连接": (
                c.find_element(By.CSS_SELECTOR, '[class="bc-video-player"]>video').get_attribute('src')
                if is_video
                else ''
            ),
            "主图片": [],
            "商品详情": (
                c.d.execute_script(
                    '''return document.querySelectorAll('[class="do-overview"]')[0].outerHTML;''')
                if is_overview
                else ''
            ),
            "商品描述": '',
            "商品描述图片": [],
        }

        # Description text and description images come from the helper.
        goods_desc = getDescriptionFactory1(self, c, goods_url)
        goods.update(goods_desc)

        # Main images: keep the thumbnail URL and, when the URL carries a
        # "<w>x<h>" size token, also the URL with that token removed.
        thumbs = c.find_elements(By.CSS_SELECTOR, '[class="main-image-thumb-ul"]>li')
        for thumb in thumbs:
            try:
                img = thumb.find_element(
                    By.CSS_SELECTOR, '[class="J-slider-cover-item"]').get_attribute('src')
                size_token = re.search(r'(\d+x\d+)', img)
            except Exception:
                # Best effort: a thumbnail without a usable image is skipped.
                continue
            goods['主图片'].append(img)
            if size_token:
                full_size = str(img).replace(size_token.group(1), '')
                if full_size:
                    goods['主图片'].append(full_size)

        self.ui.status['请求成功商品数量'] += 1
        return goods
    except Exception:
        error_text = traceback.format_exc()
        traceback.print_exc()
        self.print(
            f'=========================\n链接请求错误: {goods_url} \n {error_text}\n=========================')
        self.error_page.append([goods_url, error_text])
        self.ui.status['请求失败商品数量'] += 1
    finally:
        # Always give the browser back and report completion, even on error.
        name = c.d.pool_name_driver
        driver_pool.putDriver(name)
        self.queue_print.put(f'请求完成:{goods_url}')
    return None
-
# Scrape one detail page and keep the resulting product dict.
# NOTE(review): `self` and `data_info` are not defined in this snippet -
# presumably they are supplied by the scope it is executed in; confirm.
goods = get_info_(self,data_info)
写入excel
-
def export_excel(self, results):
    """Write the scraped products to a new ``.xlsx`` workbook.

    The file is created as
    ``data/xls/<self.now>/<self.url_id>/<timestamp>.xlsx`` with a single
    sheet: row 0 holds the column titles (the dict keys, in first-seen
    order) and every following row holds one product, each value written
    as ``str(value)``.

    :param results: list of product dicts; empty or ``None`` entries are
        skipped. Nothing is written when no usable entry remains.
    :return: None
    """
    rows = [result for result in (results or []) if result]
    if not rows:
        # Nothing to export; the former code raised IndexError here.
        return

    out_dir = os.path.join('data', 'xls', self.now, self.url_id)
    # makedirs also creates missing parents ("data", "data/xls"), which the
    # former chain of os.mkdir calls could not.
    os.makedirs(out_dir, exist_ok=True)
    file_stem = time.strftime('%Y_%m_%d__%H_%M_%S', time.localtime())
    xlsx_path = os.path.join(out_dir, f"{file_stem}.xlsx")

    # Union of all keys in first-seen order, so that a row with an extra or
    # missing key still lands under the right header.
    titles = list(dict.fromkeys(key for row in rows for key in row))

    workbook = xlsxwriter.Workbook(xlsx_path)
    try:
        sheet = workbook.add_worksheet(name='阿里巴巴信息')
        for col, title in enumerate(titles):
            sheet.write_string(0, col, title)
        for row_index, row in enumerate(rows, start=1):
            for col, title in enumerate(titles):
                sheet.write_string(row_index, col, str(row.get(title, '')))
    finally:
        # Close even when writing fails, so the file handle is released.
        workbook.close()
4、最后总结:
由于通过selenium执行浏览器操作不如直接请求接口效率高,所以最后使用了多线程,在执行效率上做了一些提升。
我是政胤 期待你的关注
转载:https://blog.csdn.net/m0_69043821/article/details/125491644
查看评论