飞道的博客

Python爬取国家税务总局纳税信用A级纳税人信息

382人阅读  评论(0)

Python爬取国家税务总局纳税信用A级纳税人信息

特此声明

此程序只交流此次程序完成的心得,不包含任何商业用途。如有帮助到您,请不要恶意增加该网站的压力。原创文章,如需转载请注明出处。(https://blog.csdn.net/Wenzhe339/article/details/108307808)

环境

python 3.7.5

​ 模块:

​ requests、multiprocessing、pymongo、logging

Proxy_pool (ip进程池 github项目)

谷歌浏览器

火狐浏览器 (更好的解析网页中json内容和hearder相关内容)

网页分析

打开纳税人信息网址 (http://hd.chinatax.gov.cn/nszx/InitCredit.html) 随便点一个地区之后点击F12开发者模式。会看到两个findCredit.do的请求,一个请求状态是307,一个请求状态是200。

点击状态码是200的这个请求后,我们首先看他的请求头(火狐浏览器)。需要记录的是post请求的url,观察请求头中的Cookie的组成。这个Cookie组成部分结尾会一起说明。

然后点击请求查看对改请求后的数据,显而易见,返回的数据类型是json,这样更有利于我们去存储数据。这个json的主要的信息在pageable和content中。


重点:

  • 对于这个网站最最让人头痛的一点就是,无论你是不是爬虫,一段时间内对数据请求的次数超过5-10次之后,你的ip就会被封30min-6小时不等。

  • 如果你需要获取这个网站的数据,那么就需要考虑两点内容:

    • ip的问题
    • Cookie的问题
      • ip的问题需要找个代理池,模拟不同的ip对该地址进行访问,目前有免费的ip代理和付费的, 我使用的是从各大免费的ip代理池爬取的。
      • cookie的问题需要我们具体分析,该网站的cookie的组成部分重要是的_Jo0OQK、JSESSIONID和C3VK。前面的几个kv值是不变的。而在同一时间段内,_Jo0OQK和JSESSIONID是不会变的,C3VK是一个随机的6位字母和数字的组合,而多次看这个值会发现,这个C3VK组合的最后一位是数字。

打开纳税人信息网址[http://hd.chinatax.gov.cn/nszx/InitCredit.html],点击F12分析网页 (浏览器:谷歌浏览器),返回主页查看它的cookie就会发现和数据页的cookie一样。只是C3VK不一样。

总结:综上对网页的分析,就会发现其实这个网页的数据很好拿,只是需要对反爬的机制进行一定的措施即可。

程序编写

前期

  • 在写程序前期需要考虑最终将数据放到哪去,2014-2019年的全部省份的数据量差不多有500多万条,(本人数据库小白,只会简单的增删改查)我当时考虑的是Nosql对于json数据会好存一点,就选择了mongodb,殊不知存数据容易,取数据特别慢。如果简单玩一下那就存在内存然后用pandas存成csv或者excel就OK了。我不晓得500万条数据mysql会不会快点,就请各位大神在选择数据库时自行斟酌。
  • 如果需要大量爬取数据,那cookie需要定期更换,不然会对网页发送请求获取不到数据。人为的去更换cookie不是办法,所以推荐使用requests.Session()先对主页进行请求,目的是为了获取目前的Cookie,写一个小的测试文件,用于获取一下网站的cookie,经测试,session返回的cookie只有_Jo0OQK,经过查阅度娘,后我也没理解JSESSIONID的含义。不过这个拿个有效值就OK了,也不需要更改。接下来编写一个随机生成C3VK的方法就可以自动生成cookie了。
  • ip代理池是我在GitHub上找到的一个程序,名字是proxy_pool,这个项目一直在更新,目前可以用。proxy_pool我不过多介绍,有兴趣请各位大佬前去阅读他的文档。

代码部分

我所用到的方法和程序核心

  1. logging模块的设置
# 设置log日志本地和控制台双输出
logger = logging.getLogger()
logger.setLevel('INFO')
formatter = logging.Formatter('%(asctime)s ->>[%(process)s]<<- [%(levelname)s] --> %(message)s', '[%m-%d %H:%M:%S]')
chlr = logging.StreamHandler() # 输出到控制台的handler
chlr.setFormatter(formatter)
chlr.setLevel('INFO')  # 也可以不设置,不设置就默认用logger的level
fhlr = logging.FileHandler('../log/credits.log', mode='a') # 输出到文件的handler
fhlr.setFormatter(formatter)
logger.addHandler(chlr)
logger.addHandler(fhlr)
  1. 获取c3vk的方法
import string 

def get_C3VK():
  	""" 这个方法里面也可以不用string,也可以自己手动写进去a-z和A-Z,让他们组成一个列表就可以了。 """
    choiceNum = '0123456789'
    choiceStr = string.ascii_lowercase
    return ''.join(random.choices(choiceStr + choiceNum, k=5) + random.choices(choiceNum, k=1))
  1. 分割cookie的方法
def split_cookie2dict(cookie_str):
  	""" 因为cookie复制过来之后是长字符串,这边可以写一个分割字符串的。 其实比较累赘,手动写一个字典去存一个cookie也可以 """
    cookies = {}
    for line in cookie_str.split('; '):
        key, value = line.split('=', 1)
        cookies[key] = value
    return cookies
  1. 获取ip地址的归属地
def checkip(proxy):
  	"""
  	这个函数是可有可无的,我加进去是为了看ip代理池获取的ip都是哪里的。
	  ip的格式是: 111.11.11.111:1234 
	  这里有两个获取ip的方法,
	  	1. ip-api.com		优点:准确,成功率高,api功能比较完善。 	缺电: 外国网站,访问慢
	  	2. ip.taobao.com  	优点:中国内陆网络,访问快,api功能较完善 缺点:外国的定位不是特别准确,成功率低。
  	"""
    ip = proxy.split(':')[0]
    try:
        r = requests.get(f"http://ip-api.com/json/{ip}?lang=zh-CN&fields=1105951", proxies={"http": f"http://{proxy}"},
                         timeout=3).json()
        logging.info(f"ip: {ip} --> 国家: {r['country']}, 省份: {r['regionName']}, 市县: {r['city']}")
        # r = requests.post('http://ip.taobao.com/outGetIpInfo',
        #                   data={'ip': ip, 'accessKey': 'alibaba-inc'}, headers=header,
        #                   proxies={"http": f"http://{proxy}"})
        #rJson = r.json()
        #result = rJson['data'] if rJson['code'] == 0 else None
        #logging.info(f"ip: {ip} --> 国家: {result['country']}, 省份: {result['region']}, 市县: {result['city']}")
        # response = requests.get('https://www.baidu.com/', headers=header, proxies={"http": "http://{}".format(proxy)},
        #                           timeout=3)
        # return response.status_code is requests.codes.ok and r['countryCode'].upper() == 'CN'
    except ReadTimeout as rtE:
        logging.warning("获取ip归属地连接超时 -- " + str(rtE) + ' 所在行数:' + str(rtE.__traceback__.tb_lineno))
    except UnicodeEncodeError as ueE:
        logging.warning('获取ip归属地解码 -- ' + str(ueE) + ' 所在行数:' + str(ueE.__traceback__.tb_lineno))
    except Exception as e:
        logging.warning('获取ip归属地发生意外错误 -- ' + str(e) + ' 所在行数:' + str(e.__traceback__.tb_lineno))
  1. 主程序代码
class Credit:
    def __init__(self, year, start_page, max_page, location, sets):
        """
        初始化
        :param year:        爬取年份
        :param start_page:  开始页数
        :param max_page:    最大页数
        :param location:    地区码
        :param sets:        任务
        """

        def start_mongo(ip, sets):
            """ 初始化自动连接MongoDB """
            myclient = pymongo.MongoClient(f'mongodb://{ip}:27017/')
            db_admin = myclient.admin
            db_admin.authenticate(MongoUser, MongoPassword, mechanism='SCRAM-SHA-1')
            return myclient[MongoDB][sets]

        """
            collection : 操作mongo的游标
            session    : 当前爬取内容的session (如果重复爬取失败,则模拟重新获取session)
        """
        self.collection = start_mongo(MongoIP, sets)
        self.session = requests.Session()

        """
            startPageSet : 本次任务开始页数  (这个值不变,是sets传入时设置的)
            maxPageSet   : 本次任务最大页数  (这个值不变,是sets传入时设置的)
            nowPage      : 当前页数         (按页数循环爬取时,nowPage + 1)
            maxPage      : 当前任务最大页数  (如果不设置最大页数时,由爬取的信息自动获取爬取的页数)
        """
        self.startPageSet = start_page
        self.maxPageSet = max_page
        self.nowPage = start_page
        self.maxPage = start_page + 2 if max_page is None else max_page

        self.header1 = None
        self.header2 = None
        self.form_data = {'page': self.nowPage,
                          'location': location,
                          'code': '',
                          'name': '',
                          'evalyear': year,
                          # 'cPage': '',
                          }
        """
            self.proxyPoolAddress  : 先拿localhost的IP代理池
            self.proxy             : 当前的 "ip:port" 
            self.proxies           : 当前的proxy对应的 {"http": "http://ip:port"}
            self.cookie            : 当前的cookie
            self.isChangeAllHeader : 是否需要更改当前的请求头
            self.ipUseTimes        : ip使用次数
        """
        self.proxyPoolAddress = proxyPoolList[0]
        self.proxy = None
        self.proxies = None
        self.cookie = split_cookie2dict(
            'yfx_c_g_u_id_10003701=_ck20072111214511275787055858549; yfx_c_g_u_id_10003748=_ck20072215063217551535749866138; yfx_f_l_v_t_10003748=f_t_1595401592745__r_t_1595401592745__v_t_1595401592745__r_c_0; yfx_key_10003748=; yfx_mr_10003748=%3A%3Amarket_type_free_search%3A%3A%3A%3Abaidu%3A%3A%3A%3A%3A%3A%3A%3Awww.baidu.com%3A%3A%3A%3Apmf_from_free_search; yfx_mr_f_10003748=%3A%3Amarket_type_free_search%3A%3A%3A%3Abaidu%3A%3A%3A%3A%3A%3A%3A%3Awww.baidu.com%3A%3A%3A%3Apmf_from_free_search; yfx_mr_10003701=%3A%3Amarket_type_free_search%3A%3A%3A%3Abaidu%3A%3A%3A%3A%3A%3A%3A%3Awww.baidu.com%3A%3A%3A%3Apmf_from_free_search; yfx_key_10003701=; yfx_mr_f_10003701=%3A%3Amarket_type_free_search%3A%3A%3A%3Abaidu%3A%3A%3A%3A%3A%3A%3A%3Awww.baidu.com%3A%3A%3A%3Apmf_from_free_search; Hm_lvt_8875c662941dbf07e39c556c8d97615f=1596166214; _Jo0OQK=739C60DD4BD5D8E6D45066012A62CE7422E4259E3C200B324E3E9D69772F15991F3771E249C184DDD5E75314B28BE26DEDF61C867A6DB66C3D7D86BEEA2448582432DF09B220C8082124ED14892373145994ED1489237314599BE994A6718907CADGJ1Z1dg==; yfx_f_l_v_t_10003701=f_t_1595401277678__r_t_1596436112411__v_t_1596452645819__r_c_4; JSESSIONID=3BD7DED94BFA4651D3293394F7641398; C3VK={c3vk}'.format(
                c3vk=get_C3VK()))
        self.isChangeAllHeader = True
        self.ipUseTimes = 0

        """
            requestsTimes          :  总请求次数
            requestsSuccessTimes   :  成功请求次数
            exceptsTimes           :  触发错误次数
        """
        self.requestsTimes = 0
        self.requestsSuccessTimes = 0
        self.exceptsTimes = 0

        """ 下面两个参数是准备为程序增加一个计算时间长短的功能的 后来耽搁了就没写 """
        self.rStartTime = time.time()
        self.rStopTime = None
        logging.info("类初始化完毕 -- 爬取地区:" + str(sets) + ' 起止页数' + str(self.startPageSet) + ' - ' + str(
            self.maxPageSet) + ", 目前起止页数: " + str(self.nowPage) + ' - ' + str(self.maxPage))

    def get_header(self):
        user_agent = random.choice(UserAgents)
        self.header1 = {
            'User-Agent': user_agent,
            'Accept-Encoding': 'gzip, deflate',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'Connection': 'keep-alive',
        }
        self.header2 = {
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Connection': 'keep-alive',
            'Content-Type': "application/x-www-form-urlencoded; charset=utf-8",
            'User-Agent': user_agent,
            'Origin': 'http://hd.chinatax.gov.cn',
            'Referer': "http://hd.chinatax.gov.cn/nszx/InitCredit.html",
            'X-Requested-With': 'XMLHttpRequest',
        }
        if self.nowPage >= 1:
            self.header2['Content-Length'] = b'55'

    def get_proxy(self):
        try:
            self.proxy = requests.get(f"http://{self.proxyPoolAddress}/get/", timeout=3).json().get("proxy")
        except ReadTimeout:
            logging.warning("获取ip时出现严重错误,需要重启ip池程序 -- ")
        self.proxies = {"http": "http://{}".format(self.proxy)}

    def delete_proxy(self):
        try:
            requests.get(f"http://{self.proxyPoolAddress}/delete/?proxy={self.proxy}", timeout=3)
        except ReadTimeout as rtE:
            logging.warning("删除ip时出现严重错误,需要重启ip池程序 -- ")
        self.proxy, self.proxies = None, None
        self.ipUseTimes, self.exceptsTimes = 0, 0
        self.isChangeAllHeader = True

    def get(self):
        """
        程序主函数,大致的逻辑是:
            1. 先判断是否需要更换请求头,如果不需要,则直接对Credit.do进行请求数据,这样的目的是为了节省ip代理池的开销。
            2. 如果代理池全部消耗完毕,那么会进入死循环,直到某个代理池中的有ip才会恢复爬取,也可在进行time.sleep()
            3. 程序如果成功获取数据,那么数据会利用mongo的游标进行全部插入数据,并在内存中销毁此数据,以节省内存消耗。
            4. 成功插入数据后,代表这个ip是可用的,那么会让他连续用10次,10次内什么时候获取不到数据什么时候更换新的ip地址。
        事项: 在程序第一和第二版中,我对各个环节都会做出判断并直接进行continue,结果成功率非常低,本程序的逻辑不是特别严谨,可能在获取cookie时失败,但会成功获取到数据,后经改良,采用try..except..这种方式, 大大增加了成功率
        :return: 
        """
        while self.nowPage < self.maxPage:
            if self.isChangeAllHeader:
                self.get_header()
                self.get_proxy()

                if self.proxy is None:
                    logging.info(f"{self.proxyPoolAddress} ip池 消耗完毕……")
                    self.proxyPoolAddress = random.choice(proxyPoolList)
                    # time.sleep(60)
                    continue

                self.session = requests.Session()
                self.session.headers = CaseInsensitiveDict(self.header1)
                self.session.proxies = self.proxies
                checkip(self.proxy)
            try:
                if self.isChangeAllHeader:
                    s = self.session.get(homeUrl, headers=self.header1, proxies=self.proxies, verify=False,
                                         timeout=3)
                    logging.info("Session.get 状态码 --> " + str(s.status_code))
                    self.cookie.update(self.session.cookies.get_dict())
                self.cookie['C3VK'] = get_C3VK()
                self.form_data['page'] = self.nowPage
                r = self.session.post(url, data=self.form_data, headers=self.header2, cookies=self.cookie,
                                      proxies=self.proxies, verify=False, timeout=4)
                logging.info("Session.post 状态码 --> " + str(r.status_code))
                results = r.json()
                recordsList = [{'code': each['code'], 'name': each['name'], 'evalyear': each['evalyear'],
                                'location': each['location'], 'pagenum': results['pageable']['pageNumber']} for each in
                               results['content']]
                if not isTest:
                    self.collection.insert_many(recordsList)
                    [logging.info("成功获取并插入数据 --> " + str(records)) for records in recordsList]
                del recordsList
                if self.maxPageSet is None:
                    with open('../out/maxPage.txt', 'a', encoding='utf-8') as f:
                        f.write(str(self.form_data) + ' - 最大页数 - ' + str(results['totalPages']) + '\n')
                    logging.info("已更新最新的最大页数 -- " + str(results['totalPages']) + ' 页……')
                    self.maxPage = results['totalPages']
                    self.maxPageSet = 1
                # 可以使用的ip 连续用十次
                self.ipUseTimes += 1
                if self.ipUseTimes < 10:
                    self.isChangeAllHeader = False
                else:
                    self.ipUseTimes = 0
                    self.isChangeAllHeader = True
                self.nowPage += 1
                self.requestsSuccessTimes += 1
            except ReadTimeout as rtE:
                logging.warning("连接超时1 -- " + str(rtE) + ' 所在行数:' + str(rtE.__traceback__.tb_lineno))
                self.delete_proxy()
                continue
            except ConnectTimeout as ctE:
                logging.warning("连接超时2 -- " + str(ctE) + ' 所在行数:' + str(ctE.__traceback__.tb_lineno))
                self.delete_proxy()
                continue
            except ProxyError as pE:
                logging.warning("连接错误3 -- " + str(pE) + ' 所在行数:' + str(pE.__traceback__.tb_lineno))
                self.delete_proxy()
                continue
            except JSONDecodeError as jsdE:
                logging.warning("爬取信息失败 -- " + str(jsdE) + ' 所在行数:' + str(jsdE.__traceback__.tb_lineno))
                self.exceptsTimes += 1
                if self.exceptsTimes == 3:
                    self.delete_proxy()
                continue
            except Exception as e:
                logging.warning("爬取信息失败2 -- " + str(e) + ' 所在行数:' + str(e.__traceback__.tb_lineno))
                self.exceptsTimes += 1
                if self.exceptsTimes == 3:
                    self.delete_proxy()
                continue
            finally:
                self.requestsTimes += 1
                if self.requestsTimes % 100 == 0:
                    logging.info(
                        f"已请求100次, 共请求了{self.requestsTimes}次, 成功爬取了{self.nowPage - self.startPageSet}页, 成功率为 --> [{round(self.requestsSuccessTimes / self.requestsTimes, 2) * 100}%]" + f" -- 目前正在使用 [{self.proxyPoolAddress} 池]")
        logging.warning("程序结束了 --  目前起止页数: " + str(self.nowPage) + ' - ' + str(self.maxPage) + 'form_data --> ' + str(self.form_data))
  1. 主程序入口和多进程
if __name__ == '__main__':
    if processNum <= 1 or len(jobs) <= 1:
        for job in jobs:
            credit = Credit(*job)
            credit.get()
    else:
        pool = Pool(processes=processNum)
        for job in jobs:
            pool.apply_async(func=run, args=(*job,))

        pool.close()
        pool.join()
  1. 代码中很多设置项的来源 settings.py
homeUrl = 'http://hd.chinatax.gov.cn/nszx/InitCredit.html'
url = 'http://hd.chinatax.gov.cn/service/findCredit.do'

UserAgents = [
    'Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50',
    'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50',
    'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0',
    "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0)",
    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)',
    "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)",
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:2.0.1) Gecko/20100101 Firefox/4.0.1',
    'Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1',
    'Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11',
    'Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11',
    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)',
    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)',
    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)',
    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World)',
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SE 2.X MetaSr 1.0; SE 2.X MetaSr 1.0; .NET CLR 2.0.50727; SE 2.X MetaSr 1.0)",
    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)',
    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Avant Browser)',
    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)',
]

