Skip to content

Commit

Permalink
X.509 cleanup (#41)
Browse files Browse the repository at this point in the history
* Sign issued certificate wih the same hash algorithm as the issuer CA certificate

* Clean up get_hash_algorithm_from_key

* move helpers
  • Loading branch information
jschlyter authored Dec 19, 2024
1 parent 65d9e1c commit d13d42a
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 79 deletions.
7 changes: 4 additions & 3 deletions nodeman/internal_ca.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
CertificateAuthorityClient,
CertificateInformation,
PrivateKey,
get_hash_algorithm_from_key,
verify_x509_csr_signature,
)

Expand Down Expand Up @@ -59,7 +58,6 @@ def __init__(
self.root_ca_certificate = root_ca_certificate or issuer_ca_certificate
self.time_skew = time_skew or timedelta(minutes=10)
self.validity = validity or timedelta(days=validity_days)
self.signature_hash_algorithm = get_hash_algorithm_from_key(self.issuer_ca_private_key)

@classmethod
def load(
Expand Down Expand Up @@ -128,7 +126,10 @@ def sign_csr(self, csr: x509.CertificateSigningRequest, name: str) -> Certificat
critical=True,
)

certificate = builder.sign(private_key=self.issuer_ca_private_key, algorithm=self.signature_hash_algorithm)
certificate = builder.sign(
private_key=self.issuer_ca_private_key,
algorithm=self.issuer_ca_certificate.signature_hash_algorithm,
)

if self.root_ca_certificate != self.issuer_ca_certificate:
cert_chain = [certificate, self.issuer_ca_certificate]
Expand Down
15 changes: 15 additions & 0 deletions nodeman/jose.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Annotated

from jwcrypto.common import base64url_decode
from jwcrypto.jwk import JWK
from pydantic import BaseModel, Field
from pydantic.types import StringConstraints
Expand Down Expand Up @@ -91,3 +92,17 @@ def jwk_to_alg(key: JWK) -> str:
case ("OKP", "Ed448"):
return "EdDSA"
raise ValueError(f"Unsupported key type: {kty}" + (f" with curve: {crv}" if crv else ""))


def generate_similar_jwk(key: JWK) -> JWK:
"""Generate similar JWK"""

params = {param: key.get(param) for param in ["kty", "crv"] if param in key}
match key.get("kty"):
case "RSA":
params["size"] = key._get_public_key().key_size
case "oct":
params["size"] = len(base64url_decode(key.k)) * 8
case _:
pass
return JWK.generate(**params)
59 changes: 55 additions & 4 deletions nodeman/x509.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
Expand Down Expand Up @@ -33,11 +34,15 @@ def sign_csr(self, csr: x509.CertificateSigningRequest, name: str) -> Certificat


def get_hash_algorithm_from_key(key: PrivateKey) -> hashes.HashAlgorithm | None:
if isinstance(key, (Ed25519PrivateKey, Ed448PrivateKey)):
"""Get hash algorithm for private key"""
if isinstance(key, RSAPrivateKey):
return hashes.SHA256()
elif isinstance(key, EllipticCurvePrivateKey):
return hashes.SHA384() if isinstance(key.curve, ec.SECP384R1) else hashes.SHA256()
elif isinstance(key, (Ed25519PrivateKey, Ed448PrivateKey)):
return None
if isinstance(key, EllipticCurvePrivateKey) and isinstance(key.curve, ec.SECP384R1):
return hashes.SHA384()
return hashes.SHA256()
else:
raise ValueError("Unsupported private key type")


def generate_x509_csr(name: str, key: PrivateKey) -> x509.CertificateSigningRequest:
Expand Down Expand Up @@ -157,3 +162,49 @@ def process_csr_request(request: Request, csr: x509.CertificateSigningRequest, n
x509_ca_certificate=x509_ca_certificate_pem,
x509_certificate_serial_number=x509_certificate_serial_number,
)


def generate_ca_certificate(
issuer_ca_name: x509.Name,
issuer_ca_private_key: PrivateKey,
root_ca_name: x509.Name | None = None,
root_ca_private_key: PrivateKey | None = None,
validity_days: int = 1,
) -> x509.Certificate:
"""Generate CA Certificate"""

now = datetime.now(tz=timezone.utc)
validity = timedelta(days=validity_days)

builder = x509.CertificateBuilder()
builder = builder.subject_name(issuer_ca_name)
builder = builder.issuer_name(root_ca_name or issuer_ca_name)
builder = builder.not_valid_before(now)
builder = builder.not_valid_after(now + validity)
builder = builder.serial_number(x509.random_serial_number())
builder = builder.public_key(issuer_ca_private_key.public_key())
builder = builder.add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=True,
)
builder = builder.add_extension(
x509.KeyUsage(
digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=True,
crl_sign=True,
encipher_only=False,
decipher_only=False,
),
critical=True,
)

private_key = root_ca_private_key or issuer_ca_private_key

return builder.sign(
private_key=private_key,
algorithm=get_hash_algorithm_from_key(private_key),
)
12 changes: 8 additions & 4 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
from pydantic_settings import SettingsConfigDict

from nodeman.internal_ca import InternalCertificateAuthority
from nodeman.jose import jwk_to_alg
from nodeman.jose import generate_similar_jwk, jwk_to_alg
from nodeman.models import PublicKeyFormat
from nodeman.server import NodemanServer
from nodeman.settings import Settings
from nodeman.x509 import RSA_EXPONENT, CertificateAuthorityClient, generate_x509_csr
from tests.utils import generate_ca_certificate, rekey
from nodeman.x509 import (
RSA_EXPONENT,
CertificateAuthorityClient,
generate_ca_certificate,
generate_x509_csr,
)

ADMIN_TEST_NODE_COUNT = 100
BACKEND_CREDENTIALS = ("username", "password")
Expand Down Expand Up @@ -171,7 +175,7 @@ def _test_enroll(data_key: JWK, x509_key: PrivateKey, requested_name: str | None
}

jws = JWS(payload=json.dumps(payload))
jws.add_signature(key=rekey(data_key), alg=data_alg, protected={"alg": data_alg})
jws.add_signature(key=generate_similar_jwk(data_key), alg=data_alg, protected={"alg": data_alg})
renew_request = json.loads(jws.serialize())

response = client.post(f"{node_url}/renew", json=renew_request)
Expand Down
10 changes: 8 additions & 2 deletions tests/test_internal_ca.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@
from cryptography.x509.oid import NameOID

from nodeman.internal_ca import InternalCertificateAuthority
from nodeman.x509 import RSA_EXPONENT, CertificateInformation, PrivateKey, generate_similar_key, generate_x509_csr
from tests.utils import generate_ca_certificate
from nodeman.x509 import (
RSA_EXPONENT,
CertificateInformation,
PrivateKey,
generate_ca_certificate,
generate_similar_key,
generate_x509_csr,
)


def _verify_certification_information(res: CertificateInformation) -> None:
Expand Down
66 changes: 0 additions & 66 deletions tests/utils.py

This file was deleted.

0 comments on commit d13d42a

Please sign in to comment.