小言_互联网的博客

Python爬虫Request轮子工具

3161人阅读  评论(0)

SuperSpider

== 万字长文,建议使用目录点击查阅,有助于高效开发。建议点赞收藏 ==

Request抓取思路步骤

1】先确定是否为动态加载网站
【2】找URL规律
【3】正则表达式 | xpath表达式
【4】定义程序框架,补全并测试代码
  • 细节要点:查看页面编码charset、请求时是否需要验证verify=FALSE

多级页面数据抓取思路

1】整体思路
    1.1> 爬取一级页面,提取 所需数据+链接,继续跟进
    1.2> 爬取二级页面,提取 所需数据+链接,继续跟进
    1.3> ... ...2】代码实现思路
    2.1> 避免重复代码 - 请求、解析需定义函数

UserAgent反爬处理

1】基于User-Agent反爬
	1.1) 发送请求携带请求头: headers={
   'User-Agent' : 'Mozilla/5.0 xxxxxx'}
	1.2) 多个请求时随机切换User-Agent
        a) 定义py文件存放大量User-Agent,导入后使用random.choice()每次随机选择
        b) 使用fake_useragent模块每次访问随机生成User-Agent
           from fake_useragent import UserAgent
           agent = UserAgent().random
   细节要点:pycharm中下载fake-useragent
        
【2】响应内容存在特殊字符
	解码时使用ignore参数
    html = requests.get(url=url, headers=headers).content.decode('', 'ignore')

Cookie反爬

Cookie参数使用

  1. cookies参数的形式:字典

    cookies = {"cookie的name":"cookie的value"}

    • 该字典对应请求头中Cookie字符串,以分号、空格分割每一对字典键值对
    • 等号左边的是一个cookie的name,对应cookies字典的key
    • 等号右边对应cookies字典的value
  2. cookies参数的使用方法

    response = requests.get(url, cookies)

  3. 将cookie字符串转换为cookies参数所需的字典:

    cookies_dict = {cookie.split('=')[0]:cookie.split('=')[-1] for cookie in cookies_str.split('; ')}

  4. 注意:cookie一般是有过期时间的,一旦过期需要重新获取

CookieJar对象转换为Cookies字典

使用requests获取的resposne对象,具有cookies属性。该属性值是一个cookieJar类型,包含了对方服务器设置在本地的cookie。我们如何将其转换为cookies字典呢?

  1. 转换方法

    cookies_dict = requests.utils.dict_from_cookiejar(response.cookies)

  2. 其中response.cookies返回的就是cookieJar类型的对象

  3. requests.utils.dict_from_cookiejar函数返回cookies字典

requests模块参数总结

【1】方法一 : requests.get()
【2】参数
   2.1) url
   2.2) headers
   2.3) timeout
   2.4) proxies

【3】方法二 :requests.post()
【4】参数
    data

requests.get()

  • 思路

    【1】url
    【2】proxies -> {}
         proxies = {
            'http':'http://1.1.1.1:8888',
    	    'https':'https://1.1.1.1:8888'
         }
    【3】timeout
    【4】headers
    【5】cookies
    

requests.post()

  • 适用场景

    1】适用场景 : Post类型请求的网站
    
    【2】参数 : data={
         }
       2.1) Form表单数据: 字典
       2.2) res = requests.post(url=url,data=data,headers=headers)3】POST请求特点 : Form表单提交数据
        
        data : 字典,Form表单数据
    
  • pycharm中正则处理headers和formdata

    1】pycharm进入方法 :Ctrl + r ,选中 Regex
    【2】处理headers和formdata
        (.*): (.*)
        "$1": "$2",3】点击 Replace All
    
  • 经典Demo有道翻译

request.session()

  • requests模块中的Session类能够自动处理发送请求获取响应过程中产生的cookie,进而达到状态保持的目的。接下来我们就来学习它

作用与应用场景

  • requests.session的作用
    • 自动处理cookie,即 下一次请求会带上前一次的cookie
  • requests.session的应用场景
    • 自动处理连续的多次请求过程中产生的cookie

使用方法

session实例在请求了一个网站后,对方服务器设置在本地的cookie会保存在session中,下一次再使用session请求对方服务器的时候,会带上前一次的cookie

session = requests.session() # 实例化session对象
response = session.get(url, headers, ...)
response = session.post(url, data, ...)
  • session对象发送get或post请求的参数,与requests模块发送请求的参数完全一致

response

response.text 和response.content的区别:

  • response.text
    • 类型:str
    • 解码类型: requests模块自动根据HTTP 头部对响应的编码作出有根据的推测,推测的文本编码
  • response.content
    • 类型:bytes
    • 解码类型: 没有指定

动态加载数据抓取-Ajax

  • 特点

    1】右键 -> 查看网页源码中没有具体数据
    【2】滚动鼠标滑轮或其他动作时加载,或者页面局部刷新
    
  • 抓取

    1】F12打开控制台,页面动作抓取网络数据包
    【2】抓取json文件URL地址
       2.1) 控制台中 XHR :异步加载的数据包
       2.2) XHR -> QueryStringParameters(查询参数)
    
  • 经典Demo:豆瓣电影

json解析模块

json.loads(json)

1】作用 : 把json格式的字符串转为Python数据类型

【2】示例 : html = json.loads(res.text)

json.dump(python,f,ensure_ascii=False)

1】作用
   把python数据类型 转为 json格式的字符串,一般让你把抓取的数据保存为json文件时使用

【2】参数说明
   2.1)1个参数: python类型的数据(字典,列表等)
   2.2)2个参数: 文件对象
   2.3)3个参数: ensure_ascii=False 序列化时编码
  
【3】示例代码
    # 示例1
    import json

    item = {
   'name':'QQ','app_id':1}
    with open('小米.json','a') as f:
      json.dump(item,f,ensure_ascii=False)
  
    # 示例2
    import json

    item_list = []
    for i in range(3):
      item = {
   'name':'QQ','id':i}
      item_list.append(item)

    with open('xiaomi.json','a') as f:
        json.dump(item_list,f,ensure_ascii=False)

jsonpath

jsonpath使用示例

book_dict = { 
  "store": {
    "book": [ 
      { "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      { "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      },
      { "category": "fiction",
        "author": "Herman Melville",
        "title": "Moby Dick",
        "isbn": "0-553-21311-3",
        "price": 8.99
      },
      { "category": "fiction",
        "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings",
        "isbn": "0-395-19395-8",
        "price": 22.99
      }
    ],
    "bicycle": {
      "color": "red",
      "price": 19.95
    }
  }
}

from jsonpath import jsonpath

print(jsonpath(book_dict, '$..author')) # 如果取不到将返回False # 返回列表,如果取不到将返回False

json模块总结

# 爬虫最常用1】数据抓取 - json.loads(html)
    将响应内容由: json 转为 python
【2】数据保存 - json.dump(item_list,f,ensure_ascii=False)
    将抓取的数据保存到本地 json文件

# 抓取数据一般处理方式1】txt文件
【2】csv文件
【3】json文件
【4】MySQL数据库
【5】MongoDB数据库
【6】Redis数据库

控制台抓包

  • 打开方式及常用选项

    1】打开浏览器,F12打开控制台,找到Network选项卡
    
    【2】控制台常用选项
       2.1) Network: 抓取网络数据包
         a> ALL: 抓取所有的网络数据包
         b> XHR:抓取异步加载的网络数据包
         c> JS : 抓取所有的JS文件
       2.2) Sources: 格式化输出并打断点调试JavaScript代码,助于分析爬虫中一些参数
       2.3) Console: 交互模式,可对JavaScript中的代码进行测试
        
    【3】抓取具体网络数据包后
       3.1) 单击左侧网络数据包地址,进入数据包详情,查看右侧
       3.2) 右侧:
         a> Headers: 整个请求信息
            General、Response Headers、Request Headers、Query String、Form Data
         b> Preview: 对响应内容进行预览
         c> Response:响应内容
    

代理设置

定义及分类

代理ip的匿名程度,代理IP可以分为下面三类:
    1.透明代理(Transparent Proxy):透明代理虽然可以直接“隐藏”你的IP地址,但是还是可以查到你是谁。
    2.匿名代理(Anonymous Proxy):使用匿名代理,别人只能知道你用了代理,无法知道你是谁。
    3.高匿代理(Elite proxy或High Anonymity Proxy):高匿代理让别人根本无法发现你是在用代理,所以是最好的选择。
    
代理服务请求使用的协议可以分为:
	1.http代理:目标url为http协议
	2.https代理:目标url为https协议
	3.socks隧道代理(例如socks5代理)等:
      socks 代理只是简单地传递数据包,不关心是何种应用协议(FTP、HTTP和HTTPS等)。
      socks 代理比http、https代理耗时少。
      socks 代理可以转发http和https的请求

普通代理思路

1】获取代理IP网站
   西刺代理、快代理、全网代理、代理精灵、阿布云、芝麻代理... ...2】参数类型
   proxies = {
    '协议':'协议://IP:端口号' }
   proxies = {
   
    	'http':'http://IP:端口号',
    	'https':'https://IP:端口号',
   }

普通代理

# 使用免费普通代理IP访问测试网站: http://httpbin.org/get
import requests

url = 'http://httpbin.org/get'
headers = {
   'User-Agent':'Mozilla/5.0'}
# 定义代理,在代理IP网站中查找免费代理IP
proxies = {
   
    'http':'http://112.85.164.220:9999',
    'https':'https://112.85.164.220:9999'
}
html = requests.get(url,proxies=proxies,headers=headers,timeout=5).text
print(html)

私密代理+独享代理

1】语法结构
   proxies = {
    '协议':'协议://用户名:密码@IP:端口号' }2】示例
   proxies = {
   
	  'http':'http://用户名:密码@IP:端口号',
      'https':'https://用户名:密码@IP:端口号',
   }

私密代理+独享代理 - 示例代码

import requests
url = 'http://httpbin.org/get'
proxies = {
   
    'http': 'http://309435365:szayclhp@106.75.71.140:16816',
    'https':'https://309435365:szayclhp@106.75.71.140:16816',
}
headers = {
   
    'User-Agent' : 'Mozilla/5.0',
}

html = requests.get(url,proxies=proxies,headers=headers,timeout=5).text
print(html)

建立自己的代理IP池 - 开放代理 | 私密代理

"""
收费代理:
    建立开放代理的代理IP池
思路:
    1、获取到开放代理
    2、依次对每个代理IP进行测试,能用的保存到文件中
"""
import requests

class ProxyPool:
    def __init__(self):
        self.url = '代理网站的API链接'
        self.headers = {
   'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.122 Safari/537.36'}
        # 打开文件,用来存放可用的代理IP
        self.f = open('proxy.txt', 'w')

    def get_html(self):
        html = requests.get(url=self.url, headers=self.headers).text
        proxy_list = html.split('\r\n')
        for proxy in proxy_list:
            # 依次测试每个代理IP是否可用
            if self.check_proxy(proxy):
                self.f.write(proxy + '\n')

    def check_proxy(self, proxy):
        """测试1个代理IP是否可用,可用返回True,否则返回False"""
        test_url = 'http://httpbin.org/get'
        proxies = {
   
            'http' : 'http://{}'.format(proxy),
            'https': 'https://{}'.format(proxy)
        }
        try:
            res = requests.get(url=test_url, proxies=proxies, headers=self.headers, timeout=2)
            if res.status_code == 200:
                print(proxy,'\033[31m可用\033[0m')
                return True
            else:
                print(proxy,'无效')
                return False
        except:
            print(proxy,'无效')
            return False

    def run(self):
        self.get_html()
        # 关闭文件
        self.f.close()

if __name__ == '__main__':
    spider = ProxyPool()
    spider.run()

拉勾网阿布云代理

import json
import re
import time
import requests
import multiprocessing
from job_data_analysis.lagou_spider.handle_insert_data import lagou_mysql


