From bc980b5c12ef3f4ebd5cb1d0da4cfd1ace944524 Mon Sep 17 00:00:00 2001 From: Jakob Schlyter Date: Fri, 15 Nov 2024 15:53:04 +0100 Subject: [PATCH] add cleaned up code for Mozilla PSL from @morkrost --- dnstapir/dns/__init__.py | 0 dnstapir/dns/mozpsl.py | 135 +++++++++++++++++++++++++++++++++++++++ poetry.lock | 49 ++++++++++---- pyproject.toml | 1 + tests/test_dns_mozpsl.py | 26 ++++++++ 5 files changed, 198 insertions(+), 13 deletions(-) create mode 100644 dnstapir/dns/__init__.py create mode 100644 dnstapir/dns/mozpsl.py create mode 100644 tests/test_dns_mozpsl.py diff --git a/dnstapir/dns/__init__.py b/dnstapir/dns/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dnstapir/dns/mozpsl.py b/dnstapir/dns/mozpsl.py new file mode 100644 index 0000000..db0d6de --- /dev/null +++ b/dnstapir/dns/mozpsl.py @@ -0,0 +1,135 @@ +import io +from typing import Self + +import httpx +import punycode + + +class TrieNode: + """ "Storage class for Trie""" + + def __init__(self) -> None: + self.count = 0 + self.icann: bool | None = None + self.children: dict[str, Self] = {} + + +class Trie: + def __init__(self) -> None: + self.root = TrieNode() + + def __repr__(self) -> str: + """Print full Trie structure""" + + def recur(node: TrieNode, indent: str): + return "".join( + indent + key + (f" {child.count}" if child.count else "") + recur(child, indent + " - ") + for key, child in node.children.items() + ) + + return recur(self.root, "\n") + + def insert(self, array: list[str], nlbl: int, icann: bool) -> None: + """Add data to Trie""" + node = self.root + for x in array: + if x in node.children: + node = node.children[x] + else: + child = TrieNode() + node.children[x] = child + node = child + node.count = nlbl + node.icann = icann + + def search(self, key: list[str]) -> tuple[int, int]: + """Search Trie""" + current = self.root + for label in key: + if current.icann: + core = current.count + else: + pcore = current.count + if label not in current.children: + if 
current.count != 0: + break + else: + raise KeyError + current = current.children[label] + if pcore == core: + pcore = 0 + return (core, pcore) + + +class PublicSuffixList: + """Mozilla Public Suffix List""" + + def __init__(self) -> None: + self.trie = Trie() + + def load_psl_url(self, url: str) -> None: + """Load PSL from URL""" + response = httpx.get( + url, + headers={ + "Accept-Encoding": "gzip", + }, + ) + response.raise_for_status() + self.load_psl(io.StringIO(response.text)) + + def load_psl(self, stream: io.StringIO) -> None: + """Load PSL from stream""" + icann = False + for line in stream: + line = line.rstrip() + + if "===BEGIN ICANN DOMAINS===" in line: + # Mark ICANN domains + icann = True + elif "===BEGIN PRIVATE DOMAINS===" in line: + # Mark PRIVATE domains + icann = False + + if (line.strip() == "") or (line[0] == "/"): + # Remove empty or comment lines + continue + + # Set number of labels in core domain + labels = len(line.split(".")) + 1 + + # Wildcards + if line[0] == "*": + line = line[2:] + + # Exclusions... 
.ck and .jp, just stop + if line[0] == "!": + line = line[1:] + labels -= 2 + + # Convert from Unicode + lbls = punycode.convert(line).split(".") + + # Store reversed + lbls.reverse() + + # Insert into Trie + self.trie.insert(lbls, labels, icann) + + def coredomain(self, domain: str) -> tuple[str, str]: + """Find ICANN and private name cut-off for domain""" + domain = domain.rstrip(".") + lbls = domain.split(".") + lbls.reverse() + c, p = self.trie.search(lbls) + core = lbls[0:c] + core.reverse() + pcore = lbls[0:p] + pcore.reverse() + return (".".join(core), ".".join(pcore)) + + def rdomain(self, rdomain: str) -> tuple[str, str]: + """Find ICANN and private name cut-off for domain, reverse order process""" + lbls = rdomain.split(".") + c, p = self.trie.search(lbls) + return (".".join(lbls[0:c]), ".".join(lbls[0:p])) diff --git a/poetry.lock b/poetry.lock index ada2644..04830b4 100644 --- a/poetry.lock +++ b/poetry.lock @@ -330,20 +330,20 @@ test-randomorder = ["pytest-randomly"] [[package]] name = "deprecated" -version = "1.2.14" +version = "1.2.15" description = "Python @deprecated decorator to deprecate old python classes, functions or methods." 
optional = true -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,>=2.7" files = [ - {file = "Deprecated-1.2.14-py2.py3-none-any.whl", hash = "sha256:6fac8b097794a90302bdbb17b9b815e732d3c4720583ff1b198499d78470466c"}, - {file = "Deprecated-1.2.14.tar.gz", hash = "sha256:e5323eb936458dccc2582dc6f9c322c852a775a27065ff2b0c4970b9d53d01b3"}, + {file = "Deprecated-1.2.15-py2.py3-none-any.whl", hash = "sha256:353bc4a8ac4bfc96800ddab349d89c25dec1079f65fd53acdcc1e0b975b21320"}, + {file = "deprecated-1.2.15.tar.gz", hash = "sha256:683e561a90de76239796e6b6feac66b99030d2dd3fcf61ef996330f14bbb9b0d"}, ] [package.dependencies] wrapt = ">=1.10,<2" [package.extras] -dev = ["PyTest", "PyTest-Cov", "bump2version (<1)", "sphinx (<2)", "tox"] +dev = ["PyTest", "PyTest-Cov", "bump2version (<1)", "jinja2 (>=3.0.3,<3.1.0)", "setuptools", "sphinx (<2)", "tox"] [[package]] name = "dnspython" @@ -580,18 +580,15 @@ zstd = ["zstandard (>=0.18.0)"] [[package]] name = "idna" -version = "3.10" +version = "3.4" description = "Internationalized Domain Names in Applications (IDNA)" optional = false -python-versions = ">=3.6" +python-versions = ">=3.5" files = [ - {file = "idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3"}, - {file = "idna-3.10.tar.gz", hash = "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9"}, + {file = "idna-3.4-py3-none-any.whl", hash = "sha256:90b77e79eaa3eba6de819a0c442c0b4ceefc341a7a2ab77d7562bf49f425c5c2"}, + {file = "idna-3.4.tar.gz", hash = "sha256:814f528e8dead7d329833b91c5faa87d60bf71824cd12a7530b5526063d02cb4"}, ] -[package.extras] -all = ["flake8 (>=7.1.1)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "ruff (>=0.6.2)"] - [[package]] name = "importlib-metadata" version = "8.5.0" @@ -980,6 +977,21 @@ files = [ {file = "protobuf-5.28.3.tar.gz", hash = "sha256:64badbc49180a5e401f373f9ce7ab1d18b63f7dd4a9cdc43c92b9f0b481cef7b"}, ] 
+[[package]] +name = "punycode" +version = "0.2.1" +description = "Punycode Converter Library for Python" +optional = false +python-versions = "*" +files = [ + {file = "punycode-0.2.1-py3-none-any.whl", hash = "sha256:5c6c2c1fcfdcd50752fe4e68d402495d9ae2a71ac73915bc6af39cc7ccd99eb5"}, + {file = "punycode-0.2.1.tar.gz", hash = "sha256:2619d4cc3d517f5b15f092d2e4108f37a44689514207f359ae6c5edf4fcc341d"}, +] + +[package.dependencies] +idna = "3.4" +wincertstore = "0.2" + [[package]] name = "pycparser" version = "2.22" @@ -1404,6 +1416,17 @@ h2 = ["h2 (>=4,<5)"] socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] +[[package]] +name = "wincertstore" +version = "0.2" +description = "Python module to extract CA and CRL certs from Windows' cert store (ctypes based)." +optional = false +python-versions = "*" +files = [ + {file = "wincertstore-0.2-py2.py3-none-any.whl", hash = "sha256:22d5eebb52df88a8d4014d5cf6d1b6c3a5d469e6c3b2e2854f3a003e48872356"}, + {file = "wincertstore-0.2.zip", hash = "sha256:780bd1557c9185c15d9f4221ea7f905cb20b93f7151ca8ccaed9714dce4b327a"}, +] + [[package]] name = "wrapt" version = "1.16.0" @@ -1509,4 +1532,4 @@ opentelemetry = ["botocore", "fastapi", "opentelemetry-api", "opentelemetry-expo [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "0956219725bfe0ed8d870858f559baa914b75a11535e52d7c4a6be143ff10caa" +content-hash = "7838a6ecc014da58ff46b61e7b3bc663ecdcfe65d85ad72dbca146789920cd50" diff --git a/pyproject.toml b/pyproject.toml index e10c1e6..4f80993 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ httpx = { version = ">=0.27.2", optional = true } pydantic = { version = "^2.9.2", optional = true } pymongo = { version = "^4.10.1", optional = true } redis = { version = "^5.1.1", optional = true } +punycode = "^0.2.1" [tool.poetry.group.dev.dependencies] pytest = "^8.2.0" diff --git a/tests/test_dns_mozpsl.py b/tests/test_dns_mozpsl.py new file mode 100644 index 0000000..4bfad5d --- 
import pytest

from dnstapir.dns.mozpsl import PublicSuffixList

MOZ_PSL = "https://publicsuffix.org/list/public_suffix_list.dat"


def test_mozpsl():
    """Check core-domain cut-offs against the list downloaded from MOZ_PSL."""
    suffix_list = PublicSuffixList()
    suffix_list.load_psl_url(url=MOZ_PSL)

    # (input domain, expected (ICANN core, private core)), checked in order.
    forward_cases = [
        ("www.ck.", ("www.ck", "")),
        ("www.something.gov.ck.", ("something.gov.ck", "")),
        ("www.something.or.other.microsoft.com.", ("microsoft.com", "")),
        ("www.something.or.other.microsoft.com.br.", ("microsoft.com.br", "")),
        (
            "www.something.emrstudio-prod.us-gov-east-1.amazonaws.com.",
            (
                "amazonaws.com",
                "something.emrstudio-prod.us-gov-east-1.amazonaws.com",
            ),
        ),
    ]
    for domain, expected in forward_cases:
        assert suffix_list.coredomain(domain) == expected

    # Same lookup with the labels supplied TLD-first.
    reversed_name = "com.amazonaws.us-gov-east-1.emrstudio-prod.www.something.emrstudio-prod"
    reversed_expected = (
        "com.amazonaws",
        "com.amazonaws.us-gov-east-1.emrstudio-prod.www",
    )
    assert suffix_list.rdomain(reversed_name) == reversed_expected

    # A name with no matching rule is reported as KeyError.
    with pytest.raises(KeyError):
        suffix_list.coredomain("local.")