psp/steno failing on timeouts (#289)
When I run this by hand (Prague/Frankfurt/Newark) it never fails, but something about the runners here doesn't get along with it. For steno I can trivially raise the timeout; PSP will need a bigger change.
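For context: urllib.request.urlretrieve takes no per-call timeout (it relies on the global socket default), which is why the PSP change below swaps it for urlopen with an explicit timeout plus shutil.copyfileobj. A minimal sketch of that pattern — the URL, destination path, and download helper are illustrative, not taken from the repository:

```python
import shutil
from urllib.request import urlopen

HTTP_TIMEOUT = 90  # seconds; the same value the diff below uses for psp/steno


def download(url: str, dest: str, timeout: float = HTTP_TIMEOUT) -> None:
    # urlopen accepts a per-call timeout, so a stalled connection to psp.cz
    # raises instead of hanging the CI job indefinitely
    with urlopen(url, timeout=timeout) as resp, open(dest, "wb") as out:
        # stream the response to disk without loading it into memory
        shutil.copyfileobj(resp, out)


if __name__ == "__main__":
    # illustrative URL; any large file served by psp.cz would do
    download("http://www.psp.cz/eknih/cdrom/opendata/tisky.zip", "tisky.zip")
```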

It may be because we run this in parallel, so it opens a lot of connections to psp.cz.
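The diff below handles this by dropping to a single worker whenever the CI environment variable is set (GitHub Actions sets CI=true). A minimal sketch of that pattern, with a placeholder job standing in for the real download-and-parse work:

```python
import logging
import multiprocessing
import os


def pool_size() -> int:
    # GitHub Actions sets CI=true; locally we keep using every core
    if os.getenv("CI"):
        logging.info("Single worker on CI so we do not overload psp.cz")
        return 1
    return multiprocessing.cpu_count()


def job(n: int) -> int:
    # placeholder; the real jobs each download and parse one session
    return n * n


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    with multiprocessing.Pool(pool_size()) as pool:
        for result in pool.imap_unordered(job, range(10)):
            print(result)
```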

The changes here didn't solve it, but it is a bit better.
kokes authored Mar 26, 2024
1 parent 1fe0896 commit 5970f33
Showing 4 changed files with 27 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/partial.yaml
@@ -31,7 +31,7 @@ jobs:
           pip install -r requirements.txt
       - name: Run partial processing
         id: main
-        if: matrix.module != 'eufondy'
+        if: matrix.module != 'eufondy' && !(matrix.module == 'psp' && matrix.os == 'windows-latest') && !(matrix.module == 'steno' && matrix.os == 'windows-latest')
         run: |
           python3 main.py --connstring sqlite:///data.db --partial ${{ matrix.module }}
       - name: Run partial processing (broken jobs)
13 changes: 11 additions & 2 deletions data/psp/main.py
@@ -7,22 +7,28 @@
 import logging
 import multiprocessing
 import os
+import shutil
 import zipfile
 from contextlib import contextmanager
 from datetime import datetime
 from io import TextIOWrapper
 from tempfile import TemporaryDirectory
-from urllib.request import urlretrieve
+from urllib.request import urlopen
 
 NULL_DATE = datetime(1900, 1, 1, 0, 0)
+HTTP_TIMEOUT = 90
 
 
 @contextmanager
 def read_compressed(zipname, filename):
     burl = "http://www.psp.cz/eknih/cdrom/opendata/{}"
     with TemporaryDirectory() as tdir:
         tfn = os.path.join(tdir, "tmp.zip")
-        urlretrieve(burl.format(zipname), tfn)
+        with open(tfn, "wb") as f, urlopen(
+            burl.format(zipname), timeout=HTTP_TIMEOUT
+        ) as u:
+            shutil.copyfileobj(u, f)
+
         with zipfile.ZipFile(tfn) as zf, zf.open(filename) as zfh:
             # tisky.unl maj encoding chyby
             yield TextIOWrapper(zfh, "cp1250", errors="ignore")
@@ -112,6 +118,9 @@ def main(outdir: str, partial: bool = False):
 
     job = functools.partial(process_mapping, outdir, partial)
     ncpu = multiprocessing.cpu_count()
+    if os.getenv("CI"):
+        logging.info("Pouze jedno CPU, abychom nepretizili psp.cz")
+        ncpu = 1
     with multiprocessing.Pool(ncpu) as pool:
         for tema, tabulka in pool.imap_unordered(job, mapping):
             logging.info("hotovo: %s, %s", tema, tabulka)
10 changes: 8 additions & 2 deletions data/red/main.py
@@ -9,6 +9,8 @@
 from urllib.parse import urlparse
 from urllib.request import Request, urlopen
 
+HTTP_TIMEOUT = 60
+
 # TODO: mozna nebude treba, mozna budou URL nemenne
 DATASETS_GRAPHQL_QUERY = (
     """
@@ -34,7 +36,9 @@
 
 
 def remote_csv(url):
-    with urlopen(url, timeout=30) as r, gzip.open(r, encoding="utf-8", mode="rt") as f:
+    with urlopen(url, timeout=HTTP_TIMEOUT) as r, gzip.open(
+        r, encoding="utf-8", mode="rt"
+    ) as f:
         cr = csv.DictReader((line.replace("\0", "") for line in f), strict=True)
         yield from cr
 
@@ -119,7 +123,9 @@ def main(outdir: str, partial: bool = False):
     req = Request("https://data.gov.cz/graphql")
     req.add_header("content-type", "application/json")
     with urlopen(
-        req, json.dumps({"query": DATASETS_GRAPHQL_QUERY}).encode(), timeout=10
+        req,
+        json.dumps({"query": DATASETS_GRAPHQL_QUERY}).encode(),
+        timeout=HTTP_TIMEOUT,
     ) as r:
         distribution_urls = json.load(r)["data"]["datasets"]["data"]
 
9 changes: 7 additions & 2 deletions data/steno/main.py
@@ -16,6 +16,8 @@
 import lxml.html
 from tqdm import tqdm
 
+HTTP_TIMEOUT = 90
+
 
 def clean_lines(rel_path):
     return list(
@@ -132,7 +134,7 @@ def zpracuj_schuzi(outdir, params):
     with tempfile.TemporaryDirectory() as tmpdir:
         base_name = os.path.basename(urlparse(url).path)
         tfn = os.path.join(tmpdir, base_name)
-        with urlopen(url, timeout=30) as r, open(tfn, "wb") as fw:
+        with urlopen(url, timeout=HTTP_TIMEOUT) as r, open(tfn, "wb") as fw:
             shutil.copyfileobj(r, fw)
         tdir = os.path.join(outdir, "psp")
         os.makedirs(tdir, exist_ok=True)
@@ -178,7 +180,7 @@ def main(outdir: str, partial: bool = False):
     logging.getLogger().setLevel(logging.INFO)
     jobs = []
     for rok, burl in urls.items():
-        with urlopen(burl, timeout=30) as r:
+        with urlopen(burl, timeout=HTTP_TIMEOUT) as r:
             ht = lxml.html.parse(r).getroot()
 
         for num, ln in enumerate(ht.cssselect("div#main-content a")):
@@ -188,6 +190,9 @@ def main(outdir: str, partial: bool = False):
             jobs.append((rok, url))
 
     ncpu = multiprocessing.cpu_count()
+    if os.getenv("CI"):
+        logging.info("Pouze jedno CPU, abychom nepretizili psp.cz")
+        ncpu = 1
     func = functools.partial(zpracuj_schuzi, outdir)
     lnm = Counter()
     progress = tqdm(total=len(jobs))
