第一次尝试用爬虫爬取每个分区热度排名视频信息,然后进行数据分析,但是哔哩哔哩的API接口很奇怪,我用了付费代理IP也没有一个能够访问。很烦,所以想用这篇文章来爬取的话,就需要调慢速度来爬了,几百条数据还好,上万条就太太太慢了。

from selenium import webdriver
import re
import csv
import time
class Bilibili_data:
    def __init__(self):
        self.start_url="https://www.bilibili.com/"
        self.driver=webdriver.Chrome()
    def get_item_list(self):
        list = self.driver.find_elements_by_xpath("//*[@id='primaryChannelMenu']/*/*/*/span")
        item_list = []
        i = 0
        for element in list:
            item = {}
            str = re.sub("[A-Za-z0-9\!\%\[\]\+\。]", "", element.text)
            item["str"] = str
            item["url"] = element.find_element_by_xpath("./..")
            item_list.append(item)
            i = i+1
            if i == 15:
                break
        return item_list
    def get_item_detail(self,url):
        url.click()
        list = self.driver.find_elements_by_xpath("//ul[@class='clearfix']/*[position()>1]/*")
        i = 0
        item_detail = []
        for element in list:
            item = {}
            item["str"] = str(i) + ':' + element.text
            i = i + 1
            item["url"] = element
            item_detail.append(item)
        return item_detail

    def choose_time(self,url):
        url_last = "#/all/{}/0/1/{}"
        item = ['click', 'scores', 'stow', 'coin', 'dm']
        cn_item = ['播放数','评论数','收藏数','硬币数','弹幕数']
        num = 0
        for i in cn_item:
            print(str(num) + ':' + i)
            num = num+1
        item_choice = int(input('请输入你选择的排序:'))
        time_choice = input('请输入时间段(例如 2020-01-01,2020-01-07):')
        url = url + url_last.format(item[item_choice],time_choice)
        self.driver.get(url)

    def get_content_list(self):
        li_list = self.driver.find_elements_by_xpath("//ul[@class='vd-list mod-2']/li")
        content_list = []
        for li in li_list:
            video_detail = {}
            video_detail['title'] = li.find_element_by_xpath(".//div[@class='r']/a").text
            video_detail['author'] = li.find_element_by_xpath(".//div[@class='up-info']/a").text
            video_detail['href'] = li.find_element_by_xpath(".//div[@class='r']/a").get_attribute("href")
            content_list.append(video_detail)
        print(content_list)
        next_url = self.driver.find_elements_by_xpath("//button[@class='nav-btn iconfont icon-arrowdown3']")
        next_url = next_url[0] if len(next_url) > 0 else None
        return content_list,next_url

    def save_content_list(self,content_list):
        header = ['title','author','href','bvid','view','danmu','reply_num','like_num','coin_num','favorite_num','share_num','video_type','video_time','video_rank','video_tag']
        with open('video.csv', 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=header)  # 提前预览列名,当下面代码写入数据时,会将其一一对应。
            writer.writerows(content_list)  # 写入数据

    def run(self):
        header = ['title','author','href','bvid','view','danmu','reply_num','like_num','coin_num','favorite_num','share_num','video_type','video_time','video_rank','video_tag']
        with open('video.csv', 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=header)  # 提前预览列名,当下面代码写入数据时,会将其一一对应。
            writer.writeheader()  # 写入列名
        self.driver.get(self.start_url)
        list = self.get_item_list()
        num = 0
        for i in list:
            print(str(num) + ':' + i['str'])
            num = num+1
        choice1 = int(input("请输入你选择的分区:"))
        item_detail = self.get_item_detail(list[choice1]['url'])
        for detail in item_detail:
            print(detail['str'])
        choice2 = int(input("请输入你选择的分类:"))
        url_detail = item_detail[choice2]['url'].get_attribute("href")
        self.choose_time(url_detail)
        content_list,next_url = self.get_content_list()
        self.save_content_list(content_list)
        while next_url is not None:
            next_url.click()
            time.sleep(3)
            content_list, next_url=self.get_content_list()
            self.save_content_list(content_list)
        self.driver.quit()


if __name__=="__main__":
    data_get=Bilibili_data()
    data_get.run()
