From f802310244034bc11dbebb3a17a34590065d598e Mon Sep 17 00:00:00 2001 From: Jakob Schlyter Date: Mon, 2 Dec 2024 14:30:34 +0100 Subject: [PATCH] Add option to create node with custom name (#10) * add option to create node with custom name * rename --- nodeman/nodes.py | 12 ++++++++++-- tests/test.toml | 2 +- tests/test_api.py | 34 +++++++++++++++++++++++++++++++--- 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/nodeman/nodes.py b/nodeman/nodes.py index 47bdc9b..241c612 100644 --- a/nodeman/nodes.py +++ b/nodeman/nodes.py @@ -54,10 +54,18 @@ def get_current_username( tags=["backend"], ) async def create_node( - request: Request, username: Annotated[str, Depends(get_current_username)] + request: Request, username: Annotated[str, Depends(get_current_username)], name: str | None = None ) -> NodeBootstrapInformation: secret = JWK.generate(kty="oct", size=256).k - node = TapirNode.create_next_node(domain=request.app.settings.nodes.domain) + domain = request.app.settings.nodes.domain + if name is None: + node = TapirNode.create_next_node(domain=request.app.settings.nodes.domain) + elif name.endswith(f".{domain}"): + logging.debug("Explicit node name %s requested", name) + node = TapirNode(name=name, domain=domain).save() + else: + logging.warning("Explicit node name %s not acceptable", name) + raise HTTPException(status.HTTP_400_BAD_REQUEST, detail="Invalid node name") logging.debug("Created node %s", node.name) node_secret = TapirNodeSecret(name=node.name, secret=secret).save() return NodeBootstrapInformation(name=node.name, secret=node_secret.secret) diff --git a/tests/test.toml b/tests/test.toml index 3f524c3..6b8bc07 100644 --- a/tests/test.toml +++ b/tests/test.toml @@ -2,7 +2,7 @@ server = "mongomock://localhost/nodes" [nodes] -domain = "dev.dnstapir.se" +domain = "test.dnstapir.se" #trusted_keys = "trusted_keys.json" mqtt_broker = "mqtts://localhost" diff --git a/tests/test_api.py b/tests/test_api.py index f78f152..676c0c8 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -3,6 +3,7 @@ import uuid from urllib.parse import urljoin +import pytest from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec, rsa from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey @@ -23,18 +24,24 @@ ADMIN_TEST_NODE_COUNT = 100 BACKEND_CREDENTIALS = ("username", "password") +PrivateKey = ec.EllipticCurvePrivateKey | rsa.RSAPublicKey | Ed25519PrivateKey | Ed448PrivateKey + Settings.model_config = SettingsConfigDict(toml_file="tests/test.toml") +settings = Settings() def get_test_client() -> TestClient: - settings = Settings() app = NodemanServer(settings) app.ca_client = CaTestClient() app.connect_mongodb() return TestClient(app) -def _test_enroll(data_key, x509_key) -> None: +class FailedToCreateNode(RuntimeError): + pass + + +def _test_enroll(data_key: JWK, x509_key: PrivateKey, requested_name: str | None = None) -> None: client = get_test_client() admin_client = get_test_client() @@ -45,11 +52,17 @@ def _test_enroll(data_key, x509_key) -> None: logging.basicConfig(level=logging.DEBUG) logging.debug("Testing enrollment") - response = admin_client.post(urljoin(server, "/api/v1/node")) + response = admin_client.post( + urljoin(server, "/api/v1/node"), params={"name": requested_name} if requested_name else None + ) + if response.status_code != 201: + raise FailedToCreateNode assert response.status_code == 201 create_response = response.json() name = create_response["name"] secret = create_response["secret"] + if requested_name: + assert name == requested_name logging.info("Got name=%s secret=%s", name, secret) node_url = urljoin(server, f"/api/v1/node/{name}") @@ -145,6 +158,21 @@ def test_enroll_rsa() -> None: _test_enroll(data_key=data_key, x509_key=x509_key) +def test_enroll_p256_named_node() -> None: + data_key = JWK.generate(kty="EC", crv="P-256") + x509_key = ec.generate_private_key(ec.SECP256R1()) + requested_name = ".".join(["xyzzy", settings.nodes.domain]) + _test_enroll(data_key=data_key, x509_key=x509_key, requested_name=requested_name) + + +def test_enroll_p256_bad_named_node() -> None: + data_key = JWK.generate(kty="EC", crv="P-256") + x509_key = ec.generate_private_key(ec.SECP256R1()) + requested_name = "xyzzy.example.com" + with pytest.raises(FailedToCreateNode): + _test_enroll(data_key=data_key, x509_key=x509_key, requested_name=requested_name) + + def test_enroll_bad_hmac_signature() -> None: client = get_test_client() server = ""