因为上手练习一个自己的数据分析项目,因此需要爬取数据。经历过两个版本的更新后,终于写出了第三版。期间也学会了selenium库的运用,API接口的调用,IP池等。

确定目标

因为想要一个量大的数据集,因此没有考虑热榜排名,因为所有区加起来也才一千左右。全部视频信息的话技术不行,然后就盯上了分区榜。

分区榜

从这个榜单可以选择时间段,可以根据每个月的视频热度排名等信息,来分析月度热点,哪些视频更加容易火,以及各种因素对视频播放量的影响。虽然只是一个小分区月度热度排名,并不包含全部视频,但是数据量也是极大的。下图可以看到接近有23万条数据。

数据量

网站分析

这里存在一个难点,就是虽然浏览器上是可以查看网页源码,并且包含了视频的相关信息,但是用requests请求之后的网页源码却并没有相关的信息。因此前两个版本,我采用了selenium库的方法来获取信息,但是这个方法有一个缺点,速度慢(因为要跟浏览器一样加载整个页面信息)、信息少(只有标题、作者、视频简介、以及视频页和个人主页网址),很麻烦。于是这次我换成了API调用的方法。

页面分析

我们选择一个具体的数字来查找,可以发现搜索出来一个search的接口。

页面分析

点进去之后,可以发现里面的result共有20条数据,刚好对应着每页20个视频。

数据分析

可以看到里面包含了作者、标题、标签、播放等一系列数据。

接口为https://s.search.bilibili.com/cate/search?callback=main_ver=v3&search_type=video&view_type=hot_rank&order=click©_right=-1&cate_id=138&page=1&pagesize=20&jsonp=jsonp&time_from=20200801&time_to=20200831 ,view_type为排行类型,page为页面数,pagesize为页面最大的视频数,上限好像是100。最后面就是时间了。

但是我还需要三连数据以及UP主的粉丝量。同理分析

得到三连的API接口:https://api.bilibili.com/x/web-interface/archive/stat?aid=371876135

其中aid由BV转换。

粉丝数为https://api.bilibili.com/x/relation/stat?vmid=32172331

mid可以在第一个接口处获取。

IP池

这时候虽然已经可以开始爬取了,但是如果数据量稍微有一点大,访问稍微有点频繁,就会导致IP被屏蔽。

这时候我们就需要用到代理IP,免费的代理IP虽然也有,而且GITHUB上也有专门的项目来建立代理IP池。但是免费的终究很麻烦,于是我选择了日租独享的IP。http://www.xdaili.cn/

代码

# coding: utf-8
# Author:南岛鹋 
# Blog: www.ndmiao.cn
# Date :2020/8/25 10:29
# Tool :PyCharm

import requests
import csv
import json
import random
import time


class video_data:
    def __init__(self):
        self.url = 'https://s.search.bilibili.com/cate/search?main_ver=v3&search_type=video&view_type=hot_rank&order=click&copy_right=-1&cate_id=138&page={}&pagesize=20&jsonp=jsonp&time_from=20200801&time_to=20200831'
        self.page = 11507
        self.alphabet = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'

    def dec(self, x):  # BV号转换成AV号
        r = 0
        for i, v in enumerate([11, 10, 3, 8, 4, 6]):
            r += self.alphabet.find(x[v]) * 58 ** i
        return (r - 0x2_0840_07c0) ^ 0x0a93_b324

    def random_headers(self, path): # 随机读取一个头信息
        with open(path, 'r') as f:
            data = f.readlines()
            f.close()

        reg = []
        for i in data:
            k = eval(i)  # 将字符串转化为字典形式
            reg.append(k)
        header = random.choice(reg)
        return header

    def get_ip(self): # 代理IP获取
        print('切换IP中.......')
        url = '代理IP的地址'
        ip = requests.get(url).text
        if ip in ['{"ERRORCODE":"10055","RESULT":"提取太频繁,请按规定频率提取!"}', '{"ERRORCODE":"10098","RESULT":"可用机器数量不足"}']: # 出现频繁或者机器不足,睡眠14秒
            time.sleep(14)
            ip = requests.get(url).text
            print(ip)
        else:
            print(ip)
        proxies = {
            'https': 'http://' + ip,
            'http': 'http://' + ip
        } # 设置https和http可以按需选择
        return proxies

    def get_requests(self, url, proxy): # 请求的函数
        headers = self.random_headers('headers.txt')
        # 将头信息和IP写入,用try来减少意外对程序的影响
        try: 
            response = requests.get(url, timeout=3, headers=headers, proxies=proxy)
        except requests.exceptions.RequestException as e:
            print(e)
            proxy = self.get_ip()
            try:
                response = requests.get(url, timeout=3, headers=headers, proxies=proxy)
            except requests.exceptions.RequestException as e:
                print(e)
                print('原始IP')
                response = requests.get(url, timeout=3, headers=headers)
        return response, proxy

    def get_follower(self, mid, proxy): # 获取粉丝数
        url = 'https://api.bilibili.com/x/relation/stat?vmid=' + str(mid)
        r, proxy = self.get_requests(url, proxy)
        result = json.loads(r.text) # 用json来解析文本
        # 按照需求获取需要的数据,因为粉丝数是必定存在的,所以失败了需要多次尝试获取。
        try:
            follower = result['data']['follower']
        except:
            follower,proxy = self.get_follower(mid, proxy)
        return follower, proxy

    def get_view(self, BV, proxy): # 获取三连和播放
        aid = self.dec(BV)
        url = 'https://api.bilibili.com/x/web-interface/archive/stat?aid=' + str(aid)
        r, proxy = self.get_requests(url, proxy)
        result = json.loads(r.text)
        view = {}# 因为视频虽然在排行榜,但是很可能已经删除,所以没有数据为None
        try:
            view['view'] = result['data']['view']
            view['danmu'] = result['data']['danmaku']
            view['reply'] = result['data']['reply']
            view['like'] = result['data']['like']
            view['coin'] = result['data']['coin']
            view['favorite'] = result['data']['favorite']
            view['share'] = result['data']['share']
            view['rank'] = result['data']['his_rank']
        except:
            view['view'] = 'None'
            view['danmu'] = 'None'
            view['reply'] = 'None'
            view['like'] = 'None'
            view['coin'] = 'None'
            view['favorite'] = 'None'
            view['share'] = 'None'
            view['rank'] = 'None'
        return view, proxy

    def get_parse(self, result, proxy): # 整合数据
        content = []
        items = result['result']
        for item in items:
            pubdate = item['pubdate']
            title = item['title']
            author = item['author']
            bvid = item['bvid']
            mid = item['mid']
            follower, proxy = self.get_follower(mid, proxy)
            video_view, proxy = self.get_view(bvid, proxy)
            view = video_view['view']
            danmu = video_view['danmu']
            reply = video_view['reply']
            like = video_view['like']
            coin = video_view['coin']
            favorite = video_view['favorite']
            share = video_view['share']
            rank = video_view['rank']
            tag = item['tag']
            con = [pubdate, title, author, bvid, mid, follower,view,danmu,reply,like,coin,favorite,share,rank, tag]
            content.append(con)
            print(con)
        print(content)
        self.save(content)
        return proxy

    def write_header(self):
        header = ['日期', '标题', '作者', 'BV', 'mid', '粉丝', '播放', '弹幕', '评论', '点赞','硬币','收藏','转发','排名','标签']
        with open('fun_video.csv', 'a', encoding='gb18030', newline='')as f:
            write = csv.writer(f)
            write.writerow(header)

    def save(self,content):# 存入csv
        with open('fun_video.csv', 'a', encoding='gb18030', newline='')as file:
            write = csv.writer(file)
            write.writerows(content)

    def run(self):
        #self.write_header()
        proxy = self.get_ip()
        for i in range(168, self.page):
            url = self.url.format(i)
            response, proxy = self.get_requests(url, proxy)
            result = json.loads(response.text)
            proxy = self.get_parse(result, proxy)
            print('第{}页爬取完毕'.format(i))


if __name__ == '__main__':
    video = video_data()
    video.run()

Last modification:September 4, 2020
如果觉得我的文章对你有用,请随意赞赏