因为上手练习一个自己的数据分析项目,因此需要爬取数据。经历过两个版本的更新后,终于写出了第三版。期间也学会了selenium库的运用,API接口的调用,IP池等。
确定目标
因为想要一个量大的数据集,因此没有考虑热榜排名,因为所有区加起来也才一千左右。全部视频信息的话技术不行,然后就盯上了分区榜。
从这个榜单可以选择时间段,可以根据每个月的视频热度排名等信息,来分析月度热点,哪些视频更加容易火,以及各种因素对视频播放量的影响。虽然只是一个小分区月度热度排名,并不包含全部视频,但是数据量也是极大的。下图可以看到接近有23万条数据。
网站分析
这里存在一个难点,就是虽然浏览器上是可以查看网页源码,并且包含了视频的相关信息,但是用requests请求之后的网页源码却并没有相关的信息。因此前两个版本,我采用了selenium库的方法来获取信息,但是这个方法有一个缺点,速度慢(因为要跟浏览器一样加载整个页面信息)、信息少(只有标题、作者、视频简介、以及视频页和个人主页网址),很麻烦。于是这次我换成了API调用的方法。
我们选择一个具体的数字来查找,可以发现搜索出来一个search的接口。
点进去之后,可以发现里面的result共有20条数据,刚好对应着每页20个视频。
可以看到里面包含了作者、标题、标签、播放等一系列数据。
接口为https://s.search.bilibili.com/cate/search?callback=main_ver=v3&search_type=video&view_type=hot_rank&order=click©_right=-1&cate_id=138&page=1&pagesize=20&jsonp=jsonp&time_from=20200801&time_to=20200831
,view_type为排行类型,page为页面数,pagesize为页面最大的视频数,上限好像是100。最后面就是时间了。
但是我还需要三连数据以及UP主的粉丝量。同理分析
得到三连的API接口:https://api.bilibili.com/x/web-interface/archive/stat?aid=371876135
其中aid由BV转换。
粉丝数为https://api.bilibili.com/x/relation/stat?vmid=32172331
mid可以在第一个接口处获取。
IP池
这时候虽然已经可以开始爬取了,但是如果数据量稍微有一点大,访问稍微有点频繁,就会导致IP被屏蔽。
这时候我们就需要用到代理IP,免费的代理IP虽然也有,而且GITHUB上也有专门的项目来建立代理IP池。但是免费的终究很麻烦,于是我选择了日租独享的IP。http://www.xdaili.cn/
代码
# coding: utf-8
# Author:南岛鹋
# Blog: www.ndmiao.cn
# Date :2020/8/25 10:29
# Tool :PyCharm
import requests
import csv
import json
import random
import time
class video_data:
def __init__(self):
self.url = 'https://s.search.bilibili.com/cate/search?main_ver=v3&search_type=video&view_type=hot_rank&order=click©_right=-1&cate_id=138&page={}&pagesize=20&jsonp=jsonp&time_from=20200801&time_to=20200831'
self.page = 11507
self.alphabet = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'
def dec(self, x): # BV号转换成AV号
r = 0
for i, v in enumerate([11, 10, 3, 8, 4, 6]):
r += self.alphabet.find(x[v]) * 58 ** i
return (r - 0x2_0840_07c0) ^ 0x0a93_b324
def random_headers(self, path): # 随机读取一个头信息
with open(path, 'r') as f:
data = f.readlines()
f.close()
reg = []
for i in data:
k = eval(i) # 将字符串转化为字典形式
reg.append(k)
header = random.choice(reg)
return header
def get_ip(self): # 代理IP获取
print('切换IP中.......')
url = '代理IP的地址'
ip = requests.get(url).text
if ip in ['{"ERRORCODE":"10055","RESULT":"提取太频繁,请按规定频率提取!"}', '{"ERRORCODE":"10098","RESULT":"可用机器数量不足"}']: # 出现频繁或者机器不足,睡眠14秒
time.sleep(14)
ip = requests.get(url).text
print(ip)
else:
print(ip)
proxies = {
'https': 'http://' + ip,
'http': 'http://' + ip
} # 设置https和http可以按需选择
return proxies
def get_requests(self, url, proxy): # 请求的函数
headers = self.random_headers('headers.txt')
# 将头信息和IP写入,用try来减少意外对程序的影响
try:
response = requests.get(url, timeout=3, headers=headers, proxies=proxy)
except requests.exceptions.RequestException as e:
print(e)
proxy = self.get_ip()
try:
response = requests.get(url, timeout=3, headers=headers, proxies=proxy)
except requests.exceptions.RequestException as e:
print(e)
print('原始IP')
response = requests.get(url, timeout=3, headers=headers)
return response, proxy
def get_follower(self, mid, proxy): # 获取粉丝数
url = 'https://api.bilibili.com/x/relation/stat?vmid=' + str(mid)
r, proxy = self.get_requests(url, proxy)
result = json.loads(r.text) # 用json来解析文本
# 按照需求获取需要的数据,因为粉丝数是必定存在的,所以失败了需要多次尝试获取。
try:
follower = result['data']['follower']
except:
follower,proxy = self.get_follower(mid, proxy)
return follower, proxy
def get_view(self, BV, proxy): # 获取三连和播放
aid = self.dec(BV)
url = 'https://api.bilibili.com/x/web-interface/archive/stat?aid=' + str(aid)
r, proxy = self.get_requests(url, proxy)
result = json.loads(r.text)
view = {}# 因为视频虽然在排行榜,但是很可能已经删除,所以没有数据为None
try:
view['view'] = result['data']['view']
view['danmu'] = result['data']['danmaku']
view['reply'] = result['data']['reply']
view['like'] = result['data']['like']
view['coin'] = result['data']['coin']
view['favorite'] = result['data']['favorite']
view['share'] = result['data']['share']
view['rank'] = result['data']['his_rank']
except:
view['view'] = 'None'
view['danmu'] = 'None'
view['reply'] = 'None'
view['like'] = 'None'
view['coin'] = 'None'
view['favorite'] = 'None'
view['share'] = 'None'
view['rank'] = 'None'
return view, proxy
def get_parse(self, result, proxy): # 整合数据
content = []
items = result['result']
for item in items:
pubdate = item['pubdate']
title = item['title']
author = item['author']
bvid = item['bvid']
mid = item['mid']
follower, proxy = self.get_follower(mid, proxy)
video_view, proxy = self.get_view(bvid, proxy)
view = video_view['view']
danmu = video_view['danmu']
reply = video_view['reply']
like = video_view['like']
coin = video_view['coin']
favorite = video_view['favorite']
share = video_view['share']
rank = video_view['rank']
tag = item['tag']
con = [pubdate, title, author, bvid, mid, follower,view,danmu,reply,like,coin,favorite,share,rank, tag]
content.append(con)
print(con)
print(content)
self.save(content)
return proxy
def write_header(self):
header = ['日期', '标题', '作者', 'BV', 'mid', '粉丝', '播放', '弹幕', '评论', '点赞','硬币','收藏','转发','排名','标签']
with open('fun_video.csv', 'a', encoding='gb18030', newline='')as f:
write = csv.writer(f)
write.writerow(header)
def save(self,content):# 存入csv
with open('fun_video.csv', 'a', encoding='gb18030', newline='')as file:
write = csv.writer(file)
write.writerows(content)
def run(self):
#self.write_header()
proxy = self.get_ip()
for i in range(168, self.page):
url = self.url.format(i)
response, proxy = self.get_requests(url, proxy)
result = json.loads(response.text)
proxy = self.get_parse(result, proxy)
print('第{}页爬取完毕'.format(i))
if __name__ == '__main__':
video = video_data()
video.run()