This repository has been archived by the owner on May 27, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
stream_watcher.py
89 lines (73 loc) · 2.96 KB
/
stream_watcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import time
import streamlink
from attr import evolve
from api.video_buffer import VideoBuffer
from pipe.pipe import Pipe
from pipe.sync_pipeline import SyncPipeline
from pipe.async_pipeline import AsyncPipeline
from pipe.debug_pipe import DebugPipe
from pipe.screen_pipe import ScreenPipe
from pipe.versus_pipe import VersusPipe
from pipe.device_pipe import DevicePipe
from pipe.damage_pipe import DamagePipe
from pipe.gembar_pipe import GembarPipe
from pipe.socketio_pipe import SocketioPipe
from state.game_state import GameState
class StreamWatcher(object):
    """
    Open a stream, process frames and update the state for it.

    Two pipelines are built at construction time:

    * ``_realtime_pipeline`` -- a ``SyncPipeline``; the changes returned by
      its ``process`` call are merged into the state within the same
      ``StreamWatcher.process`` call.
    * ``_deferred_pipeline`` -- an ``AsyncPipeline``; frames are pushed to it
      and whatever changes it has accumulated are collected afterwards with
      ``reset_changes()``.
    """

    # Pause (seconds) between polls while waiting for the deferred pipeline
    # in blocking mode; avoids spinning a CPU core at 100%.
    _BLOCK_POLL_INTERVAL = 0.001

    # Class-level defaults so that ``stop()`` and ``running`` are safe to use
    # before ``start()`` has been called.
    _buffer = None
    _block_deferred_pipeline = False

    def __init__(self):
        self._realtime_pipeline = SyncPipeline((
            DevicePipe(),
            ScreenPipe(),
            DamagePipe(),
            GembarPipe()))
        self._deferred_pipeline = AsyncPipeline((
            VersusPipe(),
            DebugPipe(),
            SocketioPipe()))

    def start(self, stream_config, block_operations, video_url=None):
        """
        Start both pipelines and begin buffering video.

        :param stream_config: configuration object; ``url``, ``resolution``
            and ``max_fps`` are read from it.
        :param block_operations: when truthy, ``process`` waits for the
            deferred pipeline to finish each frame, and the buffer is started
            with ``block_read`` enabled.
        :param video_url: direct video URL; when ``None`` the URL is resolved
            from ``stream_config.url`` through streamlink.
        :raises Exception: if streamlink offers neither the configured
            resolution nor a ``"best"`` stream.
        """
        self._block_deferred_pipeline = block_operations
        self.state = GameState(stream_config=stream_config)
        self._realtime_pipeline.start()
        self._deferred_pipeline.start()

        if video_url is None:
            streams = streamlink.streams(stream_config.url)
            # Prefer the configured resolution, fall back to "best".
            stream = streams.get(str(stream_config.resolution) + "p") \
                or streams.get("best")
            if stream is None:
                raise Exception("Stream is invalid", stream_config.url)
            video_url = stream.url

        self._buffer = VideoBuffer()
        self._buffer.start(video_url, stream_config.max_fps,
                           stream_config.resolution,
                           block_read=block_operations)

    def stop(self):
        """
        Stop the deferred pipeline and the video buffer.

        Safe to call before ``start()``; the buffer is stopped even if
        stopping the deferred pipeline raises.
        """
        # NOTE(review): the realtime pipeline is started in start() but is
        # not stopped here -- confirm whether SyncPipeline needs a stop().
        try:
            self._deferred_pipeline.stop()
        finally:
            if self._buffer is not None:
                self._buffer.stop()

    @property
    def running(self):
        """Whether both the deferred pipeline and the buffer are running."""
        if self._buffer is None:
            # start() has not been called yet.
            return False
        return self._deferred_pipeline.running and self._buffer.running

    @staticmethod
    def _count_changes(changes):
        """Count changed keys, ignoring the ``stream_config`` entry."""
        return sum(1 for key in changes if key != "stream_config")

    def process(self):
        """
        Process the next buffered frame and update ``self.state``.

        :returns: tuple of (copy of the processed frame, updated state).
        """
        frame = self._buffer.get()
        new_state = evolve(self.state,
                           timestamp=time.time(),
                           seconds=self._buffer.seconds)

        # process sync tasks
        changes = self._realtime_pipeline.process(frame, new_state)
        new_state = evolve(new_state, **changes)
        changes_len = self._count_changes(changes)

        # push tasks to async pipeline
        self._deferred_pipeline.process(frame, new_state)

        # block if not watching a live stream; sleep between polls instead
        # of busy-waiting so the deferred work is not starved of CPU
        while self._block_deferred_pipeline \
                and self._deferred_pipeline.processing:
            time.sleep(self._BLOCK_POLL_INTERVAL)

        # get changes from this frame (if blocking) or previous frame
        changes = self._deferred_pipeline.reset_changes()
        new_state = evolve(new_state, **changes)
        changes_len += self._count_changes(changes)

        if changes_len > 0:
            # remember when (in stream seconds) the state last changed
            new_state = evolve(new_state, last_change=new_state.seconds)

        self.state = new_state
        return frame.copy(), self.state