# 第一次尝试用爬虫爬取每个分区热度排名视频信息,然后进行数据分析。但是哔哩哔哩的API接口很奇怪,我用了付费代理IP也没有一个能够访问,很烦。所以想用这篇文章的代码来爬取的话,就需要调慢速度来爬:几百条数据还好,上万条就太太太慢了。
from selenium import webdriver
import re
import csv
import time
class Bilibili_data:
    """Interactive Selenium scraper for bilibili.com.

    Walks the homepage channel menu, lets the user pick a channel, a
    sub-category, a sort key and a date range, then pages through the
    ranking list and appends title/author/href rows to video.csv.
    """

    # Single source of truth for the CSV schema; the extra columns are left
    # empty here and filled in later by the detail scraper.
    CSV_HEADER = ['title', 'author', 'href', 'bvid', 'view', 'danmu',
                  'reply_num', 'like_num', 'coin_num', 'favorite_num',
                  'share_num', 'video_type', 'video_time', 'video_rank',
                  'video_tag']

    def __init__(self):
        self.start_url = "https://www.bilibili.com/"
        self.driver = webdriver.Chrome()

    def get_item_list(self):
        """Return up to 15 top-level channels as
        [{'str': cleaned_label, 'url': clickable_parent_element}, ...]."""
        spans = self.driver.find_elements_by_xpath(
            "//*[@id='primaryChannelMenu']/*/*/*/span")
        item_list = []
        for span in spans[:15]:  # only the first 15 menu entries
            # Strip latin letters/digits and stray punctuation from the label
            # (raw string: the original pattern had invalid \! \% \+ escapes).
            label = re.sub(r"[A-Za-z0-9!%\[\]+。]", "", span.text)
            item_list.append({
                "str": label,
                # Parent element (the <a>); clicking it opens the channel.
                "url": span.find_element_by_xpath("./.."),
            })
        return item_list

    def get_item_detail(self, url):
        """Click into a channel element and return its sub-categories as
        [{'str': '<index>:<name>', 'url': element}, ...]."""
        url.click()
        entries = self.driver.find_elements_by_xpath(
            "//ul[@class='clearfix']/*[position()>1]/*")
        item_detail = []
        for index, entry in enumerate(entries):
            item_detail.append({
                "str": str(index) + ':' + entry.text,
                "url": entry,
            })
        return item_detail

    def choose_time(self, url):
        """Prompt for a sort key and a date range, then open the
        corresponding ranking page in the browser."""
        url_last = "#/all/{}/0/1/{}"
        sort_keys = ['click', 'scores', 'stow', 'coin', 'dm']
        cn_item = ['播放数', '评论数', '收藏数', '硬币数', '弹幕数']
        for num, name in enumerate(cn_item):
            print(str(num) + ':' + name)
        item_choice = int(input('请输入你选择的排序:'))
        time_choice = input('请输入时间段(例如 2020-01-01,2020-01-07):')
        self.driver.get(url + url_last.format(sort_keys[item_choice],
                                              time_choice))

    def get_content_list(self):
        """Scrape the current ranking page.

        Returns (content_list, next_button) where next_button is the
        "next page" element, or None when on the last page.
        """
        li_list = self.driver.find_elements_by_xpath(
            "//ul[@class='vd-list mod-2']/li")
        content_list = []
        for li in li_list:
            # Title and href come from the same anchor; look it up once.
            link = li.find_element_by_xpath(".//div[@class='r']/a")
            content_list.append({
                'title': link.text,
                'author': li.find_element_by_xpath(
                    ".//div[@class='up-info']/a").text,
                'href': link.get_attribute("href"),
            })
        print(content_list)
        buttons = self.driver.find_elements_by_xpath(
            "//button[@class='nav-btn iconfont icon-arrowdown3']")
        next_url = buttons[0] if buttons else None
        return content_list, next_url

    def save_content_list(self, content_list):
        """Append scraped rows to video.csv; missing columns stay empty."""
        with open('video.csv', 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=self.CSV_HEADER)
            writer.writerows(content_list)

    def run(self):
        """Entry point: write the header row, drive the interactive
        channel/category/sort selection, then save every result page."""
        # NOTE(review): append mode means a second run writes a duplicate
        # header row — kept as-is to preserve the original behavior.
        with open('video.csv', 'a', newline='', encoding='utf-8') as f:
            csv.DictWriter(f, fieldnames=self.CSV_HEADER).writeheader()
        self.driver.get(self.start_url)
        sections = self.get_item_list()
        for num, section in enumerate(sections):
            print(str(num) + ':' + section['str'])
        choice1 = int(input("请输入你选择的分区:"))
        item_detail = self.get_item_detail(sections[choice1]['url'])
        for detail in item_detail:
            print(detail['str'])
        choice2 = int(input("请输入你选择的分类:"))
        url_detail = item_detail[choice2]['url'].get_attribute("href")
        self.choose_time(url_detail)
        content_list, next_url = self.get_content_list()
        self.save_content_list(content_list)
        while next_url is not None:
            next_url.click()
            time.sleep(3)  # throttle: fast paging gets the client blocked
            content_list, next_url = self.get_content_list()
            self.save_content_list(content_list)
        self.driver.quit()
if __name__ == "__main__":
    # Launch the interactive ranking-list scraper.
    scraper = Bilibili_data()
    scraper.run()