# Credit.do 地址码对应的城市名称
locations = {'110000': '北京', '120000': '天津', '130000': '河北', '140000': '山西', '150000': '内蒙古', '210000': '辽宁',
             '210200': '大连', '220000': '吉林', '230000': '黑龙江', '310000': '上海', '320000': '江苏', '330000': '浙江',
             '330200': '宁波', '340000': '安徽', '350000': '福建', '350200': '厦门', '360000': '江西', '370000': '山东',
             '370200': '青岛', '410000': '河南', '420000': '湖北', '430000': '湖南', '440000': '广东', '440300': '深圳',
             '450000': '广西', '460000': '海南', '500000': '重庆', '510000': '四川', '520000': '贵州', '530000': '云南',
             '540000': '西藏', '610000': '陕西', '620000': '甘肃', '630000': '青海', '640000': '宁夏', '650000': '新疆'}

MongoIP = "your mongo db IP"
MongoUser = "your mongo db User"
MongoPassword = 'your mongo db PassWord'
MongoDB = 'your mongo db Name'
MongoSets = "当前sets任务所对应的 数据库名称"


def get_jobs(location, locatoin_name):
  	""" 如果懒得像下面自定义当前任务 则可以在job后面增加一个get_jobs()函数即可 """
    return [(str(year), 0, None, location, locatoin_name) for year in range(2014, 2020)]


# task 任务 dict --> 年份,起始页数,最大页数,地区
# ex:
jobs = [
    ('2019', 0, 3000, '370000', 'shandong'),
    ('2019', 3000, 6000, '370000', 'shandong'),
    ('2019', 6000, 7423, '370000', 'shandong'),
       ] + get_jobs("110000", 'beijing') \
				 + get_jobs("120000", 'tianjin')

# 可在这增加多个ip代理池对应的地址
proxyPoolList = ['localhost:5010']

# 多进程模块 当前多进程数量
processNum = len(jobs)
# 是否是测试模式
isTest = False

结语

我自己用下来成功率还蛮高的,大概30%-70%吧,这个成功率主要是由当前ip代理池决定的,有兴趣可以自行尝试。由于有很多失误浪费了时间,我爬了所有数据用了半个月,如果不出错7-10天应该可以爬完,但是想想又要会对税务局的网站进很大的压力,请各位谨慎尝试。


转载:https://blog.csdn.net/Wenzhe339/article/details/108307808
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场