前言:本篇爬虫只做经验交流,不可用于其他用途,如果转载,请著名出处和链接即可
希望: 喜欢博主的小伙伴,希望点个关注哦~,更多爬虫项目请收藏本栏目,不定期添加干货
注意: 因为抖音那边更新版本的速度还挺快,不能保证这个博文一直能用,但是爬虫思想不会变,只要学会了思想,无论怎么变,相信你都能解决的
目录
一、抖音视频分析
①、打开抖音app,搜索想要爬取视频的抖音音乐人
②、找到主页右上角三个点
③、点击其中的分享
④、选择其中的复制链接
二、分析复制的短链接
①浏览器打开复制的链接
- 发现短连接被还原成了原始链接
- 暂时我们不知道需要下载视频需要的是什么参数,后期通过分析知道只需要sec_uid这个参数即可
- pc端打开会出现样式问题,我们选择移动端打开,模拟手机环境
- 因此,爬虫也需要模拟移动端环境进行请求
②、使用移动端打开
③、浏览器抓包
- 点击作品和喜欢,可以抓包到请求的链接,这个我们稍后分析
- 因为现在抖音app有隐藏自己收藏的功能,所以如果用户设置了隐藏,你将无法下载用户收藏的视频哦
④、分析请求链接
- 通过抓包我们知道
- 用户上传的数据链接为:https://www.iesdouyin.com/web/api/v2/aweme/post/?sec_uid=MS4wLjABAAAAYSkDWl4l-zUBPSEtsuxKHbgy5uIcwokC-OpwJrzOPmU&count=21&max_cursor=0&aid=1128&_signature=Bth1lgAAWa-w6IbGQlE-1gbYdY&dytk=
- 用户收藏的数据链接为:https://www.iesdouyin.com/web/api/v2/aweme/like/?sec_uid=MS4wLjABAAAAYSkDWl4l-zUBPSEtsuxKHbgy5uIcwokC-OpwJrzOPmU&count=21&max_cursor=0&aid=1128&_signature=Bth1lgAAWa-w6IbGQlE-1gbYdY&dytk=
- 请求结果如下,因抖音的检测机制,你需要刷很多次才有数据,这个我们通过循环判断aweme_list中是否有数据即可
- 多次刷新后的结果如下
三、分析返回的数据
①看整体数据,将数组折叠先
- status_code: 不用说,肯定是返回状态
- aweme_list: 存放视频信息的数组,等会具体分析
- max_cursor/min_cursor: 这个盲猜都知道用来分页的指针,如果是多页,且请求的不是第一页,需要传其中的某个值,这个暂时不讨论
- has_more: 是否有多页
- extra: 额外的信息,当前请求的毫秒级时间戳,以及logid,这个不重要,抖音那边用来日志记录的
②、分析视频信息
- 可以看出每个视频有两个链接,自己访问一下就知道,一个链接是用户上传的原视频,另一个是抖音那边加了水印的视频
- 其实到这里大家都知道无水印视频如何下载了,不必赘述了
四、下载源码
- 其实通过分析,抖音下载只需要拿到sec_uid这个参数即可
- 浏览器打开分享的短连接,就能看到地址栏这个参数
-
#!/usr/bin/env python
-
# encoding: utf-8
-
'''
-
#-------------------------------------------------------------------
-
# CONFIDENTIAL --- CUSTOM STUDIOS
-
#-------------------------------------------------------------------
-
#
-
# @Project Name : 抖音下载小助手
-
#
-
# @File Name : main.py
-
#
-
# @Programmer : Felix
-
#
-
# @Start Date : 2020/7/30 14:42
-
#
-
# @Last Update : 2020/7/30 14:42
-
#
-
#-------------------------------------------------------------------
-
'''
-
import os, sys, requests
-
import json, re, time
-
from retrying
import retry
-
from contextlib
import closing
-
-
class DouYin:
-
'''
-
This is a main Class, the file contains all documents.
-
One document contains paragraphs that have several sentences
-
It loads the original file and converts the original file to new content
-
Then the new content will be saved by this class
-
'''
-
def __init__(self):
-
'''
-
Initial the custom file by some url
-
'''
-
self.headers = {
-
'accept':
'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
-
'accept-language':
'zh-CN,zh;q=0.9,en;q=0.8',
-
'pragma':
'no-cache',
-
'cache-control':
'no-cache',
-
'upgrade-insecure-requests':
'1',
-
'User-Agent':
'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1',
-
}
-
-
def hello(self):
-
'''
-
This is welcome speech
-
:return: self
-
'''
-
print(
"*" *
50)
-
print(
' ' *
15 +
'抖音下载小助手')
-
print(
' ' *
5 +
'作者: Felix Date: 2020-05-20 13:14')
-
print(
' ' *
15 +
'无水印 | 有水印')
-
print(
' ' *
12 +
'输入用户的sec_uid即可')
-
print(
' ' *
2 +
'用浏览器打开用户分享链接,复制参数中sec_uid')
-
print(
"*" *
50)
-
return self
-
-
def get_video_urls(self, sec_uid, type_flag='p'):
-
'''
-
Get the video link of user
-
:param type_flag: the type of video
-
:return: nickname, video_list
-
'''
-
user_url_prefix =
'https://www.iesdouyin.com/web/api/v2/aweme/post'
if type_flag ==
'p'
else
'https://www.iesdouyin.com/web/api/v2/aweme/like'
-
print(
'---解析视频链接中...\r')
-
-
i =
0
-
result = []
-
while result == []:
-
i = i +
1
-
print(
'---正在第 {} 次尝试...\r'.format(str(i)))
-
user_url = user_url_prefix +
'/?sec_uid=%s&count=2000' % (sec_uid)
-
response = self.get_request(user_url)
-
html = json.loads(response.content.decode())
-
if html[
'aweme_list'] != []:
-
result = html[
'aweme_list']
-
-
nickname =
None
-
video_list = []
-
for item
in result:
-
if nickname
is
None:
-
nickname = item[
'author'][
'nickname']
if re.sub(
r'[\/:*?"<>|]',
'', item[
'author'][
'nickname'])
else
None
-
-
video_list.append({
-
'desc': re.sub(
r'[\/:*?"<>|]',
'', item[
'desc'])
if item[
'desc']
else
'无标题' + str(int(time.time())),
-
'url': item[
'video'][
'play_addr'][
'url_list'][
0]
-
})
-
return nickname, video_list
-
-
def get_download_url(self, video_url, watermark_flag):
-
'''
-
Whether to download watermarked videos
-
:param video_url: the url of video
-
:param watermark_flag: the type of video
-
:return: the url of o
-
'''
-
if watermark_flag ==
True:
-
download_url = video_url.replace(
'api.amemv.com',
'aweme.snssdk.com')
-
else:
-
download_url = video_url.replace(
'aweme.snssdk.com',
'api.amemv.com')
-
-
return download_url
-
-
def video_downloader(self, video_url, video_name, watermark_flag=False):
-
'''
-
Download the video
-
:param video_url: the url of video
-
:param video_name: the name of video
-
:param watermark_flag: the flag of video
-
:return: None
-
'''
-
size =
0
-
video_url = self.get_download_url(video_url, watermark_flag=watermark_flag)
-
with closing(requests.get(video_url, headers=self.headers, stream=
True))
as response:
-
chunk_size =
1024
-
content_size = int(response.headers[
'content-length'])
-
if response.status_code ==
200:
-
sys.stdout.write(
'----[文件大小]:%0.2f MB\n' % (content_size / chunk_size /
1024))
-
-
with open(video_name +
'.mp4',
'wb')
as file:
-
for data
in response.iter_content(chunk_size=chunk_size):
-
file.write(data)
-
size += len(data)
-
file.flush()
-
-
sys.stdout.write(
'----[下载进度]:%.2f%%' % float(size / content_size *
100) +
'\r')
-
sys.stdout.flush()
-
-
@retry(stop_max_attempt_number=3)
-
def get_request(self, url, params=None):
-
'''
-
Send a get request
-
:param url: the url of request
-
:param params: the params of request
-
:return: the result of request
-
'''
-
if params
is
None:
-
params = {}
-
response = requests.get(url, params=params, headers=self.headers, timeout=
10)
-
assert response.status_code ==
200
-
return response
-
-
@retry(stop_max_attempt_number=3)
-
def post_request(self, url, data=None):
-
'''
-
Send a post request
-
:param url: the url of request
-
:param data: the params of request
-
:return: the result of request
-
'''
-
if data
is
None:
-
data = {}
-
response = requests.post(url, data=data, headers=self.headers, timeout=
10)
-
assert response.status_code ==
200
-
return response
-
-
def run(self):
-
'''
-
Program entry
-
'''
-
sec_uid = input(
'请输入用户sec_uid:')
-
sec_uid = sec_uid
if sec_uid
else
'MS4wLjABAAAAle_oORaZCgYlB84cLTKSqRFvDgGmgrJsS6n3TfwxonM'
-
-
watermark_flag = input(
'是否下载带水印的视频 (0-否(默认), 1-是):')
-
watermark_flag = bool(int(watermark_flag))
if watermark_flag
else
0
-
-
type_flag = input(
'p-上传的(默认), l-收藏的:')
-
type_flag = type_flag
if type_flag
else
'p'
-
-
save_dir = input(
'保存路径 (默认"./Download/"):')
-
save_dir = save_dir
if save_dir
else
"./Download/"
-
-
nickname, video_list = self.get_video_urls(sec_uid, type_flag)
-
nickname_dir = os.path.join(save_dir, nickname)
-
-
if
not os.path.exists(nickname_dir):
-
os.makedirs(nickname_dir)
-
-
if type_flag ==
'f':
-
if
'favorite'
not
in os.listdir(nickname_dir):
-
os.mkdir(os.path.join(nickname_dir,
'favorite'))
-
-
print(
'---视频下载中: 共有%d个作品...\r' % len(video_list))
-
-
for num
in range(len(video_list)):
-
print(
'---正在解析第%d个视频链接 [%s] 中,请稍后...\n' % (num +
1, video_list[num][
'desc']))
-
-
video_path = os.path.join(nickname_dir, video_list[num][
'desc'])
if type_flag !=
'f'
else os.path.join(nickname_dir,
'favorite', video_list[num][
'desc'])
-
if os.path.isfile(video_path):
-
print(
'---视频已存在...\r')
-
else:
-
self.video_downloader(video_list[num][
'url'], video_path, watermark_flag)
-
print(
'\n')
-
print(
'---下载完成...\r')
-
-
if __name__ ==
"__main__":
-
DouYin().hello().run()
五、运行结果:
- 其中重试的次数,就是在连续请求那个数据连接,直达list中有数据
- 可以发现已经可以正常下载了
猜你喜欢:
「python爬虫实战」使用多进程教你下载M3U8加密或非加密视频
转载:https://blog.csdn.net/weixin_41635750/article/details/108866425
查看评论