class HandleLaGou(object):
    def __init__(self):
        #使用session保存cookies信息
        self.lagou_session = requests.session()
        self.header = {
   
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36'
        }
        self.city_list = ""

    #获取全国所有城市列表的方法
    def handle_city(self):
        city_search = re.compile(r'www\.lagou\.com\/.*\/">(.*?)</a>')
        city_url = "https://www.lagou.com/jobs/allCity.html"
        city_result = self.handle_request(method="GET",url=city_url)
        #使用正则表达式获取城市列表
        self.city_list = set(city_search.findall(city_result))
        self.lagou_session.cookies.clear()

    def handle_city_job(self,city):
        first_request_url = "https://www.lagou.com/jobs/list_python?city=%s&cl=false&fromSearch=true&labelWords=&suginput="%city
        first_response = self.handle_request(method="GET",url=first_request_url)
        total_page_search = re.compile(r'class="span\stotalNum">(\d+)</span>')
        try:
            total_page = total_page_search.search(first_response).group(1)
            print(city,total_page)
        #由于没有岗位信息造成的exception
        except:
            return
        else:
            for i in range(1,int(total_page)+1):
                data = {
   
                    "pn":i,
                    "kd":"web"
                }
                page_url = "https://www.lagou.com/jobs/positionAjax.json?city=%s&needAddtionalResult=false"%city
                referer_url = "https://www.lagou.com/jobs/list_python?city=%s&cl=false&fromSearch=true&labelWords=&suginput="%city
                #referer的URL需要进行encode
                self.header['Referer'] = referer_url.encode()
                response = self.handle_request(method="POST",url=page_url,data=data,info=city)
                lagou_data = json.loads(response)
                job_list = lagou_data['content']['positionResult']['result']
                for job in job_list:
                    lagou_mysql.insert_item(job)



    def handle_request(self,method,url,data=None,info=None):
        while True:
            #加入阿布云的动态代理
            proxyinfo = "http://%s:%s@%s:%s" % ('账号', '密码', 'http-dyn.abuyun.com', '9020')
            proxy = {
   
                "http":proxyinfo,
                "https":proxyinfo
            }
            try:
                if method == "GET":
                    response = self.lagou_session.get(url=url,headers=self.header,proxies=proxy,timeout=6)
                elif method == "POST":
                    response = self.lagou_session.post(url=url,headers=self.header,data=data,proxies=proxy,timeout=6)
            except:
                # 需要先清除cookies信息
                self.lagou_session.cookies.clear()
                # 重新获取cookies信息
                first_request_url = "https://www.lagou.com/jobs/list_python?city=%s&cl=false&fromSearch=true&labelWords=&suginput=" % info
                self.handle_request(method="GET", url=first_request_url)
                time.sleep(10)
                continue
            response.encoding = 'utf-8'
            if '频繁' in response.text:
                print(response.text)
                #需要先清除cookies信息
                self.lagou_session.cookies.clear()
                # 重新获取cookies信息
                first_request_url = "https://www.lagou.com/jobs/list_python?city=%s&cl=false&fromSearch=true&labelWords=&suginput="%info
                self.handle_request(method="GET",url=first_request_url)
                time.sleep(10)
                continue
            return response.text

if __name__ == '__main__':
    lagou = HandleLaGou()
    #所有城市的方法
    lagou.handle_city()
    print(lagou.city_list)
    #引入多进程加速抓取
    pool = multiprocessing.Pool(2)
    for city in lagou.city_list:
        pool.apply_async(lagou.handle_city_job,args=(city,))
    pool.close()
    pool.join()

unicode与encode

  • unicode: 作用是将unicode编码转换成其他编码的字符串,将所有语言统一到unicode中 [如果内容是英文,unicode编码比ASCII多一倍的存储空间,同时传输需要多一倍的传输]

  • encode : 作用是将unicode编码转换成其他编码的字符串

  • decode:作用是将其他编码的字符串转换成unicode编码,需要指明原来的编码格式

    • 编码换:

      s = ("我爱中国,i love china")
      s.decode("gb2312").encode("utf-8")
      
    • 查看当前编码

      import sys
      sys.getdefaultencoding()
      
  • python3中用unicode进行编码,因此能直接用用unicode来编码:s.encode(“utf-8”)

  • 通常情况下window是GB2312编码

  • linux通常情况下是utf8的编码

解析模块总结

re正则解析

import re 
pattern = re.compile('正则表达式',re.S)
r_list = pattern.findall(html)

lxml+xpath解析

from lxml import etree
p = etree.HTML(res.text)
r_list = p.xpath('xpath表达式')

【谨记】只要调用了xpath,得到的结果一定为'列表'

xpath表达式

  • 匹配规则

    1】结果: 节点对象列表
       1.1) xpath示例: //div、//div[@class="student"]//div/a[@title="stu"]/span
    
    【2】结果: 字符串列表
       2.1) xpath表达式中末尾为: @src、@href、/text()
    
  • 最常用

    1】基准xpath表达式: 得到节点对象列表
    【2for r in [节点对象列表]:
           username = r.xpath('./xxxxxx')
    
    【注意】遍历后继续xpath一定要以:  . 开头,代表当前节点
    
  • 豆瓣书籍抓取

    import requests
    from lxml import etree
    from fake_useragent import UserAgent
    import time
    import random
    
    class DoubanBookSpider:
        def __init__(self):
            self.url = 'https://book.douban.com/top250?start={}'
    
        def get_html(self, url):
            """使用随机的User-Agent"""
            headers = {
         'User-Agent':UserAgent().random}
            html = requests.get(url=url, headers=headers).text
            self.parse_html(html)
    
        def parse_html(self, html):
            """lxml+xpath进行数据解析"""
            parse_obj = etree.HTML(html)
            # 1.基准xpath:提取每本书的节点对象列表
            table_list = parse_obj.xpath('//div[@class="indent"]/table')
            for table in table_list:
                item = {
         }
                # 书名
                name_list = table.xpath('.//div[@class="pl2"]/a/@title')
                item['name'] = name_list[0].strip() if name_list else None
                # 描述
                content_list = table.xpath('.//p[@class="pl"]/text()')
                item['content'] = content_list[0].strip() if content_list else None
                # 评分
                score_list = table.xpath('.//span[@class="rating_nums"]/text()')
                item['score'] = score_list[0].strip() if score_list else None
                # 评价人数
                nums_list = table.xpath('.//span[@class="pl"]/text()')
                item['nums'] = nums_list[0][1:-1].strip() if nums_list else None
                # 类别
                type_list = table.xpath('.//span[@class="inq"]/text()')
                item['type'] = type_list[0].strip() if type_list else None
    
                print(item)
    
        def run(self):
            for i in range(5):
                start = (i - 1) * 25
                page_url = self.url.format(start)
                self.get_html(page_url)
                time.sleep(random.randint(1,2))
    
    if __name__ == '__main__':
        spider = DoubanBookSpider()
        spider.run()
    

