Skip to content

Commit

Permalink
initial ping
Browse files Browse the repository at this point in the history
  • Loading branch information
Georgios Hadjiharalambous committed Oct 7, 2024
1 parent b3577db commit 76fa3e2
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 3 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.0.2
2.0.3
84 changes: 82 additions & 2 deletions speechmatics/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@
import json
import logging
import os
from typing import Dict, Union
import random
import time
from typing import Dict, Optional, Union
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse

logger = logging.getLogger("websockets")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
import httpx
import websockets

Expand Down Expand Up @@ -89,6 +94,59 @@ def __init__(
# Semaphore used to ensure that we don't send too much audio data to
# the server too quickly and burst any buffers downstream.
self._buffer_semaphore = asyncio.BoundedSemaphore
self._event_loop = asyncio.get_event_loop()
cfg = {"autoPingInterval": 1, "autoPingTimeout": 1}
self.ping_interval = cfg["autoPingInterval"]
self.ping_timeout = cfg[
"autoPingTimeout"
] # connection_settings.ping_timeout_seconds
self.ping_timer: Optional[asyncio.TimerHandle] = None
self.ping_response_timer: Optional[asyncio.TimerHandle] = None
self.ping_id = 0

def cancel_ping_timers(self):
# logging.warning(" cancel_ping_timers-------------------------------------")

if self.ping_timer:
self.ping_timer.cancel()
self.ping_timer = None
if self.ping_response_timer:
self.ping_response_timer.cancel()
self.ping_response_timer = None

def set_ping_timer(self):
# logging.warning(" set_ping_timer-------------------------------------")
if self.ping_interval > 0:
if self.ping_timer:
self.ping_timer.cancel()
# schedule new ping only when we are not already waiting for pong
if self.ping_response_timer is None:
self.ping_timer = self._event_loop.call_later(
self.ping_interval, self.send_ping
)

async def send_ping(self):
logging.warning("IRTHE send_ping-------------------------------------")
self.ping_id = random.randint(0, 2**32 - 1) # nosec B311
assert self.websocket
await self.websocket.ping(self.ping_id.to_bytes(4, byteorder="big", signed=False))
self.ping_response_timer = self._event_loop.call_later(
self.ping_timeout, self.on_pong_timeout
)

def onPong(self):
logging.warning("IRTHE ponggggggg-------------------------------------")
# id = int.from_bytes(payload, byteorder="big", signed=False)
# if self.ping_id == id and self.ping_response_timer:
if self.ping_response_timer:
self.ping_response_timer.cancel()
self.ping_response_timer = None
self.set_ping_timer()
# do i need to replace the websocket.onPoing = onPong ?
def on_pong_timeout(self):
err = f"Response to PING with id {self.ping_id} not received in time, closing connection."
logging.warning(err)
# self.send_error("protocol_error", err)

async def _init_synchronization_primitives(self):
"""
Expand All @@ -108,6 +166,7 @@ def _flag_recognition_started(self):
This updates an internal flag to mark the recognition session
as started meaning, AddAudio is now allowed.
"""
logging.warning(f"time taken={time.time()-self.time}")
self._recognition_started.set()

def _set_language_pack_info(self, language_pack_info: dict):
Expand Down Expand Up @@ -179,6 +238,7 @@ def _start_recognition(self, audio_settings):
self.session_running = True
self._call_middleware(ClientMessageType.StartRecognition, msg, False)
LOGGER.debug(msg)
self.time= time.time()
return msg

@json_utf8
Expand Down Expand Up @@ -206,6 +266,7 @@ def _consumer(self, message):
:raises ForceEndSession: If this was raised by the user's event
handler.
"""
self.cancel_ping_timers()
LOGGER.debug(message)
message = json.loads(message)
message_type = message["message"]
Expand Down Expand Up @@ -241,6 +302,7 @@ async def _producer(self, stream, audio_chunk_size):
:type audio_chunk_size: int
"""
async for audio_chunk in read_in_chunks(stream, audio_chunk_size):
self.set_ping_timer()
if self._session_needs_closing:
break

Expand Down Expand Up @@ -274,6 +336,12 @@ async def _consumer_handler(self):
except websockets.exceptions.ConnectionClosedError as ex:
LOGGER.info("Disconnected while waiting for recv().")
raise ex
# if isinstance(message, bytes) and len(message) == 0:
# self.onPong(message)
# exit()
# self.onPong(message)
# exit()

self._consumer(message)

async def _send_message(self, msg):
Expand Down Expand Up @@ -528,17 +596,29 @@ async def run(
async with websockets.connect( # pylint: disable=no-member
updated_url,
ssl=self.connection_settings.ssl_context,
ping_timeout=self.connection_settings.ping_timeout_seconds,
# ping_timeout=self.connection_settings.ping_timeout_seconds, = 0?
# Don't limit the max. size of incoming messages
max_size=None,
extra_headers=extra_headers,
) as self.websocket:
pong_waiter = await self.websocket.ping(self.ping_id.to_bytes(4, byteorder="big", signed=False))
# self.websocket.onPong = self.onPong
# self.websocket.on_pong = self.onPong
# self._event_loop.call_soon(self.WaitPong,pong_waiter)
self._event_loop.create_task(self.WaitPong(pong_waiter))
# self.websocket.protocol.onPong = self.onPong
# self.websocket.protocol.on_pong = self.onPong
# self.websocket.ping
await self._communicate(stream, audio_settings)
finally:
self.session_running = False
self._session_needs_closing = False
self.websocket = None

async def WaitPong(self,pong_waiter):
logging.warning("running froom ---------------------WaitPong")
await pong_waiter
self.onPong()
def stop(self):
"""
Indicates that the recognition session should be forcefully stopped.
Expand Down

0 comments on commit 76fa3e2

Please sign in to comment.