Skip to content

Commit

Permalink
Add internal CA
Browse files Browse the repository at this point in the history
  • Loading branch information
jschlyter committed Dec 16, 2024
1 parent 170e3d5 commit 453febb
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 89 deletions.
88 changes: 88 additions & 0 deletions nodeman/internal_ca.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from datetime import datetime, timedelta, timezone
from typing import Self

from cryptography import x509
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID

from nodeman.x509 import CertificateAuthorityClient, CertificateInformation, PrivateKey, get_hash_algorithm_from_key


class InternalCertificateAuthority(CertificateAuthorityClient):
"""Internal CA"""

KEY_USAGE = x509.KeyUsage(
digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False,
)

EXTENDED_KEY_USAGE = x509.ExtendedKeyUsage(
usages=[
ExtendedKeyUsageOID.CLIENT_AUTH,
ExtendedKeyUsageOID.SERVER_AUTH,
]
)

def __init__(
self,
ca_certificate: x509.Certificate,
ca_private_key: PrivateKey,
validity: timedelta | None = None,
time_skew: timedelta | None = None,
):
self.ca_certificate = ca_certificate
self.ca_private_key = ca_private_key
self.time_skew = time_skew or timedelta(minutes=10)
self.validity = validity or timedelta(minutes=10)
self.signature_hash_algorithm = get_hash_algorithm_from_key(self.ca_private_key)

@classmethod
def load(
cls, ca_certificate_file: str, ca_private_key_file: str, validity: timedelta, w: timedelta | None = None
) -> Self:
with open(ca_private_key_file, "rb") as fp:
ca_certificate = x509.load_pem_x509_certificate(fp.read())

with open(ca_private_key_file, "rb") as fp:
ca_private_key = load_pem_private_key(fp.read())
if not isinstance(ca_private_key, PrivateKey):
raise ValueError("Unsupported private key algorithm")

return cls(ca_certificate=ca_certificate, ca_private_key=ca_private_key, validity=validity)

def sign_csr(self, csr: x509.CertificateSigningRequest, name: str) -> CertificateInformation:
"""Sign CSR with CA private key"""

now = datetime.now(tz=timezone.utc)

builder = x509.CertificateBuilder()
builder = builder.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, name)]))
builder = builder.issuer_name(self.ca_certificate.subject)
builder = builder.not_valid_before(now - self.time_skew)
builder = builder.not_valid_after(now + self.validity)
builder = builder.serial_number(x509.random_serial_number())
builder = builder.public_key(csr.public_key())

builder = builder.add_extension(self.KEY_USAGE, critical=True)
builder = builder.add_extension(self.EXTENDED_KEY_USAGE, critical=False)

builder = builder.add_extension(x509.SubjectKeyIdentifier.from_public_key(csr.public_key()), critical=False)
builder = builder.add_extension(
x509.AuthorityKeyIdentifier.from_issuer_public_key(self.ca_certificate.public_key()), critical=False
)
builder = builder.add_extension(x509.SubjectAlternativeName([x509.DNSName(name)]), critical=False)
builder = builder.add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True,
)

certificate = builder.sign(private_key=self.ca_private_key, algorithm=self.signature_hash_algorithm)

return CertificateInformation(cert_chain=[certificate], ca_cert=self.ca_certificate)
16 changes: 12 additions & 4 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import logging
import uuid
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from urllib.parse import urljoin

import pytest
Expand All @@ -17,12 +17,13 @@
from jwcrypto.jws import JWS
from pydantic_settings import SettingsConfigDict

from nodeman.internal_ca import InternalCertificateAuthority
from nodeman.jose import jwk_to_alg
from nodeman.models import PublicKeyFormat
from nodeman.server import NodemanServer
from nodeman.settings import Settings
from nodeman.x509 import generate_x509_csr
from tests.utils import CaTestClient, rekey
from nodeman.x509 import CertificateAuthorityClient, generate_x509_csr
from tests.utils import generate_ca_certificate, rekey

ADMIN_TEST_NODE_COUNT = 100
BACKEND_CREDENTIALS = ("username", "password")
Expand All @@ -33,9 +34,16 @@
settings = Settings()


def get_ca_client(ca_name: str) -> CertificateAuthorityClient:
ca_private_key = ec.generate_private_key(ec.SECP256R1())
ca_certificate = generate_ca_certificate(ca_name, ca_private_key)
validity = timedelta(minutes=10)
return InternalCertificateAuthority(ca_certificate=ca_certificate, ca_private_key=ca_private_key, validity=validity)


def get_test_client() -> TestClient:
app = NodemanServer(settings)
app.ca_client = CaTestClient()
app.ca_client = get_ca_client("ca.example.com")
app.connect_mongodb()
return TestClient(app)

Expand Down
19 changes: 0 additions & 19 deletions tests/test_dummy_ca.py

This file was deleted.

61 changes: 61 additions & 0 deletions tests/test_internal_ca.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from datetime import timedelta

from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.x509.oid import NameOID

from nodeman.internal_ca import InternalCertificateAuthority
from nodeman.x509 import PrivateKey, generate_x509_csr, verify_x509_csr
from tests.utils import generate_ca_certificate