数据持久化

mysql

"""
sql 表创建

mysql -uroot -p
create database maoyandb charset utf8;
use maoyandb;
create table maoyantab(
name varchar(100),
star varchar(300),
time varchar(100)
)charset=utf8;
"""
"""
    pymysql模块使用
"""
import pymysql

# 1.连接数据库
db = pymysql.connect(
    'localhost','root','123456','maoyandb',charset='utf8'
)
cur = db.cursor()
# 2.执行sql命令
ins = 'insert into maoyantab values(%s,%s,%s)'
cur.execute(ins, ['肖申克的救赎','主演:张国荣,张曼玉,刘德华','2018-06-25'])
# 3.提交到数据库执行
db.commit()

cur.close()
db.close()

Mysql模板

import pymysql

# __init__(self):
	self.db = pymysql.connect('IP',... ...)
	self.cursor = self.db.cursor()
	
# save_html(self,r_list):
	self.cursor.execute('sql',[data1])
    self.cursor.executemany('sql', [(),(),()])
	self.db.commit()
	
# run(self):
	self.cursor.close()
	self.db.close()

mongodb

  • https://www.runoob.com/python3/python-mongodb.html 【菜鸟教程】
"""
【1】非关系型数据库,数据以键值对方式存储,端口27017
【2】MongoDB基于磁盘存储
【3】MongoDB数据类型单一,值为JSON文档,而Redis基于内存,
   3.1> MySQL数据类型:数值类型、字符类型、日期时间类型、枚举类型
   3.2> Redis数据类型:字符串、列表、哈希、集合、有序集合
   3.3> MongoDB数据类型:值为JSON文档
【4】MongoDB: 库 -> 集合 -> 文档
     MySQL  : 库 -> 表  ->  表记录
"""

"""
Linux进入: mongo
>show dbs                  - 查看所有库
>use 库名                   - 切换库
>show collections          - 查看当前库中所有集合
>db.集合名.find().pretty()  - 查看集合中文档
>db.集合名.count()          - 统计文档条数
>db.集合名.drop()           - 删除集合
>db.dropDatabase()         - 删除当前库
"""
import pymongo

# 创建连接对象
connect = pymongo.MongoClient(host='127.0.0.1',port=27017)
# 连接库对象
db = connect['maoyandb']
# 创建集合对象
myset = db['maoyanset']
# 插入单条数据
# myset.insert_one({'name':'钱天一'})
# 插入多条数据
myset.insert_many[{'name':'张三'},{'name':'李四'},{'name':'王五'}]

mongodb模板

import requests
import re
import time
import random
import pymongo

