Skip to content

Commit

Permalink
refactor: async live
Browse files Browse the repository at this point in the history
  • Loading branch information
daxartio committed Dec 9, 2023
1 parent 5e270ae commit c040f87
Show file tree
Hide file tree
Showing 6 changed files with 512 additions and 109 deletions.
391 changes: 379 additions & 12 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ python = "~3.8.1"
pywin32 = {version="^228", optional = true}
pyImpinj = {version="^1.2", optional = true}
playsound = {version="=1.2.2", optional = true}
aiohttp = "^3.9.0"
boltons = "^20"
chardet = "^5.2.0"
docxtpl = "^0"
Expand All @@ -17,14 +18,14 @@ markupsafe = "2.0.1"
orjson = "^3.9.5"
polib = "^1"
pydantic = "^1"
pylocker = "^3.1.0"
PySide2 = "^5"
python-dateutil = "^2"
python-dotenv = '^0.14'
pywinusb = "^0"
requests = "^2"
shiboken2 = "^5.15"
sportident = "^1"
pylocker = "^3.1.0"

[tool.poetry.group.dev.dependencies]
autoflake = "*"
Expand Down
4 changes: 2 additions & 2 deletions sportorg/gui/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from sportorg.modules.backup.file import File
from sportorg.modules.configs.configs import Config as Configuration
from sportorg.modules.configs.configs import ConfigFile
from sportorg.modules.live.live import LiveClient, live_client
from sportorg.modules.live.live import live_client
from sportorg.modules.printing.model import (
NoPrinterSelectedException,
NoResultToPrintException,
Expand Down Expand Up @@ -295,7 +295,7 @@ def post_show(self):
self.res_recalculate = QTimer(self)
self.res_recalculate.timeout.connect(self.res_recalculate_by_timer)

LiveClient().init()
live_client.init()
self._menu_disable(self.current_tab)

def _setup_ui(self):
Expand Down
75 changes: 60 additions & 15 deletions sportorg/modules/live/live.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,67 @@
import asyncio
import logging
import os
from functools import partial
from threading import Thread
from queue import Empty, Queue
from threading import Event, Thread

import requests
import aiohttp

from sportorg.common.broker import Broker
from sportorg.models.memory import race
from sportorg.modules.live import orgeo

LIVE_TIMEOUT = int(os.getenv('SPORTORG_LIVE_TIMEOUT', '10'))


async def create_session(timeout: int) -> aiohttp.ClientSession:
return aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout))


class LiveThread(Thread):
def __init__(self):
super().__init__(name='LiveThread', daemon=True)
self._queue = Queue()
self._stop_event = Event()
self._delay = 0.5

def send(self, func) -> None:
self._queue.put_nowait(func)

def stop(self) -> None:
self._stop_event.set()

def run(self) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
session = loop.run_until_complete(create_session(LIVE_TIMEOUT))
while not self._stop_event.is_set():
try:
funcs = []
while True:
try:
funcs.append(self._queue.get_nowait())
self._queue.task_done()
except Empty:
break
if funcs:
loop.run_until_complete(
asyncio.gather(
*[func(session=session) for func in funcs],
return_exceptions=True
)
)
else:
self._stop_event.wait(self._delay)
except Exception as e:
logging.error('Error: %s', str(e))


class LiveClient:
def __init__(self):
self._thread = LiveThread()

def init(self):
Broker().subscribe('teamwork_recieving', self.send)
Broker().subscribe('teamwork_sending', self.send)
Broker().subscribe('teamwork_deleting', self.delete)
self._thread.start()

@staticmethod
def is_enabled():
Expand All @@ -29,7 +77,7 @@ def get_urls():
return urls

def send(self, data):
logging.debug('LiveClient.send started, data = ' + str(data))
logging.debug('LiveClient.send started, data = %s', str(data))
if not self.is_enabled():
return

