Skip to content

Commit

Permalink
[justice] historické datasety (#284)
Browse files Browse the repository at this point in the history
  • Loading branch information
kokes authored Feb 13, 2024
1 parent f0b486f commit 5e5c488
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 53 deletions.
11 changes: 10 additions & 1 deletion data/justice/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,13 @@ Implementační detaily:

- Na portálu MSp není seznam souborů ke stažení, používáme neveřejné API, které seznam souborů obsahuje. Časem by mělo dojít k nápravě.
- Data na webu jsou i v CSV, ale tato CSV jsou prakticky nepoužitelná.
- Zaniklé subjekty jsou prozatím nezpracované, protože způsob jejich exportu je naprosto nepraktický.

## Zaniklé subjekty

V úplném výpisu jsou jen živé subjekty a pak entity smazané _v daný kalendářní rok_, takže teď (únor 2024) je v datasetech
o mnoho méně dat než bylo v prosinci. Pokud tedy potřebujete historické subjekty, je třeba stáhnout staré exporty.

Tyto exporty jsou nabízené, ale protože tohle je křehká (kvůli kvalitě linky MSp) a pomalá věc, není to default. Defaultně
se berou jen nejnovější exporty, je možné to ovlivnit přes enviromentální proměnnou `CURRENT_YEAR_ONLY=0`. Doporučuju
použít taky `CACHE_ENABLED=1`, abyste nebyli na pospas linkám MSp, ale aby se vám lokálně ukládala data, než vám to celé
doběhne (a tedy retry bude stahovat jen nikdy nestažená data). Cache je nutné vypnout, pokud chcete aktualizovat data.
122 changes: 72 additions & 50 deletions data/justice/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@
import os
import re
import shutil
import time
from collections import defaultdict
from urllib.parse import urlparse
from urllib.request import urlopen

import lxml.etree
from tqdm import tqdm

NON_ISO_DATUM = re.compile(r"^(\d{1,2})[\.\-](\d{1,2})[\.\-](\d{4})$")
HTTP_TIMEOUT = 60
HTTP_TIMEOUT = 180
CACHE_DIR = "cache"
CACHE_ENABLED = bool(int(os.environ.get("CACHE_ENABLED", "0")))
CURRENT_YEAR_ONLY = bool(int(os.environ.get("CURRENT_YEAR_ONLY", "1")))


def gen_schema(element, parent=None):
Expand Down Expand Up @@ -104,7 +107,7 @@ def nahraj_ds(url):
yield from et


def zpracuj_ds(url, schemas, outdir, partial, autogen):
def zpracuj_ds(url, schemas, outdir, partial, autogen, icos):
et = nahraj_ds(url)

fs, csvs, schemasd = dict(), dict(), dict()
Expand Down Expand Up @@ -138,7 +141,6 @@ def zpracuj_ds(url, schemas, outdir, partial, autogen):
fs[udaj] = f
csvs[udaj] = cw

icos = set() # TODO: tohle budem muset dostavat jak input (z minulych let)
for num, (action, el) in enumerate(et):
if partial and num > 1e5:
break
Expand All @@ -160,11 +162,11 @@ def zpracuj_ds(url, schemas, outdir, partial, autogen):
# fw.write(f"{nazev}\t{el.sourceline}\t{url}\n")
continue

# TODO: kdyz zpracovavame data starsi nez letosni, musime
# kdyz zpracovavame data starsi nez letosni, musime
# zahazovat jiz zpracovana data
# if ico in icos:
# el.clear()
# continue
if ico in icos:
el.clear()
continue
icos.add(ico)

csvs["subjekty"].writerow([ico, nazev, zapis, vymaz])
Expand Down Expand Up @@ -251,59 +253,79 @@ def main(outdir: str, partial: bool = False):
assert data["success"]

dss = [ds for ds in data["result"] if "-full-" in ds]
print(f"celkem {len(dss)} datasetu, ale filtruji jen na ty letosni")
# TODO: abychom zvladli i ty minuly roky, budem muset udelat batche po
# letech a udelat dycky kazdej rok zvlast a predavat si seznam zpracovanych ICO
dss = [j for j in dss if int(j.rpartition("-")[-1]) == dt.date.today().year]
print(f"po odfiltrovani {len(dss)} datasetu")
dss.sort(key=lambda x: int(x.rpartition("-")[-1]), reverse=True)

urls = []
for j, ds in enumerate(tqdm(dss)):
if partial and len(urls) > 20:
break
url = f"https://dataor.justice.cz/api/3/action/package_show?id={ds}"
with cached_urlopen(url, timeout=HTTP_TIMEOUT) as r:
dtp = json.load(r)
assert dtp["success"]
ds_url = [
j["url"] for j in dtp["result"]["resources"] if j["url"].endswith(".xml.gz")
]
assert len(ds_url) == 1

# mohli bychom to omezit jen na mensi soubory, ale radsi
# prectu trosku z vicero dat
# if partial:
# req = urlopen(ds_url[0])
# if int(req.headers.get("Content-Length")) > 10_000_000:
# continue

urls.append(ds_url[0])
print(f"celkem {len(dss)} datasetu")

dsm = defaultdict(list)
for ds in dss:
year = ds.rpartition("-")[-1]
dsm[year].append(ds)

years = sorted(dsm.keys(), reverse=True)
print(f"mame data pro roky: {years}")
if CURRENT_YEAR_ONLY:
print(f"zpracovavame jen rok {years[0]}")
years = years[:1]

cdir = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(cdir, "xml_schema.json"), encoding="utf-8") as f:
schemas = json.load(f)