class MaoyanSpider:
    def __init__(self):
        self.url = 'https://maoyan.com/board/4?offset={}'
        self.headers = {
   
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.113 Safari/537.36'}
        # 三个对象:连接对象、库对象、库集合
        self.conn = pymongo.MongoClient('127.0.0.1',27017)
        self.db = self.conn['maoyandb']
        self.myset = self.db['maoyanset']

    ##提取网页数据
    def get_html(self,url):
        html = requests.get(url,url,headers=self.headers).text
        self.parse_html(html)

    ##解析网页数据
    def parse_html(self,html):
        regex =  '<div class="movie-item-info">.*?title="(.*?)".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
        pattern = re.compile(regex,re.S)
        r_list = pattern.findall(html)
        self.save_html(r_list)

    ## 数据处理提取
    def save_html(self,r_list):
        for r in r_list:
            item ={
   }
            item['name'] = r[0].strip()
            item['star'] = r[1].strip()
            item['time'] = r[2].strip()
            print(item)
            # 存入到mongodb数据库
            self.myset.insert_one(item)

    def run(self):
        """程序入口函数"""
        for offset in range(0, 91, 10):
            url = self.url.format(offset)
            self.get_html(url=url)
            # 控制数据抓取频率:uniform()生成指定范围内的浮点数
            time.sleep(random.uniform(0, 1))

if __name__ == '__main__':
    spider = MaoyanSpider()
    spider.run()

Json

import json
# Demo1
item = {
   'name':'QQ','e-mail':99362}
with open('qq.json','a')as f:
    json.dump(item,f,ensure_ascii=False)


# Demo2
# import json
# item_list =[]
# for i in range(3):
#     item = {'name':'QQ','e-mail':'99362'}
#     item_list.append(item)
#
# with open('demo2.json','a')as f:
#     json.dump(item_list,f,ensure_ascii=False)

Json模板

import requests
from fake_useragent import UserAgent
import time
import random
import re
import json

# 豆瓣电影全栈抓取
class DoubanSpider:
    def __init__(self):
        self.url = 'https://movie.douban.com/j/chart/top_list?'
        self.i = 0
        # 存入json文件
        self.f = open('douban.json', 'w', encoding='utf-8')
        self.all_film_list = []

    def get_agent(self):
        """获取随机的User-Agent"""
        return UserAgent().random

    def get_html(self, params):
        headers = {
   'User-Agent':self.get_agent()}
        html = requests.get(url=self.url, params=params, headers=headers).text
        # 把json格式的字符串转为python数据类型
        html = json.loads(html)

        self.parse_html(html)

    def parse_html(self, html):
        """解析"""
        # html: [{},{},{},{}]
        item = {
   }
        for one_film in html:
            item['rank'] = one_film['rank']
            item['title'] = one_film['title']
            item['score'] = one_film['score']
            print(item)
            self.all_film_list.append(item)
            self.i += 1

    def run(self):
        # d: {'剧情':'11','爱情':'13','喜剧':'5',...,...}
        d = self.get_d()
        # 1、给用户提示,让用户选择
        menu = ''
        for key in d:
            menu += key + '|'
        print(menu)
        choice = input('请输入电影类别:')
        if choice in d:
            code = d[choice]
            # 2、total: 电影总数
            total = self.get_total(code)
            for start in range(0,total,20):
                params = {
   
                    'type': code,
                    'interval_id': '100:90',
                    'action': '',
                    'start': str(start),
                    'limit': '20'
                }
                self.get_html(params=params)
                time.sleep(random.randint(1,2))

            # 把数据存入json文件
            json.dump(self.all_film_list, self.f, ensure_ascii=False)
            self.f.close()
            print('数量:',self.i)
        else:
            print('请做出正确的选择')

    def get_d(self):
        """{'剧情':'11','爱情':'13','喜剧':'5',...,...}"""
        url = 'https://movie.douban.com/chart'
        html = requests.get(url=url,headers={
   'User-Agent':self.get_agent()}).text
        regex = '<span><a href=".*?type_name=(.*?)&type=(.*?)&interval_id=100:90&action=">'
        pattern = re.compile(regex, re.S)
        # r_list: [('剧情','11'),('喜剧','5'),('爱情':'13')... ...]
        r_list = pattern.findall(html)
        # d: {'剧情': '11', '爱情': '13', '喜剧': '5', ..., ...}
        d = {
   }
        for r in r_list:
            d[r[0]] = r[1]

        return d

    def get_total(self, code):
        """获取某个类别下的电影总数"""
        url = 'https://movie.douban.com/j/chart/top_list_count?type={}&interval_id=100%3A90'.format(code)
        html = requests.get(url=url,headers={
   'User-Agent':self.get_agent()}).text
        html = json.loads(html)

        return html['total']

if __name__ == '__main__':
    spider = DoubanSpider()
    spider.run()

json常用操作

import json
 
data = {
    'name': 'pengjunlee',
    'age': 32,
    'vip': True,
    'address': {'province': 'GuangDong', 'city': 'ShenZhen'}
}
# 将 Python 字典类型转换为 JSON 对象
json_str = json.dumps(data)
print(json_str) # 结果 {"name": "pengjunlee", "age": 32, "vip": true, "address": {"province": "GuangDong", "city": "ShenZhen"}}
 
# 将 JSON 对象类型转换为 Python 字典
user_dic = json.loads(json_str)
print(user_dic['address']) # 结果 {'province': 'GuangDong', 'city': 'ShenZhen'}
 
# 将 Python 字典直接输出到文件
with open('pengjunlee.json', 'w', encoding='utf-8') as f:
    json.dump(user_dic, f, ensure_ascii=False, indent=4)
 
# 将类文件对象中的JSON字符串直接转换成 Python 字典
with open('pengjunlee.json', 'r', encoding='utf-8') as f:
    ret_dic = json.load(f)
    print(type(ret_dic)) # 结果 <class 'dict'>
    print(ret_dic['name']) # 结果 pengjunlee

json存储列表

import json

all_app_list = [
    {'name':'QQ'},
    {'name':'王者荣耀'},
    {'name':'和平精英'}
]

with open('xiaomi.json', 'w') as f:
    json.dump(all_app_list, f, ensure_ascii=False)

CSV模板

import csv
from urllib import request, parse
import re
import time
import random


class MaoyanSpider(object):
  def __init__(self):
    self.url = 'https://maoyan.com/board/4?offset={}'
    # 计数
    self.num = 0


  def get_html(self, url):
    headers = {
   
        'user-agent': 'Mozilla / 5.0(Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko'
    }
    req = request.Request(url=url, headers=headers)
    res = request.urlopen(req)
    html = res.read().decode('utf-8')
    # 直接调用解析函数
    self.parse_html(html)

  def parse_html(self, html):
    # 创建正则的编译对象
    re_ = '<div class="movie-item-info">.*?title="(.*?)".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p> '
    pattern = re.compile(re_, re.S)
    # film_list:[('霸王别姬','张国荣','1993')]
    film_list = pattern.findall(html)
    self.write_html(film_list)

  # 存入csv文件-writerrows
  def write_html(self, film_list):
    L = []
    with open('maoyanfilm.csv', 'a',newline='') as f:
      # 初始化写入对象,注意参数f不能忘
      writer = csv.writer(f)
      for film in film_list:
        t = (
          film[0].strip(),
          film[1].strip(),
          film[2].strip()[5:15]
        )
        self.num += 1
        L.append(t)
      # writerow()参数为列表
      writer.writerows(L)
      print(L)


  def main(self):
    for offset in range(0, 91, 10):
      url = self.url.format(offset)
      self.get_html(url)
      time.sleep(random.randint(1, 2))
    print('共抓取数据', self.num, "部")


if __name__ == '__main__':
  start = time.time()
  spider = MaoyanSpider()
  spider.main()
  end = time.time()
  print('执行时间:%.2f' % (end - start))

txt模板

注意: 如果需要换行要自己在写入内容中添加\n

  • 单行写入
# 以二进制方式写入
f  = open("write.txt","wb")
# 以二进制方式追加
# f  = open("write.txt","ab")
# 写入
res = f.write(b"Big Bang Bang\n")
print(res)
res = f.write(b"Big Bang Bang\n")
print(res)
#关闭
f.close()
  • 多行写入
f  = open("write.txt","w")
# 写入信息
msg =  f.writelines(["你好","我很好","那就好"])
# 关闭
f.close()

redis增量URl爬虫实现思路

1】原理
    利用Redis集合特性,可将抓取过的指纹添加到redis集合中,根据返回值来判定是否需要抓取
	返回值为1 : 代表之前未抓取过,需要进行抓取
	返回值为0 : 代表已经抓取过,无须再次抓取
    
    hash加密:md5 32位、sha1 40位
    
    创建md5对象
    用update将href转为byte类型
    最后通过hexdigest获取加密码
    
【2】代码实现模板
import redis
from hashlib import md5
import sys

class XxxIncrSpider:
  def __init__(self):
    self.r = redis.Redis(host='localhost',port=6379,db=0)
    
  def url_md5(self,url):
    """对URL进行md5加密函数"""
    s = md5()
    s.update(url.encode())
    return s.hexdigest()
  
  def run_spider(self):
    href_list = ['url1','url2','url3','url4']
    for href in href_list:
      href_md5 = self.url_md5(href)
      if self.r.sadd('spider:urls',href_md5) == 1:
        返回值为1表示添加成功,即之前未抓取过,则开始抓取
      else:
        sys.exit()

休眠设置

# 控制数据抓取频率:uniform()生成制定范围内的浮点数
time.sleep(random.uniform(2,5))
# 生成整形的数组
time.sleep(random.randint(1,2))

时间参数

import datetime
timestape = str(datetime.datetime.now()).split(".")[0]

打码平台

  • 图鉴
  • 超级鹰

云词

import os,time,json,random,jieba,requests
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from wordcloud import WordCloud

# 词云形状图片
WC_MASK_IMG = 'wawa.jpg'
# 评论数据保存文件
COMMENT_FILE_PATH = 'jd_comment.txt'
# 词云字体
WC_FONT_PATH = '/Library/Fonts/Songti.ttc'

def spider_comment(page=0):
    """
    爬取京东指定页的评价数据
    :param page: 爬取第几,默认值为0
    """
    url = 'https://sclub.jd.com/comment/productPageComments.action?callback=fetchJSON_comment98vv4646&productId=1263013576&score=0&sortType=5&page=%s&pageSize=10&isShadowSku=0&fold=1' % page
    kv = {
   'user-agent': 'Mozilla/5.0', 'Referer': 'https://item.jd.com/1263013576.html'}
    # proxies = {
   
    #     '1.85.5.66':'8060',
    #     '171.11.178.223':'9999',
    #     '120.194.42.157':'38185',
    #     '161.35.4.201':'80',
    #     '175.42.123.196':'9999',
    # }
    try:
        r = requests.get(url, headers=kv)#,proxies=proxies
        r.raise_for_status()
    except:
        print('爬取失败')
    # 截取json数据字符串
    r_json_str = r.text[26:-2]
    # 字符串转json对象
    r_json_obj = json.loads(r_json_str)
    # 获取评价列表数据
    r_json_comments = r_json_obj['comments']
    # 遍历评论对象列表
    for r_json_comment in r_json_comments:
        # 以追加模式换行写入每条评价
        with open(COMMENT_FILE_PATH, 'a+') as file:
            file.write(r_json_comment['content'] + '\n')
        # 打印评论对象中的评论内容
        print(r_json_comment['content'])

def batch_spider_comment():
    """
    批量爬取某东评价
    """
    # 写入数据前先清空之前的数据
    if os.path.exists(COMMENT_FILE_PATH):
        os.remove(COMMENT_FILE_PATH)
    for i in range(3):
        spider_comment(i)
        # 模拟用户浏览,设置一个爬虫间隔,防止ip被封
        time.sleep(random.random() * 5)

def cut_word():
    """
    对数据分词
    :return: 分词后的数据
    """
    with open(COMMENT_FILE_PATH) as file:
        comment_txt = file.read()
        wordlist = jieba.cut(comment_txt, cut_all=True)
        wl = " ".join(wordlist)
        print(wl)
        return wl

def create_word_cloud():
    """
    生成词云
    :return:
    """
    # 设置词云形状图片
    wc_mask = np.array(Image.open(WC_MASK_IMG))
    # 设置词云的一些配置,如:字体,背景色,词云形状,大小
    wc = WordCloud(background_color="white", max_words=2000, mask=wc_mask, scale=4,
                   max_font_size=50, random_state=42, font_path=WC_FONT_PATH)
    # 生成词云
    wc.generate(cut_word())

    # 在只设置mask的情况下,你将会得到一个拥有图片形状的词云
    plt.imshow(wc, interpolation="bilinear")
    plt.axis("off")
    plt.figure()
    plt.show()

if __name__ == '__main__':
    # 爬取数据
    batch_spider_comment()

    # 生成词云
    create_word_cloud()

多线程爬虫

应用场景

1】多进程 :CPU密集程序
【2】多线程 :爬虫(网络I/O)、本地磁盘I/O

多线程爬虫示例【豆瓣】

# 抓取豆瓣电影剧情类别下的电影信息
"""
豆瓣电影 - 剧情 - 抓取
"""
import requests
from fake_useragent import UserAgent
import time
import random
from threading import Thread,Lock
from queue import Queue

class DoubanSpider:
    def __init__(self):
        self.url = 'https://movie.douban.com/j/chart/top_list?type=13&interval_id=100%3A90&action=&start={}&limit=20'
        self.i = 0
        # 队列 + 锁
        self.q = Queue()
        self.lock = Lock()

    def get_agent(self):
        """获取随机的User-Agent"""
        return UserAgent().random

    def url_in(self):
        """把所有要抓取的URL地址入队列"""
        for start in range(0,684,20):
            url = self.url.format(start)
            # url入队列
            self.q.put(url)

    # 线程事件函数:请求+解析+数据处理
    def get_html(self):
        while True:
            # 从队列中获取URL地址
            # 一定要在判断队列是否为空 和 get() 地址 前后加锁,防止队列中只剩一个地址时出现重复判断
            self.lock.acquire()
            if not self.q.empty():
                headers = {
   'User-Agent': self.get_agent()}
                url = self.q.get()
                self.lock.release()

                html = requests.get(url=url, headers=headers).json()
                self.parse_html(html)
            else:
                # 如果队列为空,则最终必须释放锁
                self.lock.release()
                break

    def parse_html(self, html):
        """解析"""
        # html: [{},{},{},{}]
        item = {
   }
        for one_film in html:
            item['rank'] = one_film['rank']
            item['title'] = one_film['title']
            item['score'] = one_film['score']
            print(item)
            # 加锁 + 释放锁
            self.lock.acquire()
            self.i += 1
            self.lock.release()

    def run(self):
        # 先让URL地址入队列
        self.url_in()
        # 创建多个线程,开干吧
        t_list = []
        for i in range(1):
            t = Thread(target=self.get_html)
            t_list.append(t)
            t.start()

        for t in t_list:
            t.join()

        print('数量:',self.i)

if __name__ == '__main__':
    start_time = time.time()
    spider = DoubanSpider()
    spider.run()
    end_time = time.time()
    print('执行时间:%.2f' % (end_time-start_time))

selenium+PhantomJS/Chrome/Firefox

selenium

1】定义
    1.1) 开源的Web自动化测试工具
    