import requests
import json
from bs4 import BeautifulSoup
import re
import bs4
import time
import csv
import random
class detail_data:

    def __init__(self):
        # 爬虫地址
        self.alphabet = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'

    def dec(self, x):  # BV号转换成AV号
        r = 0
        for i, v in enumerate([11, 10, 3, 8, 4, 6]):
            r += self.alphabet.find(x[v]) * 58 ** i
        return (r - 0x2_0840_07c0) ^ 0x0a93_b324

    def url_deal(self, url):
        url = url[-12:]
        return url

    def get_time(self, url):
        # proxy = self.random_data('ip.txt')
        headers = self.random_data('headers.txt')
        r = requests.get(url,timeout=30,headers=headers)
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")
        result = soup.find(class_='video-data')
        timedata = []
        for i in result:
            if type(i) == bs4.element.Tag:
                timedata.append(re.sub('\s', ' ', i.text))
            else:
                timedata.append('None')
        return timedata

    def random_data(self, path):
        with open(path, 'r') as f:  # 这里打开的就是你之前爬取ip保存的地址
            data = f.readlines()
            f.close()

        reg = []
        for i in data:
            k = eval(i)  # 将字符串转化为字典形式
            reg.append(k)
        ip = random.choice(reg)  # 随机从reg这个存储了多个ip的list 里返回一个ip地址
        return ip

    def save_content_list(self,video_dict):
        header = ['title', 'author', 'href', 'bvid', 'view', 'danmu', 'reply_num', 'like_num', 'coin_num', 'favorite_num','share_num', 'video_type', 'video_time', 'video_rank', 'video_tag']
        with open('video_data.csv', 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=header)  # 提前预览列名,当下面代码写入数据时,会将其一一对应。
            writer.writerow(video_dict)  # 写入数据


    def run(self, BV, url_video,title,author):
        bid = BV
        aid = self.dec(bid)
        url = r'https://api.bilibili.com/x/web-interface/archive/stat?aid=' + str(aid)
        # 携带cookie进行访问
        url2 = r'https://api.bilibili.com/x/web-interface/view/detail/tag?aid=' + str(aid)
        # proxy = self.random_data('ip.txt')
        # proxy2 = self.random_data('ip.txt')
        headers = self.random_data('headers.txt')
        response = requests.get(url, timeout=30, headers=headers)
        headers2 = self.random_data('headers.txt')
        response2 = requests.get(url2, timeout=30, headers=headers2)
        time.sleep(3)
        if response.status_code == 200:
            text = response.text
            text2 = response2.text
            jsonobj = json.loads(text)
            jsonobj2 = json.loads(text2)
            video_tags=''
            for tags in jsonobj2['data']:
                video_tags=video_tags+tags['tag_name']+' '
            timedata = self.get_time(url_video)
            # 从Json对象获取视频基本信息并转入词典中
            video_dict = {'title': title,
                          'author': author,
                          'href': url_video,
                          'bvid': jsonobj['data']['bvid'],
                          'view': jsonobj['data']['view'],
                          'danmu': jsonobj['data']['danmaku'],
                          'reply_num': jsonobj['data']['reply'],
                          'like_num': jsonobj['data']['like'],
                          'coin_num': jsonobj['data']['coin'],
                          'favorite_num': jsonobj['data']['favorite'],
                          'share_num': jsonobj['data']['share'],
                          'video_type': timedata[0],
                          'video_time': timedata[1],
                          'video_rank': timedata[2],
                          'video_tag': video_tags
                          }
            return video_dict



if __name__ == '__main__':
    Detail_data = detail_data()
    header = ['title', 'author', 'href', 'bvid', 'view', 'danmu', 'reply_num', 'like_num', 'coin_num', 'favorite_num','share_num', 'video_type', 'video_time', 'video_rank', 'video_tag']
    with open('video_data.csv', 'a', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=header)  # 提前预览列名,当下面代码写入数据时,会将其一一对应。
        writer.writeheader()  # 写入列名
    with open(r'video.csv', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            url = row['href']
            BV = Detail_data.url_deal(url)
            video_dict = Detail_data.run(BV, url,row['title'],row['author'])
            print(video_dict)
            Detail_data.save_content_list(video_dict)
Last modification:August 31st, 2020 at 09:15 am
如果觉得我的文章对你有用,请随意赞赏