Python爬取国家税务总局纳税信用A级纳税人信息
特此声明
此程序只交流此次程序完成的心得,不包含任何商业用途。如有帮助到您,请不要恶意增加该网站的压力。原创文章,如需转载请注明出处。(https://blog.csdn.net/Wenzhe339/article/details/108307808)
环境
python 3.7.5
模块:
requests、multiprocessing、pymongo、logging
Proxy_pool (ip进程池 github项目)
谷歌浏览器
火狐浏览器 (更好的解析网页中json内容和hearder相关内容)
网页分析
打开纳税人信息网址 (http://hd.chinatax.gov.cn/nszx/InitCredit.html) 随便点一个地区之后点击F12开发者模式。会看到两个findCredit.do的请求,一个请求状态是307,一个请求状态是200。
点击状态码是200的这个请求后,我们首先看他的请求头(火狐浏览器)。需要记录的是post请求的url,观察请求头中的Cookie的组成。这个Cookie组成部分结尾会一起说明。
然后点击请求查看对改请求后的数据,显而易见,返回的数据类型是json,这样更有利于我们去存储数据。这个json的主要的信息在pageable和content中。
重点:
-
对于这个网站最最让人头痛的一点就是,无论你是不是爬虫,一段时间内对数据请求的次数超过5-10次之后,你的ip就会被封30min-6小时不等。
-
如果你需要获取这个网站的数据,那么就需要考虑两点内容:
- ip的问题
- Cookie的问题
- ip的问题需要找个代理池,模拟不同的ip对该地址进行访问,目前有免费的ip代理和付费的, 我使用的是从各大免费的ip代理池爬取的。
- cookie的问题需要我们具体分析,该网站的cookie的组成部分重要是的_Jo0OQK、JSESSIONID和C3VK。前面的几个kv值是不变的。而在同一时间段内,_Jo0OQK和JSESSIONID是不会变的,C3VK是一个随机的6位字母和数字的组合,而多次看这个值会发现,这个C3VK组合的最后一位是数字。
打开纳税人信息网址[http://hd.chinatax.gov.cn/nszx/InitCredit.html],点击F12分析网页 (浏览器:谷歌浏览器),返回主页查看它的cookie就会发现和数据页的cookie一样。只是C3VK不一样。
总结:综上对网页的分析,就会发现其实这个网页的数据很好拿,只是需要对反爬的机制进行一定的措施即可。
程序编写
前期
- 在写程序前期需要考虑最终将数据放到哪去,2014-2019年的全部省份的数据量差不多有500多万条,(本人数据库小白,只会简单的增删改查)我当时考虑的是Nosql对于json数据会好存一点,就选择了mongodb,殊不知存数据容易,取数据特别慢。如果简单玩一下那就存在内存然后用pandas存成csv或者excel就OK了。我不晓得500万条数据mysql会不会快点,就请各位大神在选择数据库时自行斟酌。
- 如果需要大量爬取数据,那cookie需要定期更换,不然会对网页发送请求获取不到数据。人为的去更换cookie不是办法,所以推荐使用requests.Session()先对主页进行请求,目的是为了获取目前的Cookie,写一个小的测试文件,用于获取一下网站的cookie,经测试,session返回的cookie只有_Jo0OQK,经过查阅度娘,后我也没理解JSESSIONID的含义。不过这个拿个有效值就OK了,也不需要更改。接下来编写一个随机生成C3VK的方法就可以自动生成cookie了。
- ip代理池是我在GitHub上找到的一个程序,名字是proxy_pool,这个项目一直在更新,目前可以用。proxy_pool我不过多介绍,有兴趣请各位大佬前去阅读他的文档。
代码部分
我所用到的方法和程序核心
- logging模块的设置
# 设置log日志本地和控制台双输出
logger = logging.getLogger()
logger.setLevel('INFO')
formatter = logging.Formatter('%(asctime)s ->>[%(process)s]<<- [%(levelname)s] --> %(message)s', '[%m-%d %H:%M:%S]')
chlr = logging.StreamHandler() # 输出到控制台的handler
chlr.setFormatter(formatter)
chlr.setLevel('INFO') # 也可以不设置,不设置就默认用logger的level
fhlr = logging.FileHandler('../log/credits.log', mode='a') # 输出到文件的handler
fhlr.setFormatter(formatter)
logger.addHandler(chlr)
logger.addHandler(fhlr)
- 获取c3vk的方法
import string
def get_C3VK():
""" 这个方法里面也可以不用string,也可以自己手动写进去a-z和A-Z,让他们组成一个列表就可以了。 """
choiceNum = '0123456789'
choiceStr = string.ascii_lowercase
return ''.join(random.choices(choiceStr + choiceNum, k=5) + random.choices(choiceNum, k=1))
- 分割cookie的方法
def split_cookie2dict(cookie_str):
""" 因为cookie复制过来之后是长字符串,这边可以写一个分割字符串的。 其实比较累赘,手动写一个字典去存一个cookie也可以 """
cookies = {}
for line in cookie_str.split('; '):
key, value = line.split('=', 1)
cookies[key] = value
return cookies
- 获取ip地址的归属地
def checkip(proxy):
"""
这个函数是可有可无的,我加进去是为了看ip代理池获取的ip都是哪里的。
ip的格式是: 111.11.11.111:1234
这里有两个获取ip的方法,
1. ip-api.com 优点:准确,成功率高,api功能比较完善。 缺电: 外国网站,访问慢
2. ip.taobao.com 优点:中国内陆网络,访问快,api功能较完善 缺点:外国的定位不是特别准确,成功率低。
"""
ip = proxy.split(':')[0]
try:
r = requests.get(f"http://ip-api.com/json/{ip}?lang=zh-CN&fields=1105951", proxies={"http": f"http://{proxy}"},
timeout=3).json()
logging.info(f"ip: {ip} --> 国家: {r['country']}, 省份: {r['regionName']}, 市县: {r['city']}")
# r = requests.post('http://ip.taobao.com/outGetIpInfo',
# data={'ip': ip, 'accessKey': 'alibaba-inc'}, headers=header,
# proxies={"http": f"http://{proxy}"})
#rJson = r.json()
#result = rJson['data'] if rJson['code'] == 0 else None
#logging.info(f"ip: {ip} --> 国家: {result['country']}, 省份: {result['region']}, 市县: {result['city']}")
# response = requests.get('https://www.baidu.com/', headers=header, proxies={"http": "http://{}".format(proxy)},
# timeout=3)
# return response.status_code is requests.codes.ok and r['countryCode'].upper() == 'CN'
except ReadTimeout as rtE:
logging.warning("获取ip归属地连接超时 -- " + str(rtE) + ' 所在行数:' + str(rtE.__traceback__.tb_lineno))
except UnicodeEncodeError as ueE:
logging.warning('获取ip归属地解码 -- ' + str(ueE) + ' 所在行数:' + str(ueE.__traceback__.tb_lineno))
except Exception as e:
logging.warning('获取ip归属地发生意外错误 -- ' + str(e) + ' 所在行数:' + str(e.__traceback__.tb_lineno))
- 主程序代码
class Credit:
def __init__(self, year, start_page, max_page, location, sets):
"""
初始化
:param year: 爬取年份
:param start_page: 开始页数
:param max_page: 最大页数
:param location: 地区码
:param sets: 任务
"""
def start_mongo(ip, sets):
""" 初始化自动连接MongoDB """
myclient = pymongo.MongoClient(f'mongodb://{ip}:27017/')
db_admin = myclient.admin
db_admin.authenticate(MongoUser, MongoPassword, mechanism='SCRAM-SHA-1')
return myclient[MongoDB][sets]
"""
collection : 操作mongo的游标
session : 当前爬取内容的session (如果重复爬取失败,则模拟重新获取session)
"""
self.collection = start_mongo(MongoIP, sets)
self.session = requests.Session()
"""
startPageSet : 本次任务开始页数 (这个值不变,是sets传入时设置的)
maxPageSet : 本次任务最大页数 (这个值不变,是sets传入时设置的)
nowPage : 当前页数 (按页数循环爬取时,nowPage + 1)
maxPage : 当前任务最大页数 (如果不设置最大页数时,由爬取的信息自动获取爬取的页数)
"""
self.startPageSet = start_page
self.maxPageSet = max_page
self.nowPage = start_page
self.maxPage = start_page + 2 if max_page is None else max_page
self.header1 = None
self.header2 = None
self.form_data = {'page': self.nowPage,
'location': location,
'code': '',
'name': '',
'evalyear': year,
# 'cPage': '',
}
"""
self.proxyPoolAddress : 先拿localhost的IP代理池
self.proxy : 当前的 "ip:port"
self.proxies : 当前的proxy对应的 {"http": "http://ip:port"}
self.cookie : 当前的cookie
self.isChangeAllHeader : 是否需要更改当前的请求头
self.ipUseTimes : ip使用次数
"""
self.proxyPoolAddress = proxyPoolList[0]
self.proxy = None
self.proxies = None
self.cookie = split_cookie2dict(
'yfx_c_g_u_id_10003701=_ck20072111214511275787055858549; yfx_c_g_u_id_10003748=_ck20072215063217551535749866138; yfx_f_l_v_t_10003748=f_t_1595401592745__r_t_1595401592745__v_t_1595401592745__r_c_0; yfx_key_10003748=; yfx_mr_10003748=%3A%3Amarket_type_free_search%3A%3A%3A%3Abaidu%3A%3A%3A%3A%3A%3A%3A%3Awww.baidu.com%3A%3A%3A%3Apmf_from_free_search; yfx_mr_f_10003748=%3A%3Amarket_type_free_search%3A%3A%3A%3Abaidu%3A%3A%3A%3A%3A%3A%3A%3Awww.baidu.com%3A%3A%3A%3Apmf_from_free_search; yfx_mr_10003701=%3A%3Amarket_type_free_search%3A%3A%3A%3Abaidu%3A%3A%3A%3A%3A%3A%3A%3Awww.baidu.com%3A%3A%3A%3Apmf_from_free_search; yfx_key_10003701=; yfx_mr_f_10003701=%3A%3Amarket_type_free_search%3A%3A%3A%3Abaidu%3A%3A%3A%3A%3A%3A%3A%3Awww.baidu.com%3A%3A%3A%3Apmf_from_free_search; Hm_lvt_8875c662941dbf07e39c556c8d97615f=1596166214; _Jo0OQK=739C60DD4BD5D8E6D45066012A62CE7422E4259E3C200B324E3E9D69772F15991F3771E249C184DDD5E75314B28BE26DEDF61C867A6DB66C3D7D86BEEA2448582432DF09B220C8082124ED14892373145994ED1489237314599BE994A6718907CADGJ1Z1dg==; yfx_f_l_v_t_10003701=f_t_1595401277678__r_t_1596436112411__v_t_1596452645819__r_c_4; JSESSIONID=3BD7DED94BFA4651D3293394F7641398; C3VK={c3vk}'.format(
c3vk=get_C3VK()))
self.isChangeAllHeader = True
self.ipUseTimes = 0
"""
requestsTimes : 总请求次数
requestsSuccessTimes : 成功请求次数
exceptsTimes : 触发错误次数
"""
self.requestsTimes = 0
self.requestsSuccessTimes = 0
self.exceptsTimes = 0
""" 下面两个参数是准备为程序增加一个计算时间长短的功能的 后来耽搁了就没写 """
self.rStartTime = time.time()
self.rStopTime = None
logging.info("类初始化完毕 -- 爬取地区:" + str(sets) + ' 起止页数' + str(self.startPageSet) + ' - ' + str(
self.maxPageSet) + ", 目前起止页数: " + str(self.nowPage) + ' - ' + str(self.maxPage))
def get_header(self):
user_agent = random.choice(UserAgents)
self.header1 = {
'User-Agent': user_agent,
'Accept-Encoding': 'gzip, deflate',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'Connection': 'keep-alive',
}
self.header2 = {
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Content-Type': "application/x-www-form-urlencoded; charset=utf-8",
'User-Agent': user_agent,
'Origin': 'http://hd.chinatax.gov.cn',
'Referer': "http://hd.chinatax.gov.cn/nszx/InitCredit.html",
'X-Requested-With': 'XMLHttpRequest',
}
if self.nowPage >= 1:
self.header2['Content-Length'] = b'55'
def get_proxy(self):
try:
self.proxy = requests.get(f"http://{self.proxyPoolAddress}/get/", timeout=3).json().get("proxy")
except ReadTimeout:
logging.warning("获取ip时出现严重错误,需要重启ip池程序 -- ")
self.proxies = {"http": "http://{}".format(self.proxy)}
def delete_proxy(self):
try:
requests.get(f"http://{self.proxyPoolAddress}/delete/?proxy={self.proxy}", timeout=3)
except ReadTimeout as rtE:
logging.warning("删除ip时出现严重错误,需要重启ip池程序 -- ")
self.proxy, self.proxies = None, None
self.ipUseTimes, self.exceptsTimes = 0, 0
self.isChangeAllHeader = True
def get(self):
"""
程序主函数,大致的逻辑是:
1. 先判断是否需要更换请求头,如果不需要,则直接对Credit.do进行请求数据,这样的目的是为了节省ip代理池的开销。
2. 如果代理池全部消耗完毕,那么会进入死循环,直到某个代理池中的有ip才会恢复爬取,也可在进行time.sleep()
3. 程序如果成功获取数据,那么数据会利用mongo的游标进行全部插入数据,并在内存中销毁此数据,以节省内存消耗。
4. 成功插入数据后,代表这个ip是可用的,那么会让他连续用10次,10次内什么时候获取不到数据什么时候更换新的ip地址。
事项: 在程序第一和第二版中,我对各个环节都会做出判断并直接进行continue,结果成功率非常低,本程序的逻辑不是特别严谨,可能在获取cookie时失败,但会成功获取到数据,后经改良,采用try..except..这种方式, 大大增加了成功率
:return:
"""
while self.nowPage < self.maxPage:
if self.isChangeAllHeader:
self.get_header()
self.get_proxy()
if self.proxy is None:
logging.info(f"{self.proxyPoolAddress} ip池 消耗完毕……")
self.proxyPoolAddress = random.choice(proxyPoolList)
# time.sleep(60)
continue
self.session = requests.Session()
self.session.headers = CaseInsensitiveDict(self.header1)
self.session.proxies = self.proxies
checkip(self.proxy)
try:
if self.isChangeAllHeader:
s = self.session.get(homeUrl, headers=self.header1, proxies=self.proxies, verify=False,
timeout=3)
logging.info("Session.get 状态码 --> " + str(s.status_code))
self.cookie.update(self.session.cookies.get_dict())
self.cookie['C3VK'] = get_C3VK()
self.form_data['page'] = self.nowPage
r = self.session.post(url, data=self.form_data, headers=self.header2, cookies=self.cookie,
proxies=self.proxies, verify=False, timeout=4)
logging.info("Session.post 状态码 --> " + str(r.status_code))
results = r.json()
recordsList = [{'code': each['code'], 'name': each['name'], 'evalyear': each['evalyear'],
'location': each['location'], 'pagenum': results['pageable']['pageNumber']} for each in
results['content']]
if not isTest:
self.collection.insert_many(recordsList)
[logging.info("成功获取并插入数据 --> " + str(records)) for records in recordsList]
del recordsList
if self.maxPageSet is None:
with open('../out/maxPage.txt', 'a', encoding='utf-8') as f:
f.write(str(self.form_data) + ' - 最大页数 - ' + str(results['totalPages']) + '\n')
logging.info("已更新最新的最大页数 -- " + str(results['totalPages']) + ' 页……')
self.maxPage = results['totalPages']
self.maxPageSet = 1
# 可以使用的ip 连续用十次
self.ipUseTimes += 1
if self.ipUseTimes < 10:
self.isChangeAllHeader = False
else:
self.ipUseTimes = 0
self.isChangeAllHeader = True
self.nowPage += 1
self.requestsSuccessTimes += 1
except ReadTimeout as rtE:
logging.warning("连接超时1 -- " + str(rtE) + ' 所在行数:' + str(rtE.__traceback__.tb_lineno))
self.delete_proxy()
continue
except ConnectTimeout as ctE:
logging.warning("连接超时2 -- " + str(ctE) + ' 所在行数:' + str(ctE.__traceback__.tb_lineno))
self.delete_proxy()
continue
except ProxyError as pE:
logging.warning("连接错误3 -- " + str(pE) + ' 所在行数:' + str(pE.__traceback__.tb_lineno))
self.delete_proxy()
continue
except JSONDecodeError as jsdE:
logging.warning("爬取信息失败 -- " + str(jsdE) + ' 所在行数:' + str(jsdE.__traceback__.tb_lineno))
self.exceptsTimes += 1
if self.exceptsTimes == 3:
self.delete_proxy()
continue
except Exception as e:
logging.warning("爬取信息失败2 -- " + str(e) + ' 所在行数:' + str(e.__traceback__.tb_lineno))
self.exceptsTimes += 1
if self.exceptsTimes == 3:
self.delete_proxy()
continue
finally:
self.requestsTimes += 1
if self.requestsTimes % 100 == 0:
logging.info(
f"已请求100次, 共请求了{self.requestsTimes}次, 成功爬取了{self.nowPage - self.startPageSet}页, 成功率为 --> [{round(self.requestsSuccessTimes / self.requestsTimes, 2) * 100}%]" + f" -- 目前正在使用 [{self.proxyPoolAddress} 池]")
logging.warning("程序结束了 -- 目前起止页数: " + str(self.nowPage) + ' - ' + str(self.maxPage) + 'form_data --> ' + str(self.form_data))
- 主程序入口和多进程
if __name__ == '__main__':
if processNum <= 1 or len(jobs) <= 1:
for job in jobs:
credit = Credit(*job)
credit.get()
else:
pool = Pool(processes=processNum)
for job in jobs:
pool.apply_async(func=run, args=(*job,))
pool.close()
pool.join()
- 代码中很多设置项的来源 settings.py
homeUrl = 'http://hd.chinatax.gov.cn/nszx/InitCredit.html'
url = 'http://hd.chinatax.gov.cn/service/findCredit.do'
UserAgents = [
'Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50',
'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50',
'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0',
"Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0)",
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)',
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)",
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:2.0.1) Gecko/20100101 Firefox/4.0.1',
'Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1',
'Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11',
'Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World)',
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SE 2.X MetaSr 1.0; SE 2.X MetaSr 1.0; .NET CLR 2.0.50727; SE 2.X MetaSr 1.0)",
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Avant Browser)',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)',
]
# Credit.do 地址码对应的城市名称
locations = {'110000': '北京', '120000': '天津', '130000': '河北', '140000': '山西', '150000': '内蒙古', '210000': '辽宁',
'210200': '大连', '220000': '吉林', '230000': '黑龙江', '310000': '上海', '320000': '江苏', '330000': '浙江',
'330200': '宁波', '340000': '安徽', '350000': '福建', '350200': '厦门', '360000': '江西', '370000': '山东',
'370200': '青岛', '410000': '河南', '420000': '湖北', '430000': '湖南', '440000': '广东', '440300': '深圳',
'450000': '广西', '460000': '海南', '500000': '重庆', '510000': '四川', '520000': '贵州', '530000': '云南',
'540000': '西藏', '610000': '陕西', '620000': '甘肃', '630000': '青海', '640000': '宁夏', '650000': '新疆'}
MongoIP = "your mongo db IP"
MongoUser = "your mongo db User"
MongoPassword = 'your mongo db PassWord'
MongoDB = 'your mongo db Name'
MongoSets = "当前sets任务所对应的 数据库名称"
def get_jobs(location, locatoin_name):
""" 如果懒得像下面自定义当前任务 则可以在job后面增加一个get_jobs()函数即可 """
return [(str(year), 0, None, location, locatoin_name) for year in range(2014, 2020)]
# task 任务 dict --> 年份,起始页数,最大页数,地区
# ex:
jobs = [
('2019', 0, 3000, '370000', 'shandong'),
('2019', 3000, 6000, '370000', 'shandong'),
('2019', 6000, 7423, '370000', 'shandong'),
] + get_jobs("110000", 'beijing') \
+ get_jobs("120000", 'tianjin')
# 可在这增加多个ip代理池对应的地址
proxyPoolList = ['localhost:5010']
# 多进程模块 当前多进程数量
processNum = len(jobs)
# 是否是测试模式
isTest = False
结语
我自己用下来成功率还蛮高的,大概30%-70%吧,这个成功率主要是由当前ip代理池决定的,有兴趣可以自行尝试。由于有很多失误浪费了时间,我爬了所有数据用了半个月,如果不出错7-10天应该可以爬完,但是想想又要会对税务局的网站进很大的压力,请各位谨慎尝试。
转载:https://blog.csdn.net/Wenzhe339/article/details/108307808