【2】用途
    2.1) 对Web系统进行功能性测试,版本迭代时避免重复劳动
    2.2) 兼容性测试(测试web程序在不同操作系统和不同浏览器中是否运行正常)
    2.3) 对web系统进行大数量测试
    
【3】特点
    3.1) 可根据指令操控浏览器
    3.2) 只是工具,必须与第三方浏览器结合使用
    
【4】安装
    4.1) Linux: sudo pip3 install selenium
    4.2) Windows: python -m pip install selenium

绕过selenium检测

from selenium import webdriver
options = webdriver.ChromeOptions()
# 此步骤很重要,设置为开发者模式,防止被各大网站识别出来使用了Selenium
options.add_experimental_option('excludeSwitches', ['enable-automation'])
#停止加载图片
options.add_experimental_option("prefs", {
   "profile.managed_default_content_settings.images": 2})
browser = webdriver.Chrome(options=options)
browser.get('https://www.taobao.com/')

selenium对cookie的处理

selenium能够帮助我们处理页面中的cookie,比如获取、删除,接下来我们就学习这部分知识

获取cookie

driver.get_cookies()返回列表,其中包含的是完整的cookie信息!不光有name、value,还有domain等cookie其他维度的信息。所以如果想要把获取的cookie信息和requests模块配合使用的话,需要转换为name、value作为键值对的cookie字典

