四、小电视自动抽奖

自动参加小电视抽奖
自动领取舰长、提督奖励
运行一天大概能获取1800包辣条
However. 每次凌晨B站后台都会检测账号脚本,一次抢太多很容易被封号一周。
可能是依据最大的连续领奖次数判断的吧

图片.png
一、抽奖流程
1. 分析页面可知
每当有人赠送小电视之类的礼物时
系统就会发送广播弹幕(SYS_MSG)
2. 点击广播跳转到抽奖房间
F12清空网络请求
接着点击抽奖
就找到了抽奖请求

POST https://api.live.bilibili.com/gift/v3/smalltv/join

参数
图片.png

响应
图片.png
3. 分析post参数
roomid系统广播里就有
raffleId是抽奖关键参数
type 固定为 Gift
csrf_token 来自于cookie值bili_jct (可为空)
visit_id (可为空)
4. 刷新抽奖页面
F12过滤得到XHR 请求
逐个分析返回值和url请求
发现raffleId请求

GET  https://api.live.bilibili.com/gift/v3/smalltv/check
data={
'roomid':roomid
 }

响应
图片.png
分析结果

  1. 小电视抽奖和月色真美、摩天大楼
    获取raffleId的请求略有不同
https://api.live.bilibili.com/gift/v3/smalltv/check
https://api.live.bilibili.com/gift/v2/smalltv/check

参加抽奖的请求相同
二、代码实现
api.py 添加Gift类

class Gift():
    def check_smalltv(self,s,roomid,ver='v2'):
        url='https://api.live.bilibili.com/gift/'+ver+'/smalltv/check'
        data={'roomid': roomid}
        return ajax(s,url,'GET',data)
    def join_smalltv(self,s,roomid,raffleId,csrf_token,visit_id='', type = 'Gift'):
        url= 'https://api.live.bilibili.com/gift/v3/smalltv/join'
        data={
                'roomid': roomid,
                'raffleId': raffleId,
                'type': type,
                'csrf_token': csrf_token,
                'visit_id': visit_id
                }
        return ajax(s,url,'POST',data)
    def check_guard(self,s,roomid,_type='_guard'):
        url='https://api.live.bilibili.com/lottery/v1/Lottery/check'+_type+'?roomid='+str(roomid)
        return ajax(s,url,'GET');
    def join_guard(self,s,roomid,id,csrf_token,type='guard',visit_id=None):
        url= 'https://api.live.bilibili.com/lottery/v2/Lottery/join'
        data= {
                'roomid': roomid,
                'id': id,
                'type': type,
                'csrf_token': csrf_token,
                'visit_id': visit_id
            }
        return ajax(s,url,'POST',data)

server.py添加抽奖类

class Join:
    def __init__(self,Info,s):
        self.Info = Info
        self.check_roomids=[Info['roomid']]
        self.raffleId=0
        self.hour_run=Timer(7.2e3,self.each_hour_run)
        self.s=s
        self.num=1
    def join_smalltv(self,obj):
        if self.num==50:
            self.num=0
            print('[跳过抽奖]['+obj['title']+']已跳过抽奖(roomid=' + str(obj['roomid'])+ ',raffleId=' + str(obj['raffleId'])+ ')')
            return
        self.num+=1
        res=API.Gift.join_smalltv(self.s,obj['roomid'], obj['raffleId'],self.Info['csrf_token'],self.Info['visit_id'])
        code=res['code']
        if code==0:Logger.info('[自动抽奖]['+obj['title']+']已参加抽奖(roomid=' + str(obj['roomid'])+ ',raffleId=' + str(obj['raffleId'])+ ')')
        elif code==400:Logger.warning('[自动抽奖][礼物抽奖]访问被拒绝,您的帐号可能已经被封禁,已停止')
        elif code==65531:Logger.info('[自动抽奖][礼物抽奖]已参加抽奖(roomid=' + str(obj['roomid'])+ ',raffleId=' + str(obj['raffleId'])+ ')')
        else:Logger.error('[自动抽奖][礼物抽奖]已参加抽奖(roomid=' + str(obj['roomid'])+ ',raffleId=' + str(obj['raffleId'])+ ')'+res['msg'])
    def push_and_check_roomid(self,roomid):
        self.push_roomid(roomid)
        for roomid in self.check_roomids:self.check_and_join_guard(roomid)
    def push_roomid(self,roomid):
        if roomid in self.check_roomids:return
        self.check_roomids.append(roomid)
    def each_hour_run(self):
        res=API.Room.room_rank(self.s,"master_realtime_hour",'areaid_realtime_hour')
        for room in res['data']['list']:self.push_roomid(room['roomid'])
        res=API.Room.room_rank(self.s,"week_star_master",'week')
        for room in res['data']['list']:self.push_roomid(room['roomid'])
    def check_and_join_smalltv(self,roomid):
        res=API.Gift.check_smalltv(self.s,roomid,'v2')
        if res['code']==-400:return
        for data in res['data']:
            if data['raffleId']<=self.raffleId:continue
            self.raffleId=data['raffleId']
            self.join_smalltv({'roomid':roomid,'raffleId':self.raffleId,'title':data['title']})
        if len(res['data'])==0:
            res=API.Gift.check_smalltv(self.s,roomid,'v3')
            for data in res['data']['list']:
                if data['raffleId']<=self.raffleId:continue
                self.raffleId=data['raffleId']
                self.join_smalltv({'roomid':roomid,'raffleId':self.raffleId,'title':data['title']})
        self.push_and_check_roomid(roomid)
    def join_guard(self,obj):
        res=API.Gift.join_guard(self.s,obj['roomid'],obj['id'],self.Info['csrf_token'])
        print(obj['roomid'])
        Logger.info(res['data']['message'])
    def check_and_join_guard(self,roomid):
        res=API.Gift.check_guard(self.s,roomid)
        if len(res['data'])==0:return
        for guard in res['data']:self.join_guard({'id':guard['id'],'roomid':roomid})
        res=API.Gift.check_guard(self.s,roomid,'')
        if len(res['data']['guard'])==0:return
        for guard in res['data']['guard']:self.join_guard({'id':guard['id'],'roomid':roomid})