# samotna multiprocessing.queue z nejakyho duvodu nefungovala
autogen = multiprocessing.Manager().Queue()
zpracuj = functools.partial(
zpracuj_ds,
schemas=schemas,
outdir=outdir,
partial=partial,
autogen=autogen,
)
progress = tqdm(total=len(urls))

# TODO: chcem fakt jet naplno? co kdyz budem parametrizovat jednotlivy moduly?
ncpu = multiprocessing.cpu_count()

# chcem frontloadovat nejvetsi datasety, abychom optimalizovali runtime
# mohli bychom HEADnout ty soubory, ale najit sro/as je rychlejsi a good enough
urls.sort(key=lambda x: int("/sro" in x or "/as" in x), reverse=True)
with multiprocessing.Pool(ncpu) as pool:
for _, _ in pool.imap_unordered(zpracuj, urls):
# logging.debug(url)?
progress.update(n=1)
processed = set()
for year in years:
dss = dsm[year]

urls = []
for j, ds in enumerate(tqdm(dss, desc=f"{year} meta")):
if partial and len(urls) > 20:
break
url = f"https://dataor.justice.cz/api/3/action/package_show?id={ds}"
with cached_urlopen(url, timeout=HTTP_TIMEOUT) as r:
dtp = json.load(r)
assert dtp["success"]
ds_url = [
j["url"]
for j in dtp["result"]["resources"]
if j["url"].endswith(".xml.gz")
]
assert len(ds_url) == 1

# mohli bychom to omezit jen na mensi soubory, ale radsi
# prectu trosku z vicero dat
# if partial:
# req = urlopen(ds_url[0])
# if int(req.headers.get("Content-Length")) > 10_000_000:
# continue

urls.append(ds_url[0])

progress = tqdm(total=len(urls), desc=f"{year} data")

zpracuj = functools.partial(
zpracuj_ds,
schemas=schemas,
outdir=outdir,
partial=partial,
autogen=autogen,
icos=set(list(processed)), # bojim se konkurence
)

year_icos = set()
t = time.time()
with multiprocessing.Pool(ncpu) as pool:
for _, icos in pool.imap_unordered(zpracuj, urls):
# logging.debug(url)?
progress.update(n=1)
year_icos.update(icos)

progress.close()
processed.update(year_icos)
print(f"zpracovano {year} za {time.time() - t:.2f}s")

# nezpracovany objekty je treba rucne projit
schema_autogen = dict()
Expand Down
6 changes: 4 additions & 2 deletions data/justice/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
Column("ico", ForeignKey("subjekty.ico"), nullable=False),
Column("datum_zapis", Date, nullable=False),
Column("datum_vymaz", Date, nullable=True),
Column("text", Text, nullable=False),
Column("text", Text, nullable=True),
),
Table(
"pravni_forma_text",
Expand Down Expand Up @@ -466,7 +466,9 @@
Column("datum_zapis", Date, nullable=False),
Column("datum_vymaz", Date, nullable=True),
Column("vklad_typ", Text, nullable=False),
Column("vklad", Numeric, nullable=False),
# nechtel se mi delat vlastni parser, prece jen tu muzou dat cokoliv
# co mi to rozbilo: 199.004.000,- Kč; 159.203.000,- Kč; 5,5 miliardy Kč
Column("vklad", Text, nullable=False),
),
Table(
"evidence",
Expand Down

0 comments on commit 5e5c488

Please sign in to comment.