diff --git a/nodeman/jose.py b/nodeman/jose.py index 1f7edfd..193a611 100644 --- a/nodeman/jose.py +++ b/nodeman/jose.py @@ -1,5 +1,6 @@ from typing import Annotated +from jwcrypto.common import base64url_decode from jwcrypto.jwk import JWK from pydantic import BaseModel, Field from pydantic.types import StringConstraints @@ -91,3 +92,17 @@ def jwk_to_alg(key: JWK) -> str: case ("OKP", "Ed448"): return "EdDSA" raise ValueError(f"Unsupported key type: {kty}" + (f" with curve: {crv}" if crv else "")) + + +def generate_similar_jwk(key: JWK) -> JWK: + """Generate similar JWK""" + + params = {param: key.get(param) for param in ["kty", "crv"] if param in key} + match key.get("kty"): + case "RSA": + params["size"] = key._get_public_key().key_size + case "oct": + params["size"] = len(base64url_decode(key.k)) * 8 + case _: + pass + return JWK.generate(**params) diff --git a/nodeman/x509.py b/nodeman/x509.py index fa7a37a..e061427 100644 --- a/nodeman/x509.py +++ b/nodeman/x509.py @@ -1,6 +1,7 @@ import logging from abc import ABC, abstractmethod from dataclasses import dataclass +from datetime import datetime, timedelta, timezone from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization @@ -161,3 +162,49 @@ def process_csr_request(request: Request, csr: x509.CertificateSigningRequest, n x509_ca_certificate=x509_ca_certificate_pem, x509_certificate_serial_number=x509_certificate_serial_number, ) + + +def generate_ca_certificate( + issuer_ca_name: x509.Name, + issuer_ca_private_key: PrivateKey, + root_ca_name: x509.Name | None = None, + root_ca_private_key: PrivateKey | None = None, + validity_days: int = 1, +) -> x509.Certificate: + """Generate CA Certificate""" + + now = datetime.now(tz=timezone.utc) + validity = timedelta(days=validity_days) + + builder = x509.CertificateBuilder() + builder = builder.subject_name(issuer_ca_name) + builder = builder.issuer_name(root_ca_name or issuer_ca_name) + builder = builder.not_valid_before(now) + builder = builder.not_valid_after(now + validity) + builder = builder.serial_number(x509.random_serial_number()) + builder = builder.public_key(issuer_ca_private_key.public_key()) + builder = builder.add_extension( + x509.BasicConstraints(ca=True, path_length=None), + critical=True, + ) + builder = builder.add_extension( + x509.KeyUsage( + digital_signature=True, + content_commitment=False, + key_encipherment=False, + data_encipherment=False, + key_agreement=False, + key_cert_sign=True, + crl_sign=True, + encipher_only=False, + decipher_only=False, + ), + critical=True, + ) + + private_key = root_ca_private_key or issuer_ca_private_key + + return builder.sign( + private_key=private_key, + algorithm=get_hash_algorithm_from_key(private_key), + ) diff --git a/tests/test_api.py b/tests/test_api.py index 2900bbb..53637cc 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -19,12 +19,16 @@ from pydantic_settings import SettingsConfigDict from nodeman.internal_ca import InternalCertificateAuthority -from nodeman.jose import jwk_to_alg +from nodeman.jose import generate_similar_jwk, jwk_to_alg from nodeman.models import PublicKeyFormat from nodeman.server import NodemanServer from nodeman.settings import Settings -from nodeman.x509 import RSA_EXPONENT, CertificateAuthorityClient, generate_x509_csr -from tests.utils import generate_ca_certificate, rekey +from nodeman.x509 import ( + RSA_EXPONENT, + CertificateAuthorityClient, + generate_ca_certificate, + generate_x509_csr, +) ADMIN_TEST_NODE_COUNT = 100 BACKEND_CREDENTIALS = ("username", "password") @@ -171,7 +175,7 @@ def _test_enroll(data_key: JWK, x509_key: PrivateKey, requested_name: str | None } jws = JWS(payload=json.dumps(payload)) - jws.add_signature(key=rekey(data_key), alg=data_alg, protected={"alg": data_alg}) + jws.add_signature(key=generate_similar_jwk(data_key), alg=data_alg, protected={"alg": data_alg}) renew_request = json.loads(jws.serialize()) response = client.post(f"{node_url}/renew", json=renew_request) diff --git a/tests/test_internal_ca.py b/tests/test_internal_ca.py index 9028720..7f4e265 100644 --- a/tests/test_internal_ca.py +++ b/tests/test_internal_ca.py @@ -11,8 +11,14 @@ from cryptography.x509.oid import NameOID from nodeman.internal_ca import InternalCertificateAuthority -from nodeman.x509 import RSA_EXPONENT, CertificateInformation, PrivateKey, generate_similar_key, generate_x509_csr -from tests.utils import generate_ca_certificate +from nodeman.x509 import ( + RSA_EXPONENT, + CertificateInformation, + PrivateKey, + generate_ca_certificate, + generate_similar_key, + generate_x509_csr, +) def _verify_certification_information(res: CertificateInformation) -> None: diff --git a/tests/utils.py b/tests/utils.py deleted file mode 100644 index 19ac54e..0000000 --- a/tests/utils.py +++ /dev/null @@ -1,66 +0,0 @@ -from datetime import datetime, timedelta, timezone - -from cryptography import x509 -from jwcrypto.common import base64url_decode -from jwcrypto.jwk import JWK - -from nodeman.x509 import PrivateKey, get_hash_algorithm_from_key - - -def rekey(key: JWK) -> JWK: - """Generate similar key""" - params = {param: key.get(param) for param in ["kty", "crv"] if param in key} - match key.get("kty"): - case "RSA": - params["size"] = key._get_public_key().key_size - case "oct": - params["size"] = len(base64url_decode(key.k)) * 8 - case _: - pass - return JWK.generate(**params) - - -def generate_ca_certificate( - issuer_ca_name: x509.Name, - issuer_ca_private_key: PrivateKey, - root_ca_name: x509.Name | None = None, - root_ca_private_key: PrivateKey | None = None, - validity_days: int = 1, -) -> x509.Certificate: - """Generate CA Certificate""" - - now = datetime.now(tz=timezone.utc) - validity = timedelta(days=validity_days) - - builder = x509.CertificateBuilder() - builder = builder.subject_name(issuer_ca_name) - builder = builder.issuer_name(root_ca_name or issuer_ca_name) - builder = builder.not_valid_before(now) - builder = builder.not_valid_after(now + validity) - builder = builder.serial_number(x509.random_serial_number()) - builder = builder.public_key(issuer_ca_private_key.public_key()) - builder = builder.add_extension( - x509.BasicConstraints(ca=True, path_length=None), - critical=True, - ) - builder = builder.add_extension( - x509.KeyUsage( - digital_signature=True, - content_commitment=False, - key_encipherment=False, - data_encipherment=False, - key_agreement=False, - key_cert_sign=True, - crl_sign=True, - encipher_only=False, - decipher_only=False, - ), - critical=True, - ) - - private_key = root_ca_private_key or issuer_ca_private_key - - return builder.sign( - private_key=private_key, - algorithm=get_hash_algorithm_from_key(private_key), - )