From 5970f337e91b376fac7383bad903082ec55b5f00 Mon Sep 17 00:00:00 2001
From: Ondrej Kokes
Date: Tue, 26 Mar 2024 10:47:53 +0100
Subject: [PATCH] psp/steno fail on timeouts (#289)

When I run this by hand (Prague/Frankfurt/Newark), it never fails, but
something about the runners here does not get along with it. For steno I can
trivially raise the timeout; PSP will need a bigger change.

It may be because we run it in parallel, so it opens a lot of connections
to psp.cz.

The changes here did not fix it, but it is a bit better.
---
 .github/workflows/partial.yaml |  2 +-
 data/psp/main.py               | 13 +++++++++++--
 data/red/main.py               | 10 ++++++++--
 data/steno/main.py             |  9 +++++++--
 4 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/.github/workflows/partial.yaml b/.github/workflows/partial.yaml
index 0e7539c..ff79b58 100644
--- a/.github/workflows/partial.yaml
+++ b/.github/workflows/partial.yaml
@@ -31,7 +31,7 @@ jobs:
           pip install -r requirements.txt
       - name: Run partial processing
        id: main
-        if: matrix.module != 'eufondy'
+        if: matrix.module != 'eufondy' && !(matrix.module == 'psp' && matrix.os == 'windows-latest') && !(matrix.module == 'steno' && matrix.os == 'windows-latest')
         run: |
           python3 main.py --connstring sqlite:///data.db --partial ${{ matrix.module }}
       - name: Run partial processing (broken jobs)
diff --git a/data/psp/main.py b/data/psp/main.py
index 39e9899..a7d499a 100644
--- a/data/psp/main.py
+++ b/data/psp/main.py
@@ -7,14 +7,16 @@
 import logging
 import multiprocessing
 import os
+import shutil
 import zipfile
 from contextlib import contextmanager
 from datetime import datetime
 from io import TextIOWrapper
 from tempfile import TemporaryDirectory
-from urllib.request import urlretrieve
+from urllib.request import urlopen
 
 NULL_DATE = datetime(1900, 1, 1, 0, 0)
+HTTP_TIMEOUT = 90
 
 
 @contextmanager
@@ -22,7 +24,11 @@ def read_compressed(zipname, filename):
     burl = "http://www.psp.cz/eknih/cdrom/opendata/{}"
     with TemporaryDirectory() as tdir:
         tfn = os.path.join(tdir, "tmp.zip")
-        urlretrieve(burl.format(zipname), tfn)
+        with open(tfn, "wb") as f, urlopen(
+            burl.format(zipname), timeout=HTTP_TIMEOUT
+        ) as u:
+            shutil.copyfileobj(u, f)
+
         with zipfile.ZipFile(tfn) as zf, zf.open(filename) as zfh:
             # tisky.unl maj encoding chyby
             yield TextIOWrapper(zfh, "cp1250", errors="ignore")
@@ -112,6 +118,9 @@ def main(outdir: str, partial: bool = False):
     job = functools.partial(process_mapping, outdir, partial)
 
     ncpu = multiprocessing.cpu_count()
+    if os.getenv("CI"):
+        logging.info("Pouze jedno CPU, abychom nepretizili psp.cz")
+        ncpu = 1
     with multiprocessing.Pool(ncpu) as pool:
         for tema, tabulka in pool.imap_unordered(job, mapping):
             logging.info("hotovo: %s, %s", tema, tabulka)
diff --git a/data/red/main.py b/data/red/main.py
index d7f3f58..849f6b1 100755
--- a/data/red/main.py
+++ b/data/red/main.py
@@ -9,6 +9,8 @@
 from urllib.parse import urlparse
 from urllib.request import Request, urlopen
 
+HTTP_TIMEOUT = 60
+
 # TODO: mozna nebude treba, mozna budou URL nemenne
 DATASETS_GRAPHQL_QUERY = (
     """
@@ -34,7 +36,9 @@
 
 
 def remote_csv(url):
-    with urlopen(url, timeout=30) as r, gzip.open(r, encoding="utf-8", mode="rt") as f:
+    with urlopen(url, timeout=HTTP_TIMEOUT) as r, gzip.open(
+        r, encoding="utf-8", mode="rt"
+    ) as f:
         cr = csv.DictReader((line.replace("\0", "") for line in f), strict=True)
         yield from cr
 
@@ -119,7 +123,9 @@ def main(outdir: str, partial: bool = False):
     req = Request("https://data.gov.cz/graphql")
     req.add_header("content-type", "application/json")
     with urlopen(
-        req, json.dumps({"query": DATASETS_GRAPHQL_QUERY}).encode(), timeout=10
+        req,
+        json.dumps({"query": DATASETS_GRAPHQL_QUERY}).encode(),
+        timeout=HTTP_TIMEOUT,
     ) as r:
         distribution_urls = json.load(r)["data"]["datasets"]["data"]
 
diff --git a/data/steno/main.py b/data/steno/main.py
index 77bb4c1..dd81010 100644
--- a/data/steno/main.py
+++ b/data/steno/main.py
@@ -16,6 +16,8 @@
 import lxml.html
 from tqdm import tqdm
 
+HTTP_TIMEOUT = 90
+
 
 def clean_lines(rel_path):
     return list(
@@ -132,7 +134,7 @@ def zpracuj_schuzi(outdir, params):
     with tempfile.TemporaryDirectory() as tmpdir:
         base_name = os.path.basename(urlparse(url).path)
         tfn = os.path.join(tmpdir, base_name)
-        with urlopen(url, timeout=30) as r, open(tfn, "wb") as fw:
+        with urlopen(url, timeout=HTTP_TIMEOUT) as r, open(tfn, "wb") as fw:
             shutil.copyfileobj(r, fw)
         tdir = os.path.join(outdir, "psp")
         os.makedirs(tdir, exist_ok=True)
@@ -178,7 +180,7 @@ def main(outdir: str, partial: bool = False):
     logging.getLogger().setLevel(logging.INFO)
     jobs = []
     for rok, burl in urls.items():
-        with urlopen(burl, timeout=30) as r:
+        with urlopen(burl, timeout=HTTP_TIMEOUT) as r:
             ht = lxml.html.parse(r).getroot()
 
             for num, ln in enumerate(ht.cssselect("div#main-content a")):
@@ -188,6 +190,9 @@ def main(outdir: str, partial: bool = False):
                 jobs.append((rok, url))
 
     ncpu = multiprocessing.cpu_count()
+    if os.getenv("CI"):
+        logging.info("Pouze jedno CPU, abychom nepretizili psp.cz")
+        ncpu = 1
     func = functools.partial(zpracuj_schuzi, outdir)
     lnm = Counter()
     progress = tqdm(total=len(jobs))
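
Note: below is a minimal, self-contained sketch of the download pattern both psp and steno switch to. urllib.request.urlretrieve accepts no timeout argument, so a stalled connection can hang a CI job until the runner's overall limit; urlopen(..., timeout=...) raises on a stalled socket instead, and shutil.copyfileobj streams the response to disk. The URL, file name and function name here are illustrative placeholders, not code from this repository.

    import shutil
    from urllib.request import urlopen

    HTTP_TIMEOUT = 90  # seconds, same order of magnitude as the values set in the patch


    def fetch(url: str, dest: str, timeout: float = HTTP_TIMEOUT) -> None:
        # A socket that stalls for longer than `timeout` raises an exception
        # (socket.timeout/TimeoutError, possibly wrapped in urllib.error.URLError),
        # so the job fails fast instead of hanging until the runner kills it.
        with urlopen(url, timeout=timeout) as resp, open(dest, "wb") as fh:
            shutil.copyfileobj(resp, fh)


    if __name__ == "__main__":
        fetch("https://example.org/archive.zip", "archive.zip")

The other half of the change, capping the multiprocessing pool at a single worker when the CI environment variable is set, limits how many of these connections are opened against psp.cz at once.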