[Feature] Scene control by votes (#3)
### Key Updates

1. **BiliInteractReader**: Integrates with the danmu (live comment) system of the Bilibili live platform so that viewers can vote by sending specific danmu messages. Each danmu message is converted into a number of votes, and super users and users with higher follower levels receive additional votes.

2. **DetectionProcessor**: A new `DetectionProcessor` class reads video frames from multiple RTSP network cameras and uses YOLOv5 or OpenCV to detect cats. The detection results count as votes and influence scene switching in OBS.

3. **Scene Switching Logic**: In each loop iteration, the main program directs OBS to switch to the scene with the most votes. This lets the live content follow viewer preferences in real time.

4. **User Interface Improvements**: The program's running state is displayed on the OBS stream.

5. **Code Optimization**: The code has been optimized to improve the efficiency and stability of the program. This includes improvements to asynchronous operations and enhanced exception handling.
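
The voting flow described in items 1 and 3 can be sketched roughly as follows. This is a minimal illustration, not the code from this commit: the function names `danmu_to_votes`, `tally_votes` and `pick_scene`, the bonus values, and the rule that a danmu message names a scene directly are all assumptions made for the example. Only the dictionary keys (`uid`, `msg`, `medal_level`) mirror the danmu data that `BiliClient` puts into its queue.

```python
from collections import Counter
from typing import Dict, Iterable, Optional

# Hypothetical weights; the real values would come from the config file.
SUPER_USER_BONUS = 5
MEDAL_LEVEL_STEP = 10  # assumed: one extra vote per 10 fan-medal levels


def danmu_to_votes(danmu: Dict, super_users: Iterable[int] = ()) -> int:
    """Convert one danmu message into a number of votes."""
    votes = 1
    if danmu['uid'] in set(super_users):
        votes += SUPER_USER_BONUS
    votes += danmu.get('medal_level', 0) // MEDAL_LEVEL_STEP
    return votes


def tally_votes(messages: Iterable[Dict],
                scene_names: Iterable[str],
                super_users: Iterable[int] = ()) -> Counter:
    """Sum the votes of all messages whose text names a known scene."""
    scenes = set(scene_names)
    tally = Counter()
    for danmu in messages:
        scene = danmu['msg'].strip()
        if scene in scenes:
            tally[scene] += danmu_to_votes(danmu, super_users)
    return tally


def pick_scene(tally: Counter, default: Optional[str] = None) -> Optional[str]:
    """Return the scene with the most votes, or `default` without votes."""
    if not tally:
        return default
    return tally.most_common(1)[0][0]
```

With the assumed weights, a super user voting for `kitchen` contributes 6 votes and a level-25 follower voting for `bedroom` contributes 3, so `pick_scene` returns `kitchen`. Messages that do not name a scene are ignored.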
LazyBusyYang authored Apr 11, 2024
1 parent 3d36757 commit a9b2eee
Showing 22 changed files with 1,416 additions and 129 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,4 +1,6 @@
build
yolov5s.pt
bilibili_ref
*.egg-info*
__pycache__
local_files
2 changes: 1 addition & 1 deletion .isort.cfg
@@ -1,2 +1,2 @@
[settings]
-known_third_party = cv2,numpy,setuptools,simpleobsws
+known_third_party = cv2,numpy,prettytable,requests,setuptools,simpleobsws,websockets
17 changes: 13 additions & 4 deletions README.md
@@ -2,13 +2,22 @@

## Introduction

-This project is a multi-perspective cat live streaming program implemented based on Python and OBS Studio. It can select the camera view according to the detection results of cats from multiple RTSP network cameras in the home, control OBS to activate scenes with cats, and complete unattended cat live streaming on the network.
+This project is a multi-perspective cat live streaming program built on Python and OBS Studio. It selects the camera view according to vote results: in every loop iteration, the main program directs OBS to activate the scene
+with the most votes.

![Screenshot](./resources/screenshot.jpg)

Please set up your cameras for spots where cats often stay.

![Rooms, cameras and cat](./resources/room_illustration.jpg)

-The main program initializes upon startup according to the configuration file, connects to the OBS websocket server, and enters a while loop. Within each iteration of the loop, it requests the latest RTSP URLs for each perspective from OBS. It then uses ffmpeg or cv2 to read the latest video frames from RTSP. Utilizing YOLOv5 or cv2, it detects whether a cat is present in the video frames. Based on the detection results, it sends scene-switching signals to OBS. The flowchart is shown in the figure below.
+On startup, the main program initializes from the configuration file, connects to the OBS websocket server, and enters a while loop. Within each iteration of the loop, it collects votes.
+There are two sources of votes: the image detector and viewers' danmu messages from the Bilibili live platform.
+- **interact_reader**: `BiliInteractReader` reads interactive danmu messages from a sub-thread and converts them into a number of votes. Vote weight bonuses for super users and follower levels are applied here.
+- **detection_proc**: `DetectionProcessor` submits valid RTSP URLs to a sub-thread and fetches the detection
+results for the newest frame of each RTSP stream. `RTSPDetectionThread` uses ffmpeg or cv2 to read the latest video frames from RTSP, then uses YOLOv5 or cv2 to detect whether a cat is present in those frames.

![Mainloop flow chart](./resources/mainloop_flow.png)
![Sequence Diagram for loops](./resources/mainloop_seq.png)

## Prerequisites

@@ -32,7 +41,7 @@ Please write the configuration file needed for live streaming control based on t

When the configuration file is ready, start program with a command like below:
```bash
-python tools/main.py --config_path configs/yolov5_ffmpeg_3scenes.py
+python tools/main.py --config_path configs/default_config.py
```


318 changes: 318 additions & 0 deletions cat_stream/bili_client.py
@@ -0,0 +1,318 @@
import asyncio
import hashlib
import hmac
import json
import logging
import random
import requests
import struct
import time
import websockets
from hashlib import sha256
from queue import Full, Queue
from threading import Event
from typing import Union


class BiliClient:
    """Client for the danmu stream of the Bilibili live open platform."""

    def __init__(self,
                 id_code: str,
                 app_id: int,
                 key: str,
                 secret: str,
                 host: str,
                 interact_queue: Union[None, Queue] = None,
                 exit_signal: Union[None, Event] = None,
                 queue_put_timeout: int = 10,
                 verbose: bool = False,
                 logger: Union[None, str, logging.Logger] = None):
        self.id_code = id_code
        # app_id is formatted with %d in the request bodies, so it must be
        # an integer.
        self.app_id = app_id
        self.key = key
        self.secret = secret
        self.host = host
        self.game_id = ''
        self.interact_queue = interact_queue
        self.queue_put_timeout = queue_put_timeout
        # Fall back to a private Event so that the heartbeat and receive
        # loops can call is_set() even when no exit signal is supplied.
        self.exit_signal = exit_signal if exit_signal is not None else Event()
        self.verbose = verbose
        if logger is None:
            self.logger = logging.getLogger(__name__)
        elif isinstance(logger, str):
            self.logger = logging.getLogger(logger)
        else:
            self.logger = logger

    def run(self):
        """Connect, then run the receive and heartbeat loops until exit."""
        # Create a dedicated event loop so that run() also works in a
        # thread that has no event loop of its own.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        websocket = loop.run_until_complete(self.connect())
        try:
            tasks = [
                asyncio.ensure_future(self.recv_loop(websocket)),
                asyncio.ensure_future(self.send_heartbeat(websocket)),
                asyncio.ensure_future(self.app_send_heartbeat())
            ]
            loop.run_until_complete(asyncio.gather(*tasks))
        finally:
            self.exit()

    def sign(self, params):
        """Sign the http request.

        Args:
            params (str): The JSON request body to be signed.

        Returns:
            dict: Request headers, including the HMAC-SHA256 signature.
        """
        md5 = hashlib.md5()
        md5.update(params.encode())
        md5data = md5.hexdigest()
        ts = time.time()
        nonce = random.randint(1, 100000) + time.time()
        headerMap = {
            'x-bili-timestamp': str(int(ts)),
            'x-bili-signature-method': 'HMAC-SHA256',
            'x-bili-signature-nonce': str(nonce),
            'x-bili-accesskeyid': self.key,
            'x-bili-signature-version': '1.0',
            'x-bili-content-md5': md5data,
        }

        # Canonical string: "name:value" lines, sorted by header name.
        headerStr = '\n'.join(
            f'{name}:{headerMap[name]}' for name in sorted(headerMap))

        appsecret = self.secret.encode()
        data = headerStr.encode()
        signature = hmac.new(appsecret, data, digestmod=sha256).hexdigest()
        headerMap['Authorization'] = signature
        headerMap['Content-Type'] = 'application/json'
        headerMap['Accept'] = 'application/json'
        return headerMap

    def get_websocket_info(self):
        """Start the app and return the websocket address and auth body."""
        # Start the application.
        postUrl = '%s/v2/app/start' % self.host
        params = '{"code":"%s","app_id":%d}' % (self.id_code, self.app_id)
        headerMap = self.sign(params)
        retry_count = 0
        retry_max = 10
        retry_interval = 1
        _game_id = None
        while retry_count < retry_max:
            r = requests.post(
                url=postUrl, headers=headerMap, data=params, verify=True)
            data = json.loads(r.content)
            try:
                _game_id = data['data']['game_info']['game_id']
                break
            except (KeyError, TypeError):
                # The response carried no usable game_info; back off
                # exponentially and retry.
                self.logger.warning(
                    '[BiliClient] get_websocket_info failed for ' +
                    f'{retry_count} retries.\n' + f'params={params}\n' +
                    f'data={data}')
                retry_count += 1
                retry_interval *= 2
                time.sleep(retry_interval)
        if _game_id is None:
            if self.interact_queue is not None:
                self.interact_queue.put('Error')
            raise ValueError('[BiliClient] Failed to get game_id ' +
                             f'within {retry_count} retries.')
        self.game_id = str(_game_id)
        self.logger.info('[BiliClient] get_websocket_info success. ' +
                         f'data={data}')
        # Return the long-connection address and the auth body.
        return str(data['data']['websocket_info']['wss_link'][0]), str(
            data['data']['websocket_info']['auth_body'])

    # Send the app (game) heartbeat.
    async def app_send_heartbeat(self):
        while True:
            await asyncio.sleep(20)
            if self.exit_signal.is_set():
                break
            postUrl = '%s/v2/app/heartbeat' % self.host
            params = '{"game_id":"%s"}' % (self.game_id)
            headerMap = self.sign(params)
            # Note: requests.post is blocking, so the event loop is stalled
            # until the request returns.
            r = requests.post(
                url=postUrl, headers=headerMap, data=params, verify=True)
            if r.status_code != 200:
                self.logger.error('[BiliClient] app_send_heartbeat failed')
                r.raise_for_status()
            else:
                data = json.loads(r.content)
                self.logger.debug(
                    '[BiliClient] app_send_heartbeat success. ' +
                    f'data={data}')

    # Send the authentication message.
    async def auth(self, websocket, authBody):
        req = _BliveProto()
        req.body = authBody
        req.op = 7
        await websocket.send(req.pack())
        buf = await websocket.recv()
        resp = _BliveProto()
        resp.unpack(buf)
        respBody = json.loads(resp.body)
        if respBody['code'] != 0:
            self.logger.info('[BiliClient] Auth failed.')
        else:
            self.logger.info('[BiliClient] Auth success.')
            if self.interact_queue is not None:
                self.interact_queue.put(item='Ready')

    # Send the websocket heartbeat.
    async def send_heartbeat(self, websocket):
        while True:
            await asyncio.sleep(20)
            if self.exit_signal.is_set():
                break
            req = _BliveProto()
            req.op = 2
            await websocket.send(req.pack())
            self.logger.debug('[BiliClient] send_heartbeat success')

    def _run_one_loop(self, recv_buffer) -> None:
        """Parse one received packet and enqueue it if it is a danmu."""
        resp = _BliveProto()
        resp.unpack(recv_buffer)
        op_type = resp.get_operation_type()
        if op_type == 'OP_SEND_SMS_REPLY':
            body_str = resp.body
            body_dict = json.loads(body_str)
            if 'cmd' in body_dict and \
                    body_dict['cmd'] == 'LIVE_OPEN_PLATFORM_DM':
                uid = body_dict['data']['uid']
                uname = body_dict['data']['uname']
                msg = body_dict['data']['msg']
                medal_level = int(body_dict['data']['fans_medal_level'])
                medal_name = body_dict['data']['fans_medal_name']
                # put danmu into queue
                clean_data = dict(
                    uid=uid,
                    uname=uname,
                    msg=msg,
                    medal_level=medal_level,
                    medal_name=medal_name)
                if self.verbose:
                    self.logger.info(
                        f'[BiliClient] danmu message={clean_data}')
                if self.interact_queue is not None:
                    try:
                        self.interact_queue.put(
                            clean_data, timeout=self.queue_put_timeout)
                    except Full:
                        self.logger.error(
                            '[BiliClient] interact_queue is full, ' +
                            f'drop danmu message={clean_data}.')
            else:
                # Not a danmu message
                # TODO: record gifts
                pass
        else:
            # Not a reply message
            pass

    # Receive messages.
    async def recv_loop(self, websocket):
        self.logger.debug('[BiliClient] recv_loop start')
        while True:
            if self.exit_signal.is_set():
                break
            try:
                recv_buffer = await websocket.recv()
                self._run_one_loop(recv_buffer)
            except Exception as e:
                self.logger.error(
                    f'[BiliClient] recv_loop error, exception={e}')
                break

    # Establish the connection.
    async def connect(self):
        addr, authBody = self.get_websocket_info()
        self.logger.debug('[BiliClient] connect success. ' +
                          f'addr={addr}, authBody={authBody}')
        websocket = await websockets.connect(addr)
        # Authenticate.
        await self.auth(websocket, authBody)
        return websocket

    def exit(self):
        # Shut down the application.
        postUrl = '%s/v2/app/end' % self.host
        params = '{"game_id":"%s","app_id":%d}' % (self.game_id, self.app_id)
        headerMap = self.sign(params)
        r = requests.post(
            url=postUrl, headers=headerMap, data=params, verify=True)
        if r.status_code != 200:
            self.logger.error(f'[BiliClient] end app failed, params={params}')
        else:
            self.logger.debug(f'[BiliClient] end app success, params={params}')


class _BliveProto:
    """Protocol for bilibili live streaming."""

    def __init__(self) -> None:
        self.packetLen = 0
        self.headerLen = 16
        self.ver = 0
        self.op = 0
        self.seq = 0
        self.body = ''
        self.maxBody = 2048

    def pack(self) -> bytes:
        """Pack the message into bytes."""
        # Measure the encoded body so that packetLen counts bytes rather
        # than characters (the two differ for non-ASCII text).
        body = self.body.encode()
        self.packetLen = len(body) + self.headerLen
        buf = struct.pack('>i', self.packetLen)
        buf += struct.pack('>h', self.headerLen)
        buf += struct.pack('>h', self.ver)
        buf += struct.pack('>i', self.op)
        buf += struct.pack('>i', self.seq)
        buf += body
        return buf

    def unpack(self, buf) -> None:
        """Unpack the message from bytes."""
        expected_header_len = self.headerLen
        if len(buf) < expected_header_len:
            print('Packet header is incomplete')
            return
        self.packetLen = struct.unpack('>i', buf[0:4])[0]
        self.headerLen = struct.unpack('>h', buf[4:6])[0]
        self.ver = struct.unpack('>h', buf[6:8])[0]
        self.op = struct.unpack('>i', buf[8:12])[0]
        self.seq = struct.unpack('>i', buf[12:16])[0]
        if self.packetLen < 0 or self.packetLen > self.maxBody:
            print('Invalid packet length', 'self.packetLen:', self.packetLen,
                  ' self.maxBody:', self.maxBody)
            return
        # The received header length must match the expected 16 bytes.
        if self.headerLen != expected_header_len:
            print('Invalid header length')
            return
        # The body is kept as raw bytes; json.loads accepts bytes directly.
        self.body = buf[16:self.packetLen]

    def get_operation_type(self) -> str:
        """Get the operation type."""
        if self.op == 2:
            return 'OP_HEARTBEAT'
        elif self.op == 3:
            return 'OP_HEARTBEAT_REPLY'
        elif self.op == 5:
            return 'OP_SEND_SMS_REPLY'
        elif self.op == 7:
            return 'OP_AUTH'
        elif self.op == 8:
            return 'OP_AUTH_REPLY'
        else:
            return 'OP_UNKNOWN'
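
The packet framing used by `_BliveProto` (a 16-byte big-endian header followed by the body) can also be expressed with a single `struct` format string. The sketch below is only an illustration of that layout as it appears in `pack` and `unpack` above. The names `pack_packet`, `unpack_packet`, `HEADER_FMT` and `HEADER_LEN` are hypothetical and not part of this commit, and raising `ValueError` on malformed input is a design choice of the sketch rather than the behavior of the class.

```python
import struct
from typing import Tuple

# Field order taken from _BliveProto: packet length (int32), header length
# (int16), version (int16), operation (int32), sequence (int32).
HEADER_FMT = '>ihhii'
HEADER_LEN = struct.calcsize(HEADER_FMT)  # 16 bytes


def pack_packet(op: int, body: str = '', ver: int = 0, seq: int = 0) -> bytes:
    """Build one packet: header followed by the UTF-8 encoded body."""
    payload = body.encode()
    header = struct.pack(HEADER_FMT, HEADER_LEN + len(payload), HEADER_LEN,
                         ver, op, seq)
    return header + payload


def unpack_packet(buf: bytes) -> Tuple[int, bytes]:
    """Return (operation, body bytes) of one packet, or raise ValueError."""
    if len(buf) < HEADER_LEN:
        raise ValueError('buffer is shorter than the packet header')
    packet_len, header_len, _ver, op, _seq = struct.unpack(
        HEADER_FMT, buf[:HEADER_LEN])
    if header_len != HEADER_LEN:
        raise ValueError(f'unexpected header length {header_len}')
    if packet_len < header_len or packet_len > len(buf):
        raise ValueError(f'invalid packet length {packet_len}')
    return op, buf[header_len:packet_len]


# Round trip with operation 7, the code used for the auth request above.
packet = pack_packet(7, '{"msg":"猫"}')
operation, payload = unpack_packet(packet)
```

Because the packet length is computed from the encoded payload, a body containing non-ASCII characters still round-trips correctly.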