def _test_internal_ca(ca_private_key: PrivateKey) -> None:
"""Test Internal CA"""

ca_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "Internal Test CA")])
ca_certificate = generate_ca_certificate(ca_name, ca_private_key)

validity = timedelta(minutes=10)
ca_client = InternalCertificateAuthority(
ca_certificate=ca_certificate, ca_private_key=ca_private_key, validity=validity
)

name = "hostname.example.com"
key = ec.generate_private_key(ec.SECP256R1())
csr = generate_x509_csr(key=key, name=name)

verify_x509_csr(name=name, csr=csr)

res = ca_client.sign_csr(csr, name)
x509_certificate_pem = "".join(
[certificate.public_bytes(serialization.Encoding.PEM).decode() for certificate in res.cert_chain]
)
print(x509_certificate_pem)


def test_internal_ca_rsa() -> None:
ca_private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
return _test_internal_ca(ca_private_key)


def test_internal_ca_p256() -> None:
ca_private_key = ec.generate_private_key(ec.SECP256R1())
return _test_internal_ca(ca_private_key)


def test_internal_ca_p384() -> None:
ca_private_key = ec.generate_private_key(ec.SECP384R1())
return _test_internal_ca(ca_private_key)


def test_internal_ca_ed25519() -> None:
ca_private_key = Ed25519PrivateKey.generate()
return _test_internal_ca(ca_private_key)


def test_internal_ca_ed448() -> None:
ca_private_key = Ed448PrivateKey.generate()
return _test_internal_ca(ca_private_key)
102 changes: 36 additions & 66 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from datetime import datetime, timedelta, timezone

from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509.oid import NameOID
from jwcrypto.common import base64url_decode
from jwcrypto.jwk import JWK

from nodeman.x509 import CertificateAuthorityClient, CertificateInformation
from nodeman.x509 import PrivateKey, get_hash_algorithm_from_key


def rekey(key: JWK) -> JWK:
Expand All @@ -23,66 +21,38 @@ def rekey(key: JWK) -> JWK:
return JWK.generate(**params)


class CaTestClient(CertificateAuthorityClient):
def __init__(self):
self.ca_name = "ca.example.com"
self.ca_url = "https://ca.example.com"
self.ca_private_key = ec.generate_private_key(ec.SECP256R1())

now = datetime.now(tz=timezone.utc)
validity = timedelta(days=1)

builder = x509.CertificateBuilder()
builder = builder.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, self.ca_name)]))
builder = builder.issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, self.ca_name)]))
builder = builder.not_valid_before(now)
builder = builder.not_valid_after(now + validity)
builder = builder.serial_number(x509.random_serial_number())
builder = builder.public_key(self.ca_private_key.public_key())
builder = builder.add_extension(x509.IssuerAlternativeName([x509.DNSName(self.ca_name)]), critical=False)
builder = builder.add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=True,
)
builder = builder.add_extension(
x509.KeyUsage(
digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=True,
crl_sign=True,
encipher_only=False,
decipher_only=False,
),
critical=True,
)
self.ca_certificate = builder.sign(
private_key=self.ca_private_key,
algorithm=hashes.SHA256(),
)

def sign_csr(self, csr: x509.CertificateSigningRequest, name: str) -> CertificateInformation:
"""Sign CSR with CA private key"""
now = datetime.now(tz=timezone.utc)
validity = timedelta(minutes=10)

builder = x509.CertificateBuilder()
builder = builder.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, name)]))
builder = builder.issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, self.ca_name)]))
builder = builder.not_valid_before(now)
builder = builder.not_valid_after(now + validity)
builder = builder.serial_number(x509.random_serial_number())
builder = builder.public_key(csr.public_key())
builder = builder.add_extension(x509.SubjectAlternativeName([x509.DNSName(name)]), critical=False)
builder = builder.add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True,
)
certificate = builder.sign(
private_key=self.ca_private_key,
algorithm=hashes.SHA256(),
)

return CertificateInformation(cert_chain=[certificate], ca_cert=self.ca_certificate)
def generate_ca_certificate(ca_name: x509.Name | str, ca_private_key: PrivateKey) -> x509.Certificate:
"""Generate CA Certificate"""

now = datetime.now(tz=timezone.utc)
validity = timedelta(days=1)

ca_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, ca_name)]) if isinstance(ca_name, str) else ca_name

builder = x509.CertificateBuilder()
builder = builder.subject_name(ca_name)
builder = builder.issuer_name(ca_name)
builder = builder.not_valid_before(now)
builder = builder.not_valid_after(now + validity)
builder = builder.serial_number(x509.random_serial_number())
builder = builder.public_key(ca_private_key.public_key())
builder = builder.add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=True,
)
builder = builder.add_extension(
x509.KeyUsage(
digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=True,
crl_sign=True,
encipher_only=False,
decipher_only=False,
),
critical=True,
)

return builder.sign(private_key=ca_private_key, algorithm=get_hash_algorithm_from_key(ca_private_key))

0 comments on commit 453febb

Please sign in to comment.