# 获取当前标签页的全部cookie信息
print(driver.get_cookies())
# 把cookie转化为字典
cookies_dict = {cookie[‘name’]: cookie[‘value’] for cookie in driver.get_cookies()}

删除cookie

#删除一条cookie
driver.delete_cookie("CookieName")

# 删除所有的cookie
driver.delete_all_cookies()

selenium使用代理ip

selenium控制浏览器也是可以使用代理ip的!

  • 使用代理ip的方法

    • 实例化配置对象
      • options = webdriver.ChromeOptions()
    • 配置对象添加使用代理ip的命令
      • options.add_argument('--proxy-server=http://202.20.16.82:9527')
    • 实例化带有配置对象的driver对象
      • driver = webdriver.Chrome('./chromedriver', chrome_options=options)
  • 参考代码如下:

    from selenium import webdriver
    
    options = webdriver.ChromeOptions() # 创建一个配置对象
    options.add_argument('--proxy-server=http://202.20.16.82:9527') # 使用代理ip
    
    driver = webdriver.Chrome(chrome_options=options) # 实例化带有配置的driver对象
    
    driver.get('http://www.itcast.cn')
    print(driver.title)
    driver.quit()
    

selenium替换user-agent

selenium控制谷歌浏览器时,User-Agent默认是谷歌浏览器的,这一小节我们就来学习使用不同的User-Agent

  • 替换user-agent的方法

    • 实例化配置对象
      • options = webdriver.ChromeOptions()
    • 配置对象添加替换UA的命令
      • options.add_argument('--user-agent=Mozilla/5.0 HAHA')
    • 实例化带有配置对象的driver对象
      • driver = webdriver.Chrome('./chromedriver', chrome_options=options)
  • 参考代码如下:

    from selenium import webdriver
    
    options = webdriver.ChromeOptions() # 创建一个配置对象
    options.add_argument('--user-agent=Mozilla/5.0 HAHA') # 替换User-Agent
    
    driver = webdriver.Chrome('./chromedriver', chrome_options=options)
    
    driver.get('http://www.itcast.cn')
    print(driver.title)
    driver.quit()
    

  • selenium替换user-agent

指定端口遥控

from selenium import webdriver
chrome_options = webdriver.ChromeOptions()
chrome_options.add_experimental_option('debuggerAddress','127.0.0.1:9222')
browser=webdriver.Chrome(executable_path=r'C:\Users\TR\AppData\Local\Google\Chrome
\Application\chromedriver.exe',chrome_options=chrome_options)
browser.get('http://www.zhihu.com')

PhantomJS浏览器

1】定义
    phantomjs为无界面浏览器(又称无头浏览器),在内存中进行页面加载,高效
 
【2】下载地址
    2.1) chromedriver : 下载对应版本
       http://npm.taobao.org/mirrors/chromedriver/
    
    2.2) geckodriver
       https://github.com/mozilla/geckodriver/releases
            
    2.3) phantomjs
       https://phantomjs.org/download.html

【3】Ubuntu安装
    3.1) 下载后解压 : tar -zxvf geckodriver.tar.gz 
        
    3.2) 拷贝解压后文件到 /usr/bin/ (添加环境变量)
         sudo cp geckodriver /usr/bin/
        
    3.3) 添加可执行权限
         sudo chmod 777 /usr/bin/geckodriver

【4】Windows安装
    4.1) 下载对应版本的phantomjs、chromedriver、geckodriver
    4.2) 把chromedriver.exe拷贝到python安装目录的Scripts目录下(添加到系统环境变量)
         # 查看python安装路径: where python
    4.3) 验证
         cmd命令行: chromedriver
 
