forked from captain-pool/GSOC
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream_client.py
150 lines (136 loc) · 4.25 KB
/
stream_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from __future__ import print_function
import io
import socket
import threading
import time
from absl import logging
import argparse
import queue
import numpy as np
import datapacket_pb2
import pyaudio as pya
import pygame
import tensorflow as tf
from pygame.locals import * # pylint: disable=wildcard-import
pygame.init()
class Client(object):
SYN = b'SYN'
SYNACK = b'SYN/ACK'
ACK = b'ACK'
def __init__(self, ip, port):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._metadata = datapacket_pb2.Metadata()
self._audio_queue = queue.Queue()
self._video_queue = queue.Queue()
self._audio_thread = threading.Thread(
target=self.write_to_stream, args=(True,))
self._video_thread = threading.Thread(
target=self.write_to_stream, args=(False,))
self._running = False
self.tolerance = 30 # Higher Tolerance Higher Frame Rate
self._lock = threading.Lock()
pyaudio = pya.PyAudio()
self._audio_stream = pyaudio.open(
format=pya.paFloat32,
channels=2,
rate=44100,
output=True,
frames_per_buffer=1024)
self._socket.connect((ip, port))
self.fetch_metadata()
def readpacket(self, buffersize=2**32):
buffer_ = io.BytesIO()
done = False
eof = False
while not done:
data = self._socket.recv(buffersize)
if data:
logging.debug("Reading Stream: Buffer Size: %d" % buffersize)
if data[-5:] == b'<EOF>':
logging.debug("Found EOF")
data = data[:-5]
eof = True
done = True
if data[-5:] == b'<END>':
logging.debug("Find End of Message")
data = data[:-5]
done = True
buffer_.write(data)
buffer_.seek(0)
return buffer_.read(), eof
def fetch_metadata(self):
logging.debug("Sending SYN...")
self._socket.send(b'SYN')
logging.debug("Sent Syn. Awating Metadata")
data, eof = self.readpacket(8)
self._metadata.ParseFromString(data)
dimension = self._metadata.dimension
self.screen = pygame.display.set_mode(dimension[:-1][::-1], 0, 32)
def fetch_video(self):
data, eof = self.readpacket()
framedata = datapacket_pb2.FramePacket()
framedata.ParseFromString(data)
frames = []
for frame in framedata.video_frames:
frames.append(self.parse_frames(frame, False))
self._audio_queue.put(framedata.audio_chunk)
self._video_queue.put(frames)
return eof
def parse_frames(self, bytestring, superresolve=False):
frame = np.asarray(bytestring)
if superresolve:
# Perform super resolution here
pass
frame = tf.cast(tf.clip_by_value(frame, 0, 255), tf.float32)
return frame.numpy()
def start(self):
with self._lock:
self._running = True
if not self._audio_thread.isAlive():
self._audio_thread.start()
if not self._video_thread.isAlive():
self._video_thread.start()
self._socket.send(b'ACK')
while not self.fetch_video():
pass # Wait till the end
self.wait_to_end()
def wait_to_end(self):
self._audio_thread.join()
self._video_thread.join()
def stop(self):
with self.lock:
self._running = False
def write_to_stream(self, isaudio=False):
while self._running:
try:
if isaudio:
if self._audio_queue.qsize() < 5:
continue
audio_chunk = self._audio_queue.get(timeout=10)
self._audio_stream.write(audio_chunk)
else:
if self._video_queue.qsize() < 5:
continue
for video_frame in self._video_queue.get(timeout=10):
video_frame = pygame.surfarray.make_surface(
np.rot90(np.fliplr(video_frame)))
self.screen.fill((0, 0, 2))
self.screen.blit(video_frame, (0, 0))
pygame.display.update()
time.sleep(
(1000 / self._metadata.video_fps - self.tolerance) / 1000)
except StopIteration:
pass
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--server",
default="127.0.0.1",
help="Address of stream server.")
parser.add_argument(
"--port",
default=8001,
help="Port of the server to connect to.")
logging.set_verbosity(logging.DEBUG)
client = Client("127.0.0.1", 8001)
client.start()