import requests
import json
from bs4 import BeautifulSoup
import re
import bs4
import time
import csv
import random
class detail_data:
    """Per-video detail scraper.

    For each row previously saved in video.csv, converts the BV id to an
    AV id, queries the Bilibili web API for statistics (views, likes,
    coins, ...) and tags, scrapes the video page for category/upload
    time/rank, and returns the combined record.
    """

    def __init__(self):
        # Base-58 alphabet used by Bilibili's BV-id encoding.
        self.alphabet = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'

    def dec(self, x):
        """Convert a BV id (e.g. 'BV17x411w7KC') to its numeric AV id.

        Classic bv2av: characters at positions [11, 10, 3, 8, 4, 6]
        encode a base-58 number (least significant first); subtract the
        additive constant and XOR with the mask.
        """
        r = 0
        for i, v in enumerate([11, 10, 3, 8, 4, 6]):
            r += self.alphabet.find(x[v]) * 58 ** i
        # 0x2_0840_07c0 == 8728348608, 0x0a93_b324 == 177451812
        return (r - 0x2_0840_07c0) ^ 0x0a93_b324

    def url_deal(self, url):
        """Extract the 12-character BV id from the end of a video URL."""
        return url[-12:]

    def get_time(self, url):
        """Fetch the video page and return the text of each child of the
        'video-data' element (category, upload time, rank), with all
        whitespace collapsed; non-tag children become the string 'None'.

        Raises requests.HTTPError on a bad status code.
        """
        headers = self.random_data('headers.txt')
        r = requests.get(url, timeout=30, headers=headers)
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")
        result = soup.find(class_='video-data')
        timedata = []
        for node in result:
            if isinstance(node, bs4.element.Tag):
                # Collapse newlines/tabs/multiple spaces into single spaces.
                timedata.append(re.sub(r'\s', ' ', node.text))
            else:
                timedata.append('None')
        return timedata

    def random_data(self, path):
        """Read one dict literal per line from *path* and return a random
        one (used to rotate request headers / proxy entries)."""
        import ast  # local import: only this method needs it
        with open(path, 'r') as f:
            lines = f.readlines()
        # literal_eval instead of eval: parses the dict literals safely
        # without executing arbitrary code from the file.
        pool = [ast.literal_eval(line) for line in lines]
        return random.choice(pool)

    def save_content_list(self, video_dict):
        """Append one result row to video_data.csv."""
        header = ['title', 'author', 'href', 'bvid', 'view', 'danmu',
                  'reply_num', 'like_num', 'coin_num', 'favorite_num',
                  'share_num', 'video_type', 'video_time', 'video_rank',
                  'video_tag']
        with open('video_data.csv', 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=header)
            writer.writerow(video_dict)

    def run(self, BV, url_video, title, author):
        """Fetch stats + tags for one video.

        Returns a dict matching the CSV header, or None when the stat API
        request does not return HTTP 200 (same as the original implicit
        fall-through).
        """
        aid = self.dec(BV)
        url = r'https://api.bilibili.com/x/web-interface/archive/stat?aid=' + str(aid)
        url2 = r'https://api.bilibili.com/x/web-interface/view/detail/tag?aid=' + str(aid)
        response = requests.get(url, timeout=30,
                                headers=self.random_data('headers.txt'))
        response2 = requests.get(url2, timeout=30,
                                 headers=self.random_data('headers.txt'))
        time.sleep(3)  # throttle: the API bans fast clients
        if response.status_code != 200:
            return None
        jsonobj = json.loads(response.text)
        # NOTE(review): response2 is not status-checked — kept as in the
        # original; a failed tag request will raise on the 'data' access.
        jsonobj2 = json.loads(response2.text)
        video_tags = ''
        for tags in jsonobj2['data']:
            video_tags = video_tags + tags['tag_name'] + ' '
        timedata = self.get_time(url_video)
        stat = jsonobj['data']
        return {'title': title,
                'author': author,
                'href': url_video,
                'bvid': stat['bvid'],
                'view': stat['view'],
                'danmu': stat['danmaku'],
                'reply_num': stat['reply'],
                'like_num': stat['like'],
                'coin_num': stat['coin'],
                'favorite_num': stat['favorite'],
                'share_num': stat['share'],
                'video_type': timedata[0],
                'video_time': timedata[1],
                'video_rank': timedata[2],
                'video_tag': video_tags}
if __name__ == '__main__':
    # Detail-scraper entry point: enrich every row of video.csv with API
    # statistics and append the results to video_data.csv.
    collector = detail_data()
    columns = ['title', 'author', 'href', 'bvid', 'view', 'danmu',
               'reply_num', 'like_num', 'coin_num', 'favorite_num',
               'share_num', 'video_type', 'video_time', 'video_rank',
               'video_tag']
    # Write the column header once before appending data rows.
    with open('video_data.csv', 'a', newline='', encoding='utf-8') as out:
        csv.DictWriter(out, fieldnames=columns).writeheader()
    with open(r'video.csv', encoding='utf-8') as src:
        for record in csv.DictReader(src):
            link = record['href']
            bv_id = collector.url_deal(link)
            info = collector.run(bv_id, link, record['title'], record['author'])
            print(info)
            collector.save_content_list(info)