***************************总结**************************************1】解压 - 放到用户主目录(chromedriver、geckodriver、phantomjs)2】拷贝 - sudo cp /home/tarena/chromedriver /usr/bin/3】权限 - sudo chmod 777 /usr/bin/chromedriver

# 验证
【Ubuntu | Windows】
ipython3
from selenium import webdriver
webdriver.Chrome()
或者
webdriver.Firefox()

【mac】
ipython3
from selenium import webdriver
webdriver.Chrome(executable_path='/Users/xxx/chromedriver')
或者
webdriver.Firefox(executable_path='/User/xxx/geckodriver')

百度示例代码

"""示例代码一:使用 selenium+浏览器 打开百度"""

# 导入seleinum的webdriver接口
from selenium import webdriver
import time

# 创建浏览器对象
browser = webdriver.Chrome()
browser.get('http://www.baidu.com/')
# 5秒钟后关闭浏览器
time.sleep(5)
browser.quit()
"""示例代码二:打开百度,搜索赵丽颖,点击搜索,查看"""

from selenium import webdriver
import time

# 1.创建浏览器对象 - 已经打开了浏览器
browser = webdriver.Chrome()
# 2.输入: http://www.baidu.com/
browser.get('http://www.baidu.com/')
# 3.找到搜索框,向这个节点发送文字: 赵丽颖
browser.find_element_by_xpath('//*[@id="kw"]').send_keys('赵丽颖')
# 4.找到 百度一下 按钮,点击一下
browser.find_element_by_xpath('//*[@id="su"]').click()

浏览器对象(browser)方法

1】browser.get(url=url)   - 地址栏输入url地址并确认   
【2】browser.quit()         - 关闭浏览器
【3】browser.close()        - 关闭当前页
【4】browser.page_source    - HTML结构源码
【5】browser.page_source.find('字符串')
    从html源码中搜索指定字符串,没有找到返回:-1,经常用于判断是否为最后一页
【6】browser.maximize_window() - 浏览器窗口最大化

Selenium 定位节点八种方法

1】单元素查找('结果为1个节点对象')
    1.1) 【最常用】browser.find_element_by_id('id属性值')
    1.2) 【最常用】browser.find_element_by_name('name属性值')
    1.3) 【最常用】browser.find_element_by_class_name('class属性值')
    1.4) 【最万能】browser.find_element_by_xpath('xpath表达式')
    1.5) 【匹配a节点时常用】browser.find_element_by_link_text('链接文本')
    1.6) 【匹配a节点时常用】browser.find_element_by_partical_link_text('部分链接文本')
    1.7) 【最没用】browser.find_element_by_tag_name('标记名称')
    1.8) 【较常用】browser.find_element_by_css_selector('css表达式')2】多元素查找('结果为[节点对象列表]')
    2.1) browser.find_elements_by_id('id属性值')
    2.2) browser.find_elements_by_name('name属性值')
    2.3) browser.find_elements_by_class_name('class属性值')
    2.4) browser.find_elements_by_xpath('xpath表达式')
    2.5) browser.find_elements_by_link_text('链接文本')
    2.6) browser.find_elements_by_partical_link_text('部分链接文本')
    2.7) browser.find_elements_by_tag_name('标记名称')
    2.8) browser.find_elements_by_css_selector('css表达式')

猫眼电影示例

from selenium import webdriver
import time

url = 'https://maoyan.com/board/4'
browser = webdriver.Chrome()
browser.get(url)

def get_data():
    # 基准xpath: [<selenium xxx li at xxx>,<selenium xxx li at>]
    li_list = browser.find_elements_by_xpath('//*[@id="app"]/div/div/div[1]/dl/dd')
    for li in li_list:
        item = {
   }
        # info_list: ['1', '霸王别姬', '主演:张国荣', '上映时间:1993-01-01', '9.5']
        info_list = li.text.split('\n')
        item['number'] = info_list[0]
        item['name'] = info_list[1]
        item['star'] = info_list[2]
        item['time'] = info_list[3]
        item['score'] = info_list[4]

        print(item)

while True:
    get_data()
    try:
        browser.find_element_by_link_text('下一页').click()
        time.sleep(2)
    except Exception as e:
        print('恭喜你!抓取结束')
        browser.quit()
        break
  • 节点对象操作

    1】文本框操作
        1.1) node.send_keys('')  - 向文本框发送内容
        1.2) node.clear()        - 清空文本
        1.3) node.get_attribute('value') - 获取文本内容
        
    【2】按钮操作
        1.1) node.click()      - 点击
        1.2) node.is_enabled() - 判断按钮是否可用
        1.3) node.get_attribute('value') - 获取按钮文本
    

chromedriver设置无界面模式

from selenium import webdriver

options = webdriver.ChromeOptions()
# 添加无界面参数
options.add_argument('--headless')
browser = webdriver.Chrome(options=options)

selenium - 鼠标操作

from selenium import webdriver
# 导入鼠标事件类
from selenium.webdriver import ActionChains

driver = webdriver.Chrome()
driver.get('http://www.baidu.com/')

# 移动到 设置,perform()是真正执行操作,必须有
element = driver.find_element_by_xpath('//*[@id="u1"]/a[8]')
ActionChains(driver).move_to_element(element).perform()

# 单击,弹出的Ajax元素,根据链接节点的文本内容查找
driver.find_element_by_link_text('高级搜索').click()

selenium处理iframe反爬

  • 经典案例 民政局抓取、QQ邮箱登录、163邮箱登录
1】鼠标操作
    from selenium.webdriver import ActionChains
    ActionChains(browser).move_to_element(node).perform()2】切换句柄
    all_handles = browser.window_handles
    time.sleep(1)
    browser.switch_to.window(all_handles[1])3】iframe子框架
    browser.switch_to.frame(iframe_element)
    # 写法1 - 任何场景都可以: 
    iframe_node = browser.find_element_by_xpath('')
    browser.switch_to.frame(iframe_node)
    
    # 写法2 - 默认支持 id 和 name 两个属性值:
    browser.switch_to.frame('id属性值|name属性值')

转载:https://blog.csdn.net/weixin_38640052/article/details/116124192
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场