Skip to content

Commit

Permalink
Simplify X.509 CSR processing
Browse files Browse the repository at this point in the history
  • Loading branch information
jschlyter committed Dec 18, 2024
1 parent abda24b commit 3741d71
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 45 deletions.
4 changes: 2 additions & 2 deletions nodeman/internal_ca.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
CertificateInformation,
PrivateKey,
get_hash_algorithm_from_key,
verify_x509_csr,
verify_x509_csr_signature,
)


Expand Down Expand Up @@ -103,7 +103,7 @@ def sign_csr(self, csr: x509.CertificateSigningRequest, name: str) -> Certificat

self.logger.debug("Processing CSR from %s", name)

verify_x509_csr(csr=csr, name=name, validate_name=False)
verify_x509_csr_signature(csr=csr, name=name)

now = datetime.now(tz=timezone.utc)

Expand Down
5 changes: 3 additions & 2 deletions nodeman/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from jwcrypto.jwt import JWT

from .jose import jwk_to_alg
from .x509 import CertificateAuthorityClient, CertificateInformation, verify_x509_csr
from .x509 import CertificateAuthorityClient, CertificateInformation, verify_x509_csr_data, verify_x509_csr_signature


class StepClient(CertificateAuthorityClient):
Expand All @@ -39,7 +39,8 @@ def sign_csr(self, csr: x509.CertificateSigningRequest, name: str) -> Certificat

self.logger.debug("Processing CSR from %s", name)

verify_x509_csr(csr=csr, name=name, validate_name=True)
verify_x509_csr_signature(csr=csr, name=name)
verify_x509_csr_data(csr=csr, name=name)

csr_pem = csr.public_bytes(encoding=serialization.Encoding.PEM).decode()
token = self._get_token(name)
Expand Down
49 changes: 8 additions & 41 deletions nodeman/x509.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey, EllipticCurvePublicKey
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey, Ed448PublicKey
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.x509.oid import ExtensionOID, NameOID
from fastapi import HTTPException, Request, status

Expand Down Expand Up @@ -85,15 +84,8 @@ class SubjectAlternativeNameMismatchError(CertificateSigningRequestException):
pass


def verify_x509_csr(csr: x509.CertificateSigningRequest, name: str, validate_name: bool = True) -> None:
"""Verify X.509 CSR"""

verify_x509_csr_signature(csr=csr, name=name)
verify_x509_csr_data(csr=csr, name=name, validate_name=validate_name)


def verify_x509_csr_data(csr: x509.CertificateSigningRequest, name: str, validate_name: bool = True) -> None:
"""Verify X.509 CSR against name"""
def verify_x509_csr_data(csr: x509.CertificateSigningRequest, name: str) -> None:
"""Verify X.509 CSR data"""

# ensure Subject is correct
if len(csr.subject) != 1:
Expand All @@ -112,9 +104,6 @@ def verify_x509_csr_data(csr: x509.CertificateSigningRequest, name: str, validat
elif len(csr.extensions) > 1:
raise CertificateSigningRequestException("Multiple extensions")

if not csr.is_signature_valid:
raise CertificateSigningRequestException("Invalid CSR signature")

# ensure SubjectAlternativeName is correct
san_ext = csr.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
san_value = san_ext.value.get_values_for_type(x509.DNSName)
Expand All @@ -127,37 +116,15 @@ def verify_x509_csr_data(csr: x509.CertificateSigningRequest, name: str, validat
def verify_x509_csr_signature(csr: x509.CertificateSigningRequest, name: str) -> None:
"""Verify X.509 CSR signature"""

public_key = csr.public_key()
verify_kwargs: dict[str, Any] = {}

if isinstance(public_key, RSAPublicKey):
verify_kwargs = {
"algorithm": csr.signature_hash_algorithm,
"padding": csr.signature_algorithm_parameters,
}
elif isinstance(public_key, EllipticCurvePublicKey):
verify_kwargs = {
"signature_algorithm": ec.ECDSA(csr.signature_hash_algorithm),
}
elif isinstance(public_key, (Ed25519PublicKey, Ed448PublicKey)):
pass
else:
raise ValueError(f"Unsupported algorithm: {public_key}")

try:
public_key.verify(signature=csr.signature, data=csr.tbs_certrequest_bytes, **verify_kwargs)
except Exception as exc:
logger.error("CSR signature not valid: %s", exc, exc_info=exc)
raise ValueError("Invalid CSR signature") from exc
if not csr.is_signature_valid:
raise CertificateSigningRequestException("Invalid CSR signature")

logger.info("Verified CSR signature for %s", name)


def process_csr_request(request: Request, csr: x509.CertificateSigningRequest, name: str) -> NodeCertificate:
"""Verify CSR and issue certificate"""

verify_x509_csr_data(name=name, csr=csr)

try:
ca_response = request.app.ca_client.sign_csr(csr, name)
except Exception as exc:
Expand Down

0 comments on commit 3741d71

Please sign in to comment.