utils.py添加日志类

@singleton
class Logger:
    def __init__(self):
        handlers = {
            logging.NOTSET: "logs/notset.logs",
            logging.DEBUG: "logs/debug.logs",
            logging.INFO: "logs/info.logs",
            logging.WARNING: "logs/warning.logs",
            logging.ERROR: "logs/error.logs",
            logging.CRITICAL: "logs/critical.logs"
        }
        self.__loggers = {}
        logLevels = handlers.keys()
        fmt = logging.Formatter('%(asctime)s [%(levelname)s]: %(message)s')
        for level in logLevels: #创建logger
            logger = logging.getLogger(str(level))
            logger.setLevel(level)
            #创建hander用于写日日志文件
            log_path = os.path.abspath(handlers[level])
            fh = logging.FileHandler(log_path) #定义日志的输出格式
            fh.setFormatter(fmt)
            fh.setLevel(level) #给logger添加hander
            logger.addHandler(fh)
            self.__loggers.update({level: logger})
    def info(self, message):
        self.__loggers[logging.INFO].info(message)
    def error(self, message):
        self.__loggers[logging.ERROR].error(message)
    def warning(self, message):
        self.__loggers[logging.WARNING].warning(message)
    def debug(self, message):
        self.__loggers[logging.DEBUG].debug(message)
    def critical(self, message):
        self.__loggers[logging.CRITICAL].critical(message)

utils.py添加 保存配置文件函数

def save_py(moudle):
    keys,py=dir(moudle),''
    for key in keys:
        if '__' in key:continue
        py+=key+'='+json.dumps(getattr(moudle,key),indent=4, separators=(',', ':'))+'
'
    with open(moudle.__name__+".py",'w') as f:f.write(py)

添加配置文件 config.py

headers={
    "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0",
    "Accept":"application/json, text/plain, */*",
    "Accept-Language":"zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
    "Accept-Encoding":"gzip, deflate, br",
    "Referer":"https://live.bilibili.com/",
    "Origin":"https://live.bilibili.com",
    "Connection":"keep-alive"
}
msg_info={
    "reject_msg":[
        "SEND_GIFT",
        "WELCOME",
        "WELCOME_GUARD",
        "COMBO_SEND",
        "ENTRY_EFFECT",
        "NOTICE_MSG",
        "DANMU_MSG",
        "ROOM_RANK",
        "WISH_BOTTLE",
        "COMBO_END"
    ],
    "receive_msg":[
        "SYS_MSG",
        "SPECIAL_GIFT"
    ]
}
post_info={
    "csrf_token":"",
    "visit_id":"",
    "roomid":392351,
    "uid":380741418
}
ws_info={
    "uid":380741418,
    "roomid":392351,
    "protover":1,
    "platform":"web",
    "clientver":"1.4.0"
}

主程序

import os
os.chdir('/data/jupyter/root/软件/Bilibili')
from jquery import http,session
from servers import Join,Login
from DanmuWS import DanmuWebSocket
from config import *
from utils import save_py
import config
%matplotlib inline
#save_py(config)
s=session(headers,'config/2685054765.txt')
login=Login(s)
join=Join(post_info,s)
ws=None
while not login.isLogin():
    login.get_vdcode()
    login.loop_vdcode()
ws_info['uid']=post_info['uid']=login.info['uid']
def oncmd(data):
    cmd=data["cmd"]
    if cmd in msg_info['reject_msg']:return
    if cmd=="SYS_MSG":
        if "real_roomid" in data:join.check_and_join_smalltv(data["real_roomid"])
def onlogin(data):
    print("login success")
def onreconnect(code,data=None):
    global ws
    ws=data['dws']
room_num=0
def onheartbeat(num):
    if num>3:return
    close()
    if len(join.check_roomids)==0:
        global room_num
        join.each_hour_run()
        room_num+=1
        if room_num>3:return join.hour_run.cancel()
    ws_info['roomid']=post_info['roomid']=join.check_roomids.pop()
    main()
def main():
    global ws
    try:
        print(ws_info['roomid'])
        ws = DanmuWebSocket(ws_info,'wss://broadcastlv.chat.bilibili.com/sub')
        ws.connect()
        ws.bind(onreconnect,onlogin,onheartbeat,oncmd)
        #ws.run_forever()
    except:
        close()
def close():
    ws.close()
    save_py(config)
    s.save()
main()
原文地址:https://www.cnblogs.com/blogs-qq2685054765/p/9734366.html