Expand All @@ -46,21 +94,18 @@ def send(self, data):
race_data = race().to_dict()
for url in urls:
if race().get_setting('live_results_enabled', False):
func = partial(
orgeo.create, requests, url, items, race_data, logging.root
)
Thread(target=func, name='LiveThread', daemon=True).start()
func = partial(orgeo.create, url, items, race_data, logging.root)
self._thread.send(func)

if race().get_setting('live_cp_enabled', False):
func = partial(
orgeo.create_online_cp,
requests,
url,
items,
race_data,
logging.root,
)
Thread(target=func, name='LiveThread_OnlineCP', daemon=True).start()
self._thread.send(func)

def delete(self, data):
if not self.is_enabled():
Expand All @@ -76,8 +121,8 @@ def delete(self, data):
urls = self.get_urls()
race_data = race().to_dict()
for url in urls:
func = partial(orgeo.delete, requests, url, items, race_data)
Thread(target=func, name='LiveThread', daemon=True).start()
func = partial(orgeo.delete, url, items, race_data, logging.root)
self._thread.send(func)


live_client = LiveClient()
129 changes: 50 additions & 79 deletions sportorg/modules/live/orgeo.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from re import subn
from typing import Any, Dict

from sportorg.utils.time import int_to_otime

LOG_MSG = 'HTTP Status: %s, Msg: %s'

RESULT_STATUS = [
'NONE',
'OK',
Expand All @@ -23,29 +26,17 @@


class Orgeo:
def __init__(self, requests, url, user_agent=None):
if not user_agent:
user_agent = 'SportOrg'
self.requests = requests
def __init__(self, session, url: str, user_agent: str = 'SportOrg'):
self.session = session
self._url = url
self._headers = {'User-Agent': user_agent}

def _get_url(self, text=''):
return '{}{}'.format(self._url, text)

def send(self, data):
response = self.requests.post(self._get_url(), headers=self._headers, json=data)
return response

def send_online_cp(self, chip, code, time):
url = self._get_url()
url += '&si=' + str(chip)
url += '&radio=' + str(code)
url += '&r=' + str(time)
url += '&fl=0'
async def send(self, data):
return await self.session.post(self._url, headers=self._headers, json=data)

response = self.requests.get(url, headers=self._headers)
return response
async def send_online_cp(self, chip, code, time):
url = f'{self._url}&si={chip}&radio={code}&r={time}&fl=0'
return await self.session.get(url, headers=self._headers)


def _get_obj(data, race_data, key, key_id):
Expand Down Expand Up @@ -186,12 +177,12 @@ def make_nice(s):
)[0]


def create(requests, url, data, race_data, log):
async def create(url, data, race_data, log, *, session):
"""
data is Dict: Person, Result, Group, Course, Organization
race_data is Dict: Race
"""
o = Orgeo(requests, url)
o = Orgeo(session, url)
is_start = False
group_i = 0
persons = []
Expand Down Expand Up @@ -227,30 +218,23 @@ def create(requests, url, data, race_data, log):
if group_i == len(race_data['groups']):
is_start = True
if persons:
obj_for_send = {'persons': persons}
obj_for_send: Dict[str, Any] = {'persons': persons}
if is_start:
obj_for_send['params'] = {'start_list': True}
try:
resp = o.send(obj_for_send)

if resp.status_code != 200:
log.error(
'HTTP Status: {}, Msg: {}'.format(
resp.status_code, make_nice(str(resp.content))
)
)
resp = await o.send(obj_for_send)

result_txt = make_nice(str(await resp.text()))
if resp.status != 200:
log.error(LOG_MSG, resp.status, result_txt)
else:
log.info(
'HTTP Status: {}, Msg: {}'.format(
resp.status_code, make_nice(str(resp.content))
)
)
log.info(LOG_MSG, resp.status, result_txt)

except Exception as e:
log.error(e)
log.error('Error: %s', str(e))


def create_online_cp(requests, url, data, race_data, log):
async def create_online_cp(url, data, race_data, log, *, session):
"""
data is Dict: Results
race_data is Dict: Race
Expand All @@ -259,7 +243,7 @@ def create_online_cp(requests, url, data, race_data, log):
if not race_data['settings']['live_cp_enabled']:
return

o = Orgeo(requests, url)
o = Orgeo(session, url)

for item in data:
if item['object'] in [
Expand All @@ -285,25 +269,16 @@ def create_online_cp(requests, url, data, race_data, log):
if card_number > 0:
code = race_data['settings']['live_cp_code']
finish_time = int_to_otime(res['finish_time'] // 10).to_str()
resp = o.send_online_cp(card_number, code, finish_time)
if resp.status_code != 200:
log.error(
'HTTP Status: {}, Msg: {}'.format(
resp.status_code, make_nice(str(resp.content))
)
)
resp = await o.send_online_cp(card_number, code, finish_time)

result_txt = make_nice(str(await resp.text()))

if resp.status != 200:
log.error(LOG_MSG, resp.status, result_txt)
else:
log.info(
'HTTP Status: {}, Msg: {}'.format(
resp.status_code, make_nice(str(resp.content))
)
)
log.info(LOG_MSG, resp.status, result_txt)
else:
log.info(
'HTTP Status: {}, Msg: {}'.format(
401, 'Ignoring empty card number'
)
)
log.info(LOG_MSG, 401, 'Ignoring empty card number')

if res and race_data['settings']['live_cp_splits_enabled']:
# send split as cp, codes of cp to send are set by the list
Expand All @@ -319,36 +294,23 @@ def create_online_cp(requests, url, data, race_data, log):
for split in res['splits']:
if split['code'] in codes:
split_time = int_to_otime(split['time'] // 10).to_str()
resp = o.send_online_cp(
resp = await o.send_online_cp(
card_number, split['code'], split_time
)
if resp.status_code != 200:
log.error(
'HTTP Status: {}, Msg: {}'.format(
resp.status_code,
make_nice(str(resp.content)),
)
)
result_txt = make_nice(str(await resp.text()))
if resp.status != 200:
log.error(LOG_MSG, resp.status, result_txt)
else:
log.info(
'HTTP Status: {}, Msg: {}'.format(
resp.status_code,
make_nice(str(resp.content)),
)
)
log.info(LOG_MSG, resp.status, result_txt)
else:
log.info(
'HTTP Status: {}, Msg: {}'.format(
401, 'Ignoring empty card number'
)
)
log.info(LOG_MSG, 401, 'Ignoring empty card number')

except Exception as e:
log.exception(e)
log.error('Error: %s', str(e))


def delete(requests, url, data, race_data):
o = Orgeo(requests, url)
async def delete(url, data, race_data, log, *, session):
o = Orgeo(session, url)
persons = []
for item in data:
if item['object'] == 'Person':
Expand All @@ -364,5 +326,14 @@ def delete(requests, url, data, race_data):
person_data = _get_person(item, race_data)
if person_data:
persons.append({'ref_id': person_data['id']})
if persons:
o.send({'persons': persons})
try:
if persons:
resp = await o.send({'persons': persons})
result_txt = make_nice(str(await resp.text()))
if resp.status != 200:
log.error(LOG_MSG, resp.status, result_txt)
else:
log.info(LOG_MSG, resp.status, result_txt)

except Exception as e:
log.error('Error: %s', str(e))
19 changes: 19 additions & 0 deletions tests/test_live.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import time

from sportorg.modules.live.live import LiveThread


def test_live_thread():
result = []

async def test(session):
assert session is not None
result.append(1)

live_thread = LiveThread()
live_thread.start()
live_thread.send(test)
time.sleep(1)
live_thread.stop()
live_thread.join()
assert result == [1]

0 comments on commit c040f87

Please sign in to comment.