diff --git a/nodeman/internal_ca.py b/nodeman/internal_ca.py index 0f3a21b..01206ac 100644 --- a/nodeman/internal_ca.py +++ b/nodeman/internal_ca.py @@ -18,7 +18,7 @@ CertificateInformation, PrivateKey, get_hash_algorithm_from_key, - verify_x509_csr, + verify_x509_csr_signature, ) @@ -103,7 +103,7 @@ def sign_csr(self, csr: x509.CertificateSigningRequest, name: str) -> Certificat self.logger.debug("Processing CSR from %s", name) - verify_x509_csr(csr=csr, name=name, validate_name=False) + verify_x509_csr_signature(csr=csr, name=name) now = datetime.now(tz=timezone.utc) diff --git a/nodeman/step.py b/nodeman/step.py index 8ddb597..cd31b77 100644 --- a/nodeman/step.py +++ b/nodeman/step.py @@ -13,7 +13,7 @@ from jwcrypto.jwt import JWT from .jose import jwk_to_alg -from .x509 import CertificateAuthorityClient, CertificateInformation, verify_x509_csr +from .x509 import CertificateAuthorityClient, CertificateInformation, verify_x509_csr_data, verify_x509_csr_signature class StepClient(CertificateAuthorityClient): @@ -39,7 +39,8 @@ def sign_csr(self, csr: x509.CertificateSigningRequest, name: str) -> Certificat self.logger.debug("Processing CSR from %s", name) - verify_x509_csr(csr=csr, name=name, validate_name=True) + verify_x509_csr_signature(csr=csr, name=name) + verify_x509_csr_data(csr=csr, name=name) csr_pem = csr.public_bytes(encoding=serialization.Encoding.PEM).decode() token = self._get_token(name) diff --git a/nodeman/x509.py b/nodeman/x509.py index b93f0b8..dcc260f 100644 --- a/nodeman/x509.py +++ b/nodeman/x509.py @@ -1,15 +1,14 @@ import logging from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Any from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec, rsa -from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey, EllipticCurvePublicKey -from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey, Ed448PublicKey -from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey -from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey +from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey +from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey from cryptography.x509.oid import ExtensionOID, NameOID from fastapi import HTTPException, Request, status @@ -85,15 +84,8 @@ class SubjectAlternativeNameMismatchError(CertificateSigningRequestException): pass -def verify_x509_csr(csr: x509.CertificateSigningRequest, name: str, validate_name: bool = True) -> None: - """Verify X.509 CSR""" - - verify_x509_csr_signature(csr=csr, name=name) - verify_x509_csr_data(csr=csr, name=name, validate_name=validate_name) - - -def verify_x509_csr_data(csr: x509.CertificateSigningRequest, name: str, validate_name: bool = True) -> None: - """Verify X.509 CSR against name""" +def verify_x509_csr_data(csr: x509.CertificateSigningRequest, name: str) -> None: + """Verify X.509 CSR data""" # ensure Subject is correct if len(csr.subject) != 1: @@ -112,9 +104,6 @@ def verify_x509_csr_data(csr: x509.CertificateSigningRequest, name: str, validat elif len(csr.extensions) > 1: raise CertificateSigningRequestException("Multiple extensions") - if not csr.is_signature_valid: - raise CertificateSigningRequestException("Invalid CSR signature") - # ensure SubjectAlternativeName is correct san_ext = csr.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME) san_value = san_ext.value.get_values_for_type(x509.DNSName) @@ -127,28 +116,8 @@ def verify_x509_csr_data(csr: x509.CertificateSigningRequest, name: str, validat def verify_x509_csr_signature(csr: x509.CertificateSigningRequest, name: str) -> None: """Verify X.509 CSR signature""" - public_key = csr.public_key() - verify_kwargs: dict[str, Any] = {} - - if isinstance(public_key, RSAPublicKey): - verify_kwargs = { - "algorithm": csr.signature_hash_algorithm, - "padding": csr.signature_algorithm_parameters, - } - elif isinstance(public_key, EllipticCurvePublicKey): - verify_kwargs = { - "signature_algorithm": ec.ECDSA(csr.signature_hash_algorithm), - } - elif isinstance(public_key, (Ed25519PublicKey, Ed448PublicKey)): - pass - else: - raise ValueError(f"Unsupported algorithm: {public_key}") - - try: - public_key.verify(signature=csr.signature, data=csr.tbs_certrequest_bytes, **verify_kwargs) - except Exception as exc: - logger.error("CSR signature not valid: %s", exc, exc_info=exc) - raise ValueError("Invalid CSR signature") from exc + if not csr.is_signature_valid: + raise CertificateSigningRequestException("Invalid CSR signature") logger.info("Verified CSR signature for %s", name) @@ -156,8 +125,6 @@ def verify_x509_csr_signature(csr: x509.CertificateSigningRequest, name: str) -> def process_csr_request(request: Request, csr: x509.CertificateSigningRequest, name: str) -> NodeCertificate: """Verify CSR and issue certificate""" - verify_x509_csr_data(name=name, csr=csr) - try: ca_response = request.app.ca_client.sign_csr(csr, name) except Exception as exc: