diff --git a/components/edge-identity/Makefile b/components/edge-identity/Makefile index f63aeaa..64d225c 100644 --- a/components/edge-identity/Makefile +++ b/components/edge-identity/Makefile @@ -55,6 +55,7 @@ spire-sdk: --python_out=. --grpc_python_out=. \ -I ../../vendor/spire-api-sdk/proto/ \ ../../vendor/spire-api-sdk/proto/spire/api/server/agent/v1/agent.proto \ + ../../vendor/spire-api-sdk/proto/spire/api/server/entry/v1/entry.proto \ ../../vendor/spire-api-sdk/proto/spire/api/types/*.proto .PHONY: spire-sdk diff --git a/components/edge-identity/backend/apps/default/__init__.py b/components/edge-identity/backend/apps/default/__init__.py index b76821a..74f45ea 100644 --- a/components/edge-identity/backend/apps/default/__init__.py +++ b/components/edge-identity/backend/apps/default/__init__.py @@ -1,4 +1,6 @@ +import json import os +from typing import Optional import kubeflow.kubeflow.crud_backend as base from kubeflow.kubeflow.crud_backend import config, logging @@ -9,7 +11,9 @@ log = logging.getLogger(__name__) -def create_app(name=__name__, cfg: config.Config = None): +def create_app( + name=__name__, cfg: config.Config = None, fl_suite_config_path: Optional[str] = None +): cfg = config.Config() if cfg is None else cfg # Properly set the static serving directory @@ -21,9 +25,33 @@ def create_app(name=__name__, cfg: config.Config = None): log.info("Setting STATIC_DIR to: " + static_dir) app.config["STATIC_DIR"] = static_dir + if fl_suite_config_path is None: + app.config["FL_SUITE_CONFIG"] = DEFAULT_FL_SUITE_CONFIG + else: + with open(fl_suite_config_path, "r") as f: + app.config["FL_SUITE_CONFIG"] = json.load(f) # Register the app's blueprints app.register_blueprint(routes_bp) db.init_app(app) return app + + +DEFAULT_FL_SUITE_CONFIG = { + "fl_edge": { + "auth": { + "spire": { + "server_address": "spire-server.spire.svc.cluster.local", + "server_port": 8081, + "trust_domain": "katulu.io", + "skip_kubelet_verification": True, + } + } + }, + "fl_operator": { + 
"orchestrator_url": "istio-ingressgateway.istio-system.svc.cluster.local", + "orchestrator_port": 443, + "orchestrator_sni": "fl-orchestrator.fl-suite", + }, +} diff --git a/components/edge-identity/backend/apps/default/routes/delete.py b/components/edge-identity/backend/apps/default/routes/delete.py index b4465ae..c7e164e 100644 --- a/components/edge-identity/backend/apps/default/routes/delete.py +++ b/components/edge-identity/backend/apps/default/routes/delete.py @@ -1,4 +1,8 @@ +import grpc +from flask import current_app from kubeflow.kubeflow.crud_backend import api, logging +from spire.api.server.entry.v1 import entry_pb2, entry_pb2_grpc +from spire.api.types import spiffeid_pb2 from ..db import get_db from . import bp @@ -13,6 +17,13 @@ def delete_edge(edge_name, namespace): """ log.info("Deregistering Edge %s/%s...", namespace, edge_name) + deregister_spire_workloads( + current_app.config["FL_SUITE_CONFIG"]["fl_edge"]["auth"]["spire"][ + "trust_domain" + ], + edge_name, + ) + # TODO: Refactor into model db_connection = get_db() cur = db_connection.cursor() @@ -26,3 +37,32 @@ def delete_edge(edge_name, namespace): return api.success_response( "message", "Edge %s successfully deregistered." % edge_name ) + + +def deregister_spire_workloads(trust_domain: str, edge_name: str): + # Hard-coded to use spire-server's unix socket only. Used to get admin access. 
+ with grpc.insecure_channel("unix:///tmp/spire-server/private/api.sock") as channel: + stub = entry_pb2_grpc.EntryStub(channel) + + edge_spiffe_id = spiffeid_pb2.SPIFFEID( + trust_domain=trust_domain, path=f"/{edge_name}" + ) + + workload_list_entries_response = stub.ListEntries( + entry_pb2.ListEntriesRequest( + filter=entry_pb2.ListEntriesRequest.Filter(by_parent_id=edge_spiffe_id) + ) + ) + workloads_ids = [entry.id for entry in workload_list_entries_response.entries] + + edge_list_entries_response = stub.ListEntries( + entry_pb2.ListEntriesRequest( + filter=entry_pb2.ListEntriesRequest.Filter(by_spiffe_id=edge_spiffe_id) + ) + ) + edge_ids = [entry.id for entry in edge_list_entries_response.entries] + + batch_delete_request = entry_pb2.BatchDeleteEntryRequest( + ids=workloads_ids + edge_ids + ) + stub.BatchDeleteEntry(batch_delete_request) diff --git a/components/edge-identity/backend/apps/default/routes/get.py b/components/edge-identity/backend/apps/default/routes/get.py index 0c6321e..4916a95 100644 --- a/components/edge-identity/backend/apps/default/routes/get.py +++ b/components/edge-identity/backend/apps/default/routes/get.py @@ -1,3 +1,4 @@ +from flask import current_app from kubeflow.kubeflow.crud_backend import api, helpers, logging, status from ..db import query_db @@ -18,17 +19,20 @@ def get_edges(namespace): "WHERE e.namespace = ?;", (namespace,), ) + spire_config = current_app.config["FL_SUITE_CONFIG"]["fl_edge"]["auth"]["spire"] + fl_operator_config = current_app.config["FL_SUITE_CONFIG"]["fl_operator"] if results is not None: for edge in results: - edge_list.append( - { - "name": edge["name"], - "namespace": edge["namespace"], - # The status is hardcoded because it is only used for presentation purposes - "status": status.create_status(status.STATUS_PHASE.READY, "Ready"), - "age": helpers.get_uptime(edge["created_at"]), - "join_token": edge["join_token"], - } - ) + edge_dict = { + "name": edge["name"], + "namespace": edge["namespace"], + # The 
status is hardcoded because it is only used for presentation purposes + "status": status.create_status(status.STATUS_PHASE.READY, "Ready"), + "age": helpers.get_uptime(edge["created_at"]), + "join_token": edge["join_token"], + } + edge_dict.update(spire_config) + edge_dict.update(fl_operator_config) + edge_list.append(edge_dict) return api.success_response("edges", edge_list) diff --git a/components/edge-identity/backend/apps/default/routes/post.py b/components/edge-identity/backend/apps/default/routes/post.py index 7cc8617..4b9cc51 100644 --- a/components/edge-identity/backend/apps/default/routes/post.py +++ b/components/edge-identity/backend/apps/default/routes/post.py @@ -1,10 +1,13 @@ from datetime import datetime import grpc -from flask import request +from flask import current_app, request from kubeflow.kubeflow.crud_backend import (api, decorators, helpers, logging, status) from spire.api.server.agent.v1 import agent_pb2, agent_pb2_grpc +from spire.api.server.entry.v1 import entry_pb2, entry_pb2_grpc +from spire.api.types import entry_pb2 as entry_type +from spire.api.types import selector_pb2, spiffeid_pb2 from ..db import get_db, query_db from . import bp @@ -23,12 +26,11 @@ def post_edge(namespace): edge_name = body["name"] created_at = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") - # Hard-coded to use spire-server's unix socket only. Used to get admin access. 
- with grpc.insecure_channel("unix:///tmp/spire-server/private/api.sock") as channel: - stub = agent_pb2_grpc.AgentStub(channel) - - join_token_request = agent_pb2.CreateJoinTokenRequest(ttl=600) - join_token = stub.CreateJoinToken(join_token_request) + spire_config = current_app.config["FL_SUITE_CONFIG"]["fl_edge"]["auth"]["spire"] + join_token = register_spire_workloads( + spire_config["trust_domain"], + edge_name, + ) db_connection = get_db() cur = db_connection.cursor() @@ -47,9 +49,10 @@ def post_edge(namespace): insert_join_token_auth_sql = ( "INSERT INTO JoinTokenAuth(edge_id, join_token) VALUES(?, ?)" ) - cur.execute(insert_join_token_auth_sql, (new_edge_id, join_token.value)) + cur.execute(insert_join_token_auth_sql, (new_edge_id, join_token)) db_connection.commit() + fl_operator_config = current_app.config["FL_SUITE_CONFIG"]["fl_operator"] # TODO: Refactor into model new_edge = { "name": edge_name, @@ -57,9 +60,54 @@ def post_edge(namespace): # The status is hardcoded because it is only used for presentation purposes "status": status.create_status(status.STATUS_PHASE.READY, "Ready"), "age": helpers.get_uptime(created_at), - "join_token": join_token.value, + "join_token": join_token, } + new_edge.update(spire_config) + new_edge.update(fl_operator_config) log.info("Successfully created Edge %s/%s", namespace, edge_name) return api.success_response("edge", new_edge) + + +def register_spire_workloads(trust_domain: str, edge_name: str) -> str: + # Hard-coded to use spire-server's unix socket only. Used to get admin access. 
+ with grpc.insecure_channel("unix:///tmp/spire-server/private/api.sock") as channel: + stub = agent_pb2_grpc.AgentStub(channel) + + edge_spiffe_id = spiffeid_pb2.SPIFFEID( + trust_domain=trust_domain, path=f"/{edge_name}" + ) + join_token_request = agent_pb2.CreateJoinTokenRequest( + ttl=600, agent_id=edge_spiffe_id + ) + join_token = stub.CreateJoinToken(join_token_request) + + fl_operator_entry = entry_type.Entry( + parent_id=edge_spiffe_id, + spiffe_id=spiffeid_pb2.SPIFFEID( + trust_domain=trust_domain, path="/fl-operator" + ), + selectors=[ + selector_pb2.Selector( + type="k8s", value="pod-label:app:fl-operator-envoyproxy" + ) + ], + ) + flower_client_entry = entry_type.Entry( + parent_id=edge_spiffe_id, + spiffe_id=spiffeid_pb2.SPIFFEID( + trust_domain=trust_domain, + path="/flower-client", + ), + selectors=[ + selector_pb2.Selector(type="k8s", value="pod-label:app:flower-client") + ], + ) + entry_stub = entry_pb2_grpc.EntryStub(channel) + entry_batch_request = entry_pb2.BatchCreateEntryRequest( + entries=[flower_client_entry, fl_operator_entry] + ) + entry_stub.BatchCreateEntry(entry_batch_request) + + return join_token.value diff --git a/components/edge-identity/backend/entrypoint.py b/components/edge-identity/backend/entrypoint.py index 656cc7b..9331331 100755 --- a/components/edge-identity/backend/entrypoint.py +++ b/components/edge-identity/backend/entrypoint.py @@ -26,11 +26,12 @@ def get_config(mode): BACKEND_MODE = os.environ.get("BACKEND_MODE", config.BackendMode.PRODUCTION.value) PREFIX = os.environ.get("APP_PREFIX", "/") +FL_SUITE_CONFIG_PATH = os.environ.get("FL_SUITE_CONFIG_PATH") cfg = get_config(BACKEND_MODE) cfg.PREFIX = PREFIX -app = default.create_app("Edge identity", cfg) +app = default.create_app("Edge identity", cfg, FL_SUITE_CONFIG_PATH) if __name__ == "__main__": diff --git a/components/edge-identity/backend/spire/api/server/entry/v1/entry_pb2.py b/components/edge-identity/backend/spire/api/server/entry/v1/entry_pb2.py new file mode 
100644 index 0000000..f92acee --- /dev/null +++ b/components/edge-identity/backend/spire/api/server/entry/v1/entry_pb2.py @@ -0,0 +1,207 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: spire/api/server/entry/v1/entry.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from spire.api.types import entry_pb2 as spire_dot_api_dot_types_dot_entry__pb2 +from spire.api.types import federateswith_pb2 as spire_dot_api_dot_types_dot_federateswith__pb2 +from spire.api.types import selector_pb2 as spire_dot_api_dot_types_dot_selector__pb2 +from spire.api.types import spiffeid_pb2 as spire_dot_api_dot_types_dot_spiffeid__pb2 +from spire.api.types import status_pb2 as spire_dot_api_dot_types_dot_status__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n%spire/api/server/entry/v1/entry.proto\x12\x19spire.api.server.entry.v1\x1a\x1bspire/api/types/entry.proto\x1a#spire/api/types/federateswith.proto\x1a\x1espire/api/types/selector.proto\x1a\x1espire/api/types/spiffeid.proto\x1a\x1cspire/api/types/status.proto\"\x15\n\x13\x43ountEntriesRequest\"%\n\x14\x43ountEntriesResponse\x12\r\n\x05\x63ount\x18\x01 \x01(\x05\"\x95\x03\n\x12ListEntriesRequest\x12\x44\n\x06\x66ilter\x18\x01 \x01(\x0b\x32\x34.spire.api.server.entry.v1.ListEntriesRequest.Filter\x12/\n\x0boutput_mask\x18\x02 \x01(\x0b\x32\x1a.spire.api.types.EntryMask\x12\x11\n\tpage_size\x18\x03 \x01(\x05\x12\x12\n\npage_token\x18\x04 \x01(\t\x1a\xe0\x01\n\x06\x46ilter\x12/\n\x0c\x62y_spiffe_id\x18\x01 \x01(\x0b\x32\x19.spire.api.types.SPIFFEID\x12/\n\x0c\x62y_parent_id\x18\x02 
\x01(\x0b\x32\x19.spire.api.types.SPIFFEID\x12\x34\n\x0c\x62y_selectors\x18\x03 \x01(\x0b\x32\x1e.spire.api.types.SelectorMatch\x12>\n\x11\x62y_federates_with\x18\x04 \x01(\x0b\x32#.spire.api.types.FederatesWithMatch\"W\n\x13ListEntriesResponse\x12\'\n\x07\x65ntries\x18\x01 \x03(\x0b\x32\x16.spire.api.types.Entry\x12\x17\n\x0fnext_page_token\x18\x02 \x01(\t\"N\n\x0fGetEntryRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12/\n\x0boutput_mask\x18\x02 \x01(\x0b\x32\x1a.spire.api.types.EntryMask\"s\n\x17\x42\x61tchCreateEntryRequest\x12\'\n\x07\x65ntries\x18\x01 \x03(\x0b\x32\x16.spire.api.types.Entry\x12/\n\x0boutput_mask\x18\x02 \x01(\x0b\x32\x1a.spire.api.types.EntryMask\"\xc1\x01\n\x18\x42\x61tchCreateEntryResponse\x12K\n\x07results\x18\x01 \x03(\x0b\x32:.spire.api.server.entry.v1.BatchCreateEntryResponse.Result\x1aX\n\x06Result\x12\'\n\x06status\x18\x01 \x01(\x0b\x32\x17.spire.api.types.Status\x12%\n\x05\x65ntry\x18\x02 \x01(\x0b\x32\x16.spire.api.types.Entry\"\xa3\x01\n\x17\x42\x61tchUpdateEntryRequest\x12\'\n\x07\x65ntries\x18\x01 \x03(\x0b\x32\x16.spire.api.types.Entry\x12.\n\ninput_mask\x18\x02 \x01(\x0b\x32\x1a.spire.api.types.EntryMask\x12/\n\x0boutput_mask\x18\x03 \x01(\x0b\x32\x1a.spire.api.types.EntryMask\"\xc1\x01\n\x18\x42\x61tchUpdateEntryResponse\x12K\n\x07results\x18\x01 \x03(\x0b\x32:.spire.api.server.entry.v1.BatchUpdateEntryResponse.Result\x1aX\n\x06Result\x12\'\n\x06status\x18\x01 \x01(\x0b\x32\x17.spire.api.types.Status\x12%\n\x05\x65ntry\x18\x02 \x01(\x0b\x32\x16.spire.api.types.Entry\"&\n\x17\x42\x61tchDeleteEntryRequest\x12\x0b\n\x03ids\x18\x01 \x03(\t\"\xa6\x01\n\x18\x42\x61tchDeleteEntryResponse\x12K\n\x07results\x18\x01 \x03(\x0b\x32:.spire.api.server.entry.v1.BatchDeleteEntryResponse.Result\x1a=\n\x06Result\x12\'\n\x06status\x18\x01 \x01(\x0b\x32\x17.spire.api.types.Status\x12\n\n\x02id\x18\x02 \x01(\t\"N\n\x1bGetAuthorizedEntriesRequest\x12/\n\x0boutput_mask\x18\x01 
\x01(\x0b\x32\x1a.spire.api.types.EntryMask\"G\n\x1cGetAuthorizedEntriesResponse\x12\'\n\x07\x65ntries\x18\x01 \x03(\x0b\x32\x16.spire.api.types.Entry2\xb7\x06\n\x05\x45ntry\x12o\n\x0c\x43ountEntries\x12..spire.api.server.entry.v1.CountEntriesRequest\x1a/.spire.api.server.entry.v1.CountEntriesResponse\x12l\n\x0bListEntries\x12-.spire.api.server.entry.v1.ListEntriesRequest\x1a..spire.api.server.entry.v1.ListEntriesResponse\x12N\n\x08GetEntry\x12*.spire.api.server.entry.v1.GetEntryRequest\x1a\x16.spire.api.types.Entry\x12{\n\x10\x42\x61tchCreateEntry\x12\x32.spire.api.server.entry.v1.BatchCreateEntryRequest\x1a\x33.spire.api.server.entry.v1.BatchCreateEntryResponse\x12{\n\x10\x42\x61tchUpdateEntry\x12\x32.spire.api.server.entry.v1.BatchUpdateEntryRequest\x1a\x33.spire.api.server.entry.v1.BatchUpdateEntryResponse\x12{\n\x10\x42\x61tchDeleteEntry\x12\x32.spire.api.server.entry.v1.BatchDeleteEntryRequest\x1a\x33.spire.api.server.entry.v1.BatchDeleteEntryResponse\x12\x87\x01\n\x14GetAuthorizedEntries\x12\x36.spire.api.server.entry.v1.GetAuthorizedEntriesRequest\x1a\x37.spire.api.server.entry.v1.GetAuthorizedEntriesResponseBIZGgithub.com/spiffe/spire-api-sdk/proto/spire/api/server/entry/v1;entryv1b\x06proto3') + + + +_COUNTENTRIESREQUEST = DESCRIPTOR.message_types_by_name['CountEntriesRequest'] +_COUNTENTRIESRESPONSE = DESCRIPTOR.message_types_by_name['CountEntriesResponse'] +_LISTENTRIESREQUEST = DESCRIPTOR.message_types_by_name['ListEntriesRequest'] +_LISTENTRIESREQUEST_FILTER = _LISTENTRIESREQUEST.nested_types_by_name['Filter'] +_LISTENTRIESRESPONSE = DESCRIPTOR.message_types_by_name['ListEntriesResponse'] +_GETENTRYREQUEST = DESCRIPTOR.message_types_by_name['GetEntryRequest'] +_BATCHCREATEENTRYREQUEST = DESCRIPTOR.message_types_by_name['BatchCreateEntryRequest'] +_BATCHCREATEENTRYRESPONSE = DESCRIPTOR.message_types_by_name['BatchCreateEntryResponse'] +_BATCHCREATEENTRYRESPONSE_RESULT = _BATCHCREATEENTRYRESPONSE.nested_types_by_name['Result'] +_BATCHUPDATEENTRYREQUEST 
= DESCRIPTOR.message_types_by_name['BatchUpdateEntryRequest'] +_BATCHUPDATEENTRYRESPONSE = DESCRIPTOR.message_types_by_name['BatchUpdateEntryResponse'] +_BATCHUPDATEENTRYRESPONSE_RESULT = _BATCHUPDATEENTRYRESPONSE.nested_types_by_name['Result'] +_BATCHDELETEENTRYREQUEST = DESCRIPTOR.message_types_by_name['BatchDeleteEntryRequest'] +_BATCHDELETEENTRYRESPONSE = DESCRIPTOR.message_types_by_name['BatchDeleteEntryResponse'] +_BATCHDELETEENTRYRESPONSE_RESULT = _BATCHDELETEENTRYRESPONSE.nested_types_by_name['Result'] +_GETAUTHORIZEDENTRIESREQUEST = DESCRIPTOR.message_types_by_name['GetAuthorizedEntriesRequest'] +_GETAUTHORIZEDENTRIESRESPONSE = DESCRIPTOR.message_types_by_name['GetAuthorizedEntriesResponse'] +CountEntriesRequest = _reflection.GeneratedProtocolMessageType('CountEntriesRequest', (_message.Message,), { + 'DESCRIPTOR' : _COUNTENTRIESREQUEST, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.CountEntriesRequest) + }) +_sym_db.RegisterMessage(CountEntriesRequest) + +CountEntriesResponse = _reflection.GeneratedProtocolMessageType('CountEntriesResponse', (_message.Message,), { + 'DESCRIPTOR' : _COUNTENTRIESRESPONSE, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.CountEntriesResponse) + }) +_sym_db.RegisterMessage(CountEntriesResponse) + +ListEntriesRequest = _reflection.GeneratedProtocolMessageType('ListEntriesRequest', (_message.Message,), { + + 'Filter' : _reflection.GeneratedProtocolMessageType('Filter', (_message.Message,), { + 'DESCRIPTOR' : _LISTENTRIESREQUEST_FILTER, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.ListEntriesRequest.Filter) + }) + , + 'DESCRIPTOR' : _LISTENTRIESREQUEST, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.ListEntriesRequest) + }) 
+_sym_db.RegisterMessage(ListEntriesRequest) +_sym_db.RegisterMessage(ListEntriesRequest.Filter) + +ListEntriesResponse = _reflection.GeneratedProtocolMessageType('ListEntriesResponse', (_message.Message,), { + 'DESCRIPTOR' : _LISTENTRIESRESPONSE, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.ListEntriesResponse) + }) +_sym_db.RegisterMessage(ListEntriesResponse) + +GetEntryRequest = _reflection.GeneratedProtocolMessageType('GetEntryRequest', (_message.Message,), { + 'DESCRIPTOR' : _GETENTRYREQUEST, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.GetEntryRequest) + }) +_sym_db.RegisterMessage(GetEntryRequest) + +BatchCreateEntryRequest = _reflection.GeneratedProtocolMessageType('BatchCreateEntryRequest', (_message.Message,), { + 'DESCRIPTOR' : _BATCHCREATEENTRYREQUEST, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.BatchCreateEntryRequest) + }) +_sym_db.RegisterMessage(BatchCreateEntryRequest) + +BatchCreateEntryResponse = _reflection.GeneratedProtocolMessageType('BatchCreateEntryResponse', (_message.Message,), { + + 'Result' : _reflection.GeneratedProtocolMessageType('Result', (_message.Message,), { + 'DESCRIPTOR' : _BATCHCREATEENTRYRESPONSE_RESULT, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.BatchCreateEntryResponse.Result) + }) + , + 'DESCRIPTOR' : _BATCHCREATEENTRYRESPONSE, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.BatchCreateEntryResponse) + }) +_sym_db.RegisterMessage(BatchCreateEntryResponse) +_sym_db.RegisterMessage(BatchCreateEntryResponse.Result) + +BatchUpdateEntryRequest = _reflection.GeneratedProtocolMessageType('BatchUpdateEntryRequest', (_message.Message,), { + 'DESCRIPTOR' 
: _BATCHUPDATEENTRYREQUEST, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.BatchUpdateEntryRequest) + }) +_sym_db.RegisterMessage(BatchUpdateEntryRequest) + +BatchUpdateEntryResponse = _reflection.GeneratedProtocolMessageType('BatchUpdateEntryResponse', (_message.Message,), { + + 'Result' : _reflection.GeneratedProtocolMessageType('Result', (_message.Message,), { + 'DESCRIPTOR' : _BATCHUPDATEENTRYRESPONSE_RESULT, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.BatchUpdateEntryResponse.Result) + }) + , + 'DESCRIPTOR' : _BATCHUPDATEENTRYRESPONSE, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.BatchUpdateEntryResponse) + }) +_sym_db.RegisterMessage(BatchUpdateEntryResponse) +_sym_db.RegisterMessage(BatchUpdateEntryResponse.Result) + +BatchDeleteEntryRequest = _reflection.GeneratedProtocolMessageType('BatchDeleteEntryRequest', (_message.Message,), { + 'DESCRIPTOR' : _BATCHDELETEENTRYREQUEST, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.BatchDeleteEntryRequest) + }) +_sym_db.RegisterMessage(BatchDeleteEntryRequest) + +BatchDeleteEntryResponse = _reflection.GeneratedProtocolMessageType('BatchDeleteEntryResponse', (_message.Message,), { + + 'Result' : _reflection.GeneratedProtocolMessageType('Result', (_message.Message,), { + 'DESCRIPTOR' : _BATCHDELETEENTRYRESPONSE_RESULT, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.BatchDeleteEntryResponse.Result) + }) + , + 'DESCRIPTOR' : _BATCHDELETEENTRYRESPONSE, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.BatchDeleteEntryResponse) + }) 
+_sym_db.RegisterMessage(BatchDeleteEntryResponse) +_sym_db.RegisterMessage(BatchDeleteEntryResponse.Result) + +GetAuthorizedEntriesRequest = _reflection.GeneratedProtocolMessageType('GetAuthorizedEntriesRequest', (_message.Message,), { + 'DESCRIPTOR' : _GETAUTHORIZEDENTRIESREQUEST, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.GetAuthorizedEntriesRequest) + }) +_sym_db.RegisterMessage(GetAuthorizedEntriesRequest) + +GetAuthorizedEntriesResponse = _reflection.GeneratedProtocolMessageType('GetAuthorizedEntriesResponse', (_message.Message,), { + 'DESCRIPTOR' : _GETAUTHORIZEDENTRIESRESPONSE, + '__module__' : 'spire.api.server.entry.v1.entry_pb2' + # @@protoc_insertion_point(class_scope:spire.api.server.entry.v1.GetAuthorizedEntriesResponse) + }) +_sym_db.RegisterMessage(GetAuthorizedEntriesResponse) + +_ENTRY = DESCRIPTOR.services_by_name['Entry'] +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + DESCRIPTOR._serialized_options = b'ZGgithub.com/spiffe/spire-api-sdk/proto/spire/api/server/entry/v1;entryv1' + _COUNTENTRIESREQUEST._serialized_start=228 + _COUNTENTRIESREQUEST._serialized_end=249 + _COUNTENTRIESRESPONSE._serialized_start=251 + _COUNTENTRIESRESPONSE._serialized_end=288 + _LISTENTRIESREQUEST._serialized_start=291 + _LISTENTRIESREQUEST._serialized_end=696 + _LISTENTRIESREQUEST_FILTER._serialized_start=472 + _LISTENTRIESREQUEST_FILTER._serialized_end=696 + _LISTENTRIESRESPONSE._serialized_start=698 + _LISTENTRIESRESPONSE._serialized_end=785 + _GETENTRYREQUEST._serialized_start=787 + _GETENTRYREQUEST._serialized_end=865 + _BATCHCREATEENTRYREQUEST._serialized_start=867 + _BATCHCREATEENTRYREQUEST._serialized_end=982 + _BATCHCREATEENTRYRESPONSE._serialized_start=985 + _BATCHCREATEENTRYRESPONSE._serialized_end=1178 + _BATCHCREATEENTRYRESPONSE_RESULT._serialized_start=1090 + _BATCHCREATEENTRYRESPONSE_RESULT._serialized_end=1178 + 
_BATCHUPDATEENTRYREQUEST._serialized_start=1181 + _BATCHUPDATEENTRYREQUEST._serialized_end=1344 + _BATCHUPDATEENTRYRESPONSE._serialized_start=1347 + _BATCHUPDATEENTRYRESPONSE._serialized_end=1540 + _BATCHUPDATEENTRYRESPONSE_RESULT._serialized_start=1090 + _BATCHUPDATEENTRYRESPONSE_RESULT._serialized_end=1178 + _BATCHDELETEENTRYREQUEST._serialized_start=1542 + _BATCHDELETEENTRYREQUEST._serialized_end=1580 + _BATCHDELETEENTRYRESPONSE._serialized_start=1583 + _BATCHDELETEENTRYRESPONSE._serialized_end=1749 + _BATCHDELETEENTRYRESPONSE_RESULT._serialized_start=1688 + _BATCHDELETEENTRYRESPONSE_RESULT._serialized_end=1749 + _GETAUTHORIZEDENTRIESREQUEST._serialized_start=1751 + _GETAUTHORIZEDENTRIESREQUEST._serialized_end=1829 + _GETAUTHORIZEDENTRIESRESPONSE._serialized_start=1831 + _GETAUTHORIZEDENTRIESRESPONSE._serialized_end=1902 + _ENTRY._serialized_start=1905 + _ENTRY._serialized_end=2728 +# @@protoc_insertion_point(module_scope) diff --git a/components/edge-identity/backend/spire/api/server/entry/v1/entry_pb2_grpc.py b/components/edge-identity/backend/spire/api/server/entry/v1/entry_pb2_grpc.py new file mode 100644 index 0000000..82dc865 --- /dev/null +++ b/components/edge-identity/backend/spire/api/server/entry/v1/entry_pb2_grpc.py @@ -0,0 +1,290 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from spire.api.server.entry.v1 import entry_pb2 as spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2 +from spire.api.types import entry_pb2 as spire_dot_api_dot_types_dot_entry__pb2 + + +class EntryStub(object): + """Manages registration entries stored by the SPIRE Server. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. 
+ """ + self.CountEntries = channel.unary_unary( + '/spire.api.server.entry.v1.Entry/CountEntries', + request_serializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.CountEntriesRequest.SerializeToString, + response_deserializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.CountEntriesResponse.FromString, + ) + self.ListEntries = channel.unary_unary( + '/spire.api.server.entry.v1.Entry/ListEntries', + request_serializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.ListEntriesRequest.SerializeToString, + response_deserializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.ListEntriesResponse.FromString, + ) + self.GetEntry = channel.unary_unary( + '/spire.api.server.entry.v1.Entry/GetEntry', + request_serializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.GetEntryRequest.SerializeToString, + response_deserializer=spire_dot_api_dot_types_dot_entry__pb2.Entry.FromString, + ) + self.BatchCreateEntry = channel.unary_unary( + '/spire.api.server.entry.v1.Entry/BatchCreateEntry', + request_serializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchCreateEntryRequest.SerializeToString, + response_deserializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchCreateEntryResponse.FromString, + ) + self.BatchUpdateEntry = channel.unary_unary( + '/spire.api.server.entry.v1.Entry/BatchUpdateEntry', + request_serializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchUpdateEntryRequest.SerializeToString, + response_deserializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchUpdateEntryResponse.FromString, + ) + self.BatchDeleteEntry = channel.unary_unary( + '/spire.api.server.entry.v1.Entry/BatchDeleteEntry', + request_serializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchDeleteEntryRequest.SerializeToString, + response_deserializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchDeleteEntryResponse.FromString, + ) + 
self.GetAuthorizedEntries = channel.unary_unary( + '/spire.api.server.entry.v1.Entry/GetAuthorizedEntries', + request_serializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.GetAuthorizedEntriesRequest.SerializeToString, + response_deserializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.GetAuthorizedEntriesResponse.FromString, + ) + + +class EntryServicer(object): + """Manages registration entries stored by the SPIRE Server. + """ + + def CountEntries(self, request, context): + """Count entries. + + The caller must be local or present an admin X509-SVID. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ListEntries(self, request, context): + """Lists entries. + + The caller must be local or present an admin X509-SVID. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def GetEntry(self, request, context): + """Gets an entry. If the entry does not exist, NOT_FOUND is returned. + + The caller must be local or present an admin X509-SVID. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def BatchCreateEntry(self, request, context): + """Batch creates one or more entries. + + The caller must be local or present an admin X509-SVID. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def BatchUpdateEntry(self, request, context): + """Batch updates one or more entries. + + The caller must be local or present an admin X509-SVID. 
+ """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def BatchDeleteEntry(self, request, context): + """Batch deletes one or more entries. + + The caller must be local or present an admin X509-SVID. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def GetAuthorizedEntries(self, request, context): + """Gets the entries the caller is authorized for. + + The caller must present an active agent X509-SVID. See the Agent + AttestAgent/RenewAgent RPCs. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_EntryServicer_to_server(servicer, server): + rpc_method_handlers = { + 'CountEntries': grpc.unary_unary_rpc_method_handler( + servicer.CountEntries, + request_deserializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.CountEntriesRequest.FromString, + response_serializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.CountEntriesResponse.SerializeToString, + ), + 'ListEntries': grpc.unary_unary_rpc_method_handler( + servicer.ListEntries, + request_deserializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.ListEntriesRequest.FromString, + response_serializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.ListEntriesResponse.SerializeToString, + ), + 'GetEntry': grpc.unary_unary_rpc_method_handler( + servicer.GetEntry, + request_deserializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.GetEntryRequest.FromString, + response_serializer=spire_dot_api_dot_types_dot_entry__pb2.Entry.SerializeToString, + ), + 'BatchCreateEntry': grpc.unary_unary_rpc_method_handler( + servicer.BatchCreateEntry, + 
request_deserializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchCreateEntryRequest.FromString, + response_serializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchCreateEntryResponse.SerializeToString, + ), + 'BatchUpdateEntry': grpc.unary_unary_rpc_method_handler( + servicer.BatchUpdateEntry, + request_deserializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchUpdateEntryRequest.FromString, + response_serializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchUpdateEntryResponse.SerializeToString, + ), + 'BatchDeleteEntry': grpc.unary_unary_rpc_method_handler( + servicer.BatchDeleteEntry, + request_deserializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchDeleteEntryRequest.FromString, + response_serializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchDeleteEntryResponse.SerializeToString, + ), + 'GetAuthorizedEntries': grpc.unary_unary_rpc_method_handler( + servicer.GetAuthorizedEntries, + request_deserializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.GetAuthorizedEntriesRequest.FromString, + response_serializer=spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.GetAuthorizedEntriesResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'spire.api.server.entry.v1.Entry', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class Entry(object): + """Manages registration entries stored by the SPIRE Server. 
+ """ + + @staticmethod + def CountEntries(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/spire.api.server.entry.v1.Entry/CountEntries', + spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.CountEntriesRequest.SerializeToString, + spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.CountEntriesResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def ListEntries(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/spire.api.server.entry.v1.Entry/ListEntries', + spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.ListEntriesRequest.SerializeToString, + spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.ListEntriesResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def GetEntry(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/spire.api.server.entry.v1.Entry/GetEntry', + spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.GetEntryRequest.SerializeToString, + spire_dot_api_dot_types_dot_entry__pb2.Entry.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def BatchCreateEntry(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + 
wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/spire.api.server.entry.v1.Entry/BatchCreateEntry', + spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchCreateEntryRequest.SerializeToString, + spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchCreateEntryResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def BatchUpdateEntry(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/spire.api.server.entry.v1.Entry/BatchUpdateEntry', + spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchUpdateEntryRequest.SerializeToString, + spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchUpdateEntryResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def BatchDeleteEntry(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/spire.api.server.entry.v1.Entry/BatchDeleteEntry', + spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchDeleteEntryRequest.SerializeToString, + spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.BatchDeleteEntryResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def GetAuthorizedEntries(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return 
grpc.experimental.unary_unary(request, target, '/spire.api.server.entry.v1.Entry/GetAuthorizedEntries', + spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.GetAuthorizedEntriesRequest.SerializeToString, + spire_dot_api_dot_server_dot_entry_dot_v1_dot_entry__pb2.GetAuthorizedEntriesResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/components/edge-identity/frontend/src/app/pages/detail-dialog/detail-dialog.component.ts b/components/edge-identity/frontend/src/app/pages/detail-dialog/detail-dialog.component.ts index 1394d1a..02dc659 100644 --- a/components/edge-identity/frontend/src/app/pages/detail-dialog/detail-dialog.component.ts +++ b/components/edge-identity/frontend/src/app/pages/detail-dialog/detail-dialog.component.ts @@ -34,15 +34,28 @@ export class DetailDialogComponent implements OnInit, OnDestroy { public snackBar: SnackBarService, @Inject(MAT_DIALOG_DATA) public edge: EdgeResponseObject, ) { - this.crdText = `apiVersion: fl.katulu.io/v1alpha1 + this.crdText = `--- +apiVersion: fl.katulu.io/v1alpha1 kind: FlEdge metadata: name: ${edge.name} spec: - join-token: ${edge.join_token} - fl-suite-url: fl.katulu.io - fl-suite-port: 443 - orchestrator-sni: fl-orchestrator.fl.katulu.io`; + auth: + spire: + server-address: ${edge.server_address} + server-port: ${edge.server_port} + trust-domain: ${edge.trust_domain} + join-token: ${edge.join_token} + skip-kubelet-verification: ${edge.skip_kubelet_verification} +--- +apiVersion: fl.katulu.io/v1alpha1 +kind: FlOperator +metadata: + name: ${edge.name} +spec: + orchestrator-url: ${edge.orchestrator_url} + orchestrator-port: ${edge.orchestrator_port} + orchestrator-sni: ${edge.orchestrator_sni}`; } ngOnInit() { } diff --git a/components/edge-identity/frontend/src/app/types/backend.ts b/components/edge-identity/frontend/src/app/types/backend.ts index fe038bf..848aec7 100644 --- 
a/components/edge-identity/frontend/src/app/types/backend.ts +++ b/components/edge-identity/frontend/src/app/types/backend.ts @@ -10,6 +10,13 @@ export interface EdgeResponseObject { status: Status; age: string; join_token: string; + server_address: string; + server_port: number; + trust_domain: string; + skip_kubelet_verification: boolean; + orchestrator_url: string; + orchestrator_port: number; + orchestrator_sni: string; } export interface EdgeProcessedObject extends EdgeResponseObject { diff --git a/components/fl-operator/Makefile b/components/fl-operator/Makefile index 76936b4..bb21316 100644 --- a/components/fl-operator/Makefile +++ b/components/fl-operator/Makefile @@ -56,7 +56,7 @@ generate: bin/controller-gen ## Generate code containing DeepCopy, DeepCopyInto, PATH="$(GOBIN):$(PATH)" controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..." .PHONY: test -test: bin/setup-envtest ## Run tests. +test: manifests generate bin/setup-envtest ## Run tests. KUBEBUILDER_ASSETS="$(shell $(GOBIN)/setup-envtest use $(ENVTEST_K8S_VERSION) -p path)" go test -v ./... -coverprofile cover.out ##@ Build diff --git a/components/fl-operator/PROJECT b/components/fl-operator/PROJECT index 1aab484..751c0e1 100644 --- a/components/fl-operator/PROJECT +++ b/components/fl-operator/PROJECT @@ -16,4 +16,13 @@ resources: kind: FlOperator path: github.com/katulu-io/fl-suite/fl-operator/api/v1alpha1 version: v1alpha1 +- api: + crdVersion: v1 + namespaced: true + controller: true + domain: katulu.io + group: fl + kind: FlEdge + path: github.com/katulu-io/fl-suite/fl-operator/api/v1alpha1 + version: v1alpha1 version: "3" diff --git a/components/fl-operator/api/v1alpha1/fledge_types.go b/components/fl-operator/api/v1alpha1/fledge_types.go new file mode 100644 index 0000000..27fc448 --- /dev/null +++ b/components/fl-operator/api/v1alpha1/fledge_types.go @@ -0,0 +1,74 @@ +/* +Copyright 2022. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! +// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. + +// FlEdgeSpec defines the desired state of FlEdge +type FlEdgeSpec struct { + Auth FlEdgeAuth `json:"auth,omitempty"` + // Important: Run "make" to regenerate code after modifying this file +} + +type FlEdgeAuth struct { + Spire *FlEdgeSpireAuth `json:"spire,omitempty"` +} +type FlEdgeSpireAuth struct { + ServerAddress string `json:"server-address"` + ServerPort int16 `json:"server-port"` + TrustDomain string `json:"trust-domain"` + JoinToken string `json:"join-token"` + SkipKubeletVerification bool `json:"skip-kubelet-verification,omitempty"` + // Important: Run "make" to regenerate code after modifying this file +} + +// FlEdgeStatus defines the observed state of FlEdge +type FlEdgeStatus struct { + CurrentConfigMapName string `json:"current-configmap-name,omitempty"` + // Important: Run "make" to regenerate code after modifying this file +} + +//+kubebuilder:object:root=true +//+kubebuilder:subresource:status +//+kubebuilder:resource:scope=Cluster + +// FlEdge is the Schema for the fledges API +type FlEdge struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec FlEdgeSpec `json:"spec,omitempty"` + Status FlEdgeStatus 
`json:"status,omitempty"` +} + +//+kubebuilder:object:root=true + +// FlEdgeList contains a list of FlEdge +type FlEdgeList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []FlEdge `json:"items"` +} + +func init() { + SchemeBuilder.Register(&FlEdge{}, &FlEdgeList{}) +} diff --git a/components/fl-operator/api/v1alpha1/zz_generated.deepcopy.go b/components/fl-operator/api/v1alpha1/zz_generated.deepcopy.go index 42ce396..41c1493 100644 --- a/components/fl-operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/components/fl-operator/api/v1alpha1/zz_generated.deepcopy.go @@ -25,6 +25,131 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlEdge) DeepCopyInto(out *FlEdge) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + out.Status = in.Status +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlEdge. +func (in *FlEdge) DeepCopy() *FlEdge { + if in == nil { + return nil + } + out := new(FlEdge) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *FlEdge) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlEdgeAuth) DeepCopyInto(out *FlEdgeAuth) { + *out = *in + if in.Spire != nil { + in, out := &in.Spire, &out.Spire + *out = new(FlEdgeSpireAuth) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlEdgeAuth. 
+func (in *FlEdgeAuth) DeepCopy() *FlEdgeAuth { + if in == nil { + return nil + } + out := new(FlEdgeAuth) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlEdgeList) DeepCopyInto(out *FlEdgeList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]FlEdge, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlEdgeList. +func (in *FlEdgeList) DeepCopy() *FlEdgeList { + if in == nil { + return nil + } + out := new(FlEdgeList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *FlEdgeList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlEdgeSpec) DeepCopyInto(out *FlEdgeSpec) { + *out = *in + in.Auth.DeepCopyInto(&out.Auth) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlEdgeSpec. +func (in *FlEdgeSpec) DeepCopy() *FlEdgeSpec { + if in == nil { + return nil + } + out := new(FlEdgeSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlEdgeSpireAuth) DeepCopyInto(out *FlEdgeSpireAuth) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlEdgeSpireAuth. 
+func (in *FlEdgeSpireAuth) DeepCopy() *FlEdgeSpireAuth { + if in == nil { + return nil + } + out := new(FlEdgeSpireAuth) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlEdgeStatus) DeepCopyInto(out *FlEdgeStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlEdgeStatus. +func (in *FlEdgeStatus) DeepCopy() *FlEdgeStatus { + if in == nil { + return nil + } + out := new(FlEdgeStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *FlOperator) DeepCopyInto(out *FlOperator) { *out = *in diff --git a/components/fl-operator/config/crd/bases/fl.katulu.io_fledges.yaml b/components/fl-operator/config/crd/bases/fl.katulu.io_fledges.yaml new file mode 100644 index 0000000..6598ed9 --- /dev/null +++ b/components/fl-operator/config/crd/bases/fl.katulu.io_fledges.yaml @@ -0,0 +1,70 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.9.0 + creationTimestamp: null + name: fledges.fl.katulu.io +spec: + group: fl.katulu.io + names: + kind: FlEdge + listKind: FlEdgeList + plural: fledges + singular: fledge + scope: Cluster + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: FlEdge is the Schema for the fledges API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. 
Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: FlEdgeSpec defines the desired state of FlEdge + properties: + auth: + properties: + spire: + properties: + join-token: + type: string + server-address: + type: string + server-port: + type: integer + skip-kubelet-verification: + type: boolean + trust-domain: + type: string + required: + - join-token + - server-address + - server-port + - trust-domain + type: object + type: object + type: object + status: + description: FlEdgeStatus defines the observed state of FlEdge + properties: + current-configmap-name: + type: string + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/components/fl-operator/config/crd/kustomization.yaml b/components/fl-operator/config/crd/kustomization.yaml index fa6ae09..e363497 100644 --- a/components/fl-operator/config/crd/kustomization.yaml +++ b/components/fl-operator/config/crd/kustomization.yaml @@ -3,17 +3,20 @@ # It should be run by config/default resources: - bases/fl.katulu.io_floperators.yaml +- bases/fl.katulu.io_fledges.yaml #+kubebuilder:scaffold:crdkustomizeresource patchesStrategicMerge: # [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix. # patches here are for enabling the conversion webhook for each CRD #- patches/webhook_in_floperators.yaml +#- patches/webhook_in_fledges.yaml #+kubebuilder:scaffold:crdkustomizewebhookpatch # [CERTMANAGER] To enable cert-manager, uncomment all the sections with [CERTMANAGER] prefix. 
# patches here are for enabling the CA injection for each CRD #- patches/cainjection_in_floperators.yaml +#- patches/cainjection_in_fledges.yaml #+kubebuilder:scaffold:crdkustomizecainjectionpatch # the following config is for teaching kustomize how to do kustomization for CRDs. diff --git a/components/fl-operator/config/crd/patches/cainjection_in_fledges.yaml b/components/fl-operator/config/crd/patches/cainjection_in_fledges.yaml new file mode 100644 index 0000000..89c90a5 --- /dev/null +++ b/components/fl-operator/config/crd/patches/cainjection_in_fledges.yaml @@ -0,0 +1,7 @@ +# The following patch adds a directive for certmanager to inject CA into the CRD +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) + name: fledges.fl.katulu.io diff --git a/components/fl-operator/config/crd/patches/webhook_in_fledges.yaml b/components/fl-operator/config/crd/patches/webhook_in_fledges.yaml new file mode 100644 index 0000000..061b003 --- /dev/null +++ b/components/fl-operator/config/crd/patches/webhook_in_fledges.yaml @@ -0,0 +1,16 @@ +# The following patch enables a conversion webhook for the CRD +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: fledges.fl.katulu.io +spec: + conversion: + strategy: Webhook + webhook: + clientConfig: + service: + namespace: system + name: webhook-service + path: /convert + conversionReviewVersions: + - v1 diff --git a/components/fl-operator/config/rbac/fledge_editor_role.yaml b/components/fl-operator/config/rbac/fledge_editor_role.yaml new file mode 100644 index 0000000..b4755a6 --- /dev/null +++ b/components/fl-operator/config/rbac/fledge_editor_role.yaml @@ -0,0 +1,24 @@ +# permissions for end users to edit fledges. 
+apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: fledge-editor-role +rules: +- apiGroups: + - fl.katulu.io + resources: + - fledges + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - fl.katulu.io + resources: + - fledges/status + verbs: + - get diff --git a/components/fl-operator/config/rbac/fledge_viewer_role.yaml b/components/fl-operator/config/rbac/fledge_viewer_role.yaml new file mode 100644 index 0000000..e6f4db8 --- /dev/null +++ b/components/fl-operator/config/rbac/fledge_viewer_role.yaml @@ -0,0 +1,20 @@ +# permissions for end users to view fledges. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: fledge-viewer-role +rules: +- apiGroups: + - fl.katulu.io + resources: + - fledges + verbs: + - get + - list + - watch +- apiGroups: + - fl.katulu.io + resources: + - fledges/status + verbs: + - get diff --git a/components/fl-operator/config/rbac/kustomization.yaml b/components/fl-operator/config/rbac/kustomization.yaml index 731832a..2eccddb 100644 --- a/components/fl-operator/config/rbac/kustomization.yaml +++ b/components/fl-operator/config/rbac/kustomization.yaml @@ -6,6 +6,7 @@ resources: # subjects if changing service account names. 
- service_account.yaml - role.yaml +- spire_agent_cluster_role.yaml - role_binding.yaml - leader_election_role.yaml - leader_election_role_binding.yaml diff --git a/components/fl-operator/config/rbac/role.yaml b/components/fl-operator/config/rbac/role.yaml index e5c7354..bc953dc 100644 --- a/components/fl-operator/config/rbac/role.yaml +++ b/components/fl-operator/config/rbac/role.yaml @@ -53,6 +53,68 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - namespaces + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - "" + resources: + - nodes + verbs: + - get +- apiGroups: + - "" + resources: + - nodes/proxy + verbs: + - get +- apiGroups: + - "" + resources: + - serviceaccounts + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - fl.katulu.io + resources: + - fledges + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - fl.katulu.io + resources: + - fledges/finalizers + verbs: + - update +- apiGroups: + - fl.katulu.io + resources: + - fledges/status + verbs: + - get + - patch + - update - apiGroups: - fl.katulu.io resources: @@ -79,3 +141,15 @@ rules: - get - patch - update +- apiGroups: + - rbac.authorization.k8s.io + resources: + - clusterrolebindings + verbs: + - create + - delete + - get + - list + - patch + - update + - watch diff --git a/components/fl-operator/config/rbac/spire_agent_cluster_role.yaml b/components/fl-operator/config/rbac/spire_agent_cluster_role.yaml new file mode 100644 index 0000000..0161631 --- /dev/null +++ b/components/fl-operator/config/rbac/spire_agent_cluster_role.yaml @@ -0,0 +1,9 @@ +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: spire-agent +rules: +- apiGroups: [""] + resources: ["pods","nodes","nodes/proxy"] + verbs: ["get"] diff --git a/components/fl-operator/config/samples/fl_v1alpha1_fledge.yaml 
b/components/fl-operator/config/samples/fl_v1alpha1_fledge.yaml new file mode 100644 index 0000000..340c728 --- /dev/null +++ b/components/fl-operator/config/samples/fl_v1alpha1_fledge.yaml @@ -0,0 +1,12 @@ +apiVersion: fl.katulu.io/v1alpha1 +kind: FlEdge +metadata: + name: fledge-sample +spec: + auth: + spire: + server-address: spire-server.spire.svc.cluster.local + server-port: 8081 + trust-domain: katulu.io + join-token: f081a301-cb4d-4457-bd25-cade3145c542 + skip-kubelet-verification: true diff --git a/components/fl-operator/config/samples/fl_v1alpha1_floperator.yaml b/components/fl-operator/config/samples/fl_v1alpha1_floperator.yaml index 559bc4b..30ae532 100644 --- a/components/fl-operator/config/samples/fl_v1alpha1_floperator.yaml +++ b/components/fl-operator/config/samples/fl_v1alpha1_floperator.yaml @@ -2,8 +2,7 @@ apiVersion: fl.katulu.io/v1alpha1 kind: FlOperator metadata: name: floperator-sample - namespace: katulu-fl spec: - orchestrator-url: localhost + orchestrator-url: istio-ingressgateway.istio-system.svc.cluster.local orchestrator-port: 443 - orchestrator-sni: fl-orchestrator.fl.katulu.io + orchestrator-sni: fl-orchestrator.fl-suite diff --git a/components/fl-operator/config/samples/kustomization.yaml b/components/fl-operator/config/samples/kustomization.yaml index dc7ca55..da83108 100644 --- a/components/fl-operator/config/samples/kustomization.yaml +++ b/components/fl-operator/config/samples/kustomization.yaml @@ -1,4 +1,5 @@ ## Append samples you want in your CSV to this file as resources ## resources: - fl_v1alpha1_floperator.yaml +- fl_v1alpha1_fledge.yaml #+kubebuilder:scaffold:manifestskustomizesamples diff --git a/components/fl-operator/controllers/fledge_controller.go b/components/fl-operator/controllers/fledge_controller.go new file mode 100644 index 0000000..1c09410 --- /dev/null +++ b/components/fl-operator/controllers/fledge_controller.go @@ -0,0 +1,417 @@ +/* +Copyright 2022. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "strconv" + + "github.com/katulu-io/fl-suite/fl-operator/pkg/resources" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + logger "sigs.k8s.io/controller-runtime/pkg/log" + + flv1alpha1 "github.com/katulu-io/fl-suite/fl-operator/api/v1alpha1" + "github.com/katulu-io/fl-suite/fl-operator/pkg/utils" +) + +// FlEdgeReconciler reconciles a FlEdge object +type FlEdgeReconciler struct { + client.Client + Scheme *runtime.Scheme +} + +//+kubebuilder:rbac:groups=fl.katulu.io,resources=fledges,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=fl.katulu.io,resources=fledges/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=fl.katulu.io,resources=fledges/finalizers,verbs=update +//+kubebuilder:rbac:groups=core,resources=namespaces,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings,verbs=get;list;watch;create;update;patch;delete 
+//+kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;watch;create;update;patch;delete +// To grant the spire-agent permissions to get nodes, nodes/proxy and pods the controller needs to have them as well +// Ref: https://kubernetes.io/docs/reference/access-authn-authz/rbac/#restrictions-on-role-creation-or-update +//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get +//+kubebuilder:rbac:groups=core,resources=nodes/proxy,verbs=get + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// TODO(user): Modify the Reconcile function to compare the state specified by +// the FlEdge object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.12.1/pkg/reconcile +func (r *FlEdgeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := logger.FromContext(ctx) + flEdge := &flv1alpha1.FlEdge{} + + err := r.Get(ctx, req.NamespacedName, flEdge) + if err != nil { + if errors.IsNotFound(err) { + log.Error(err, "FlEdge resource not found. 
Ignoring since object must be deleted") + return ctrl.Result{}, nil + } + + log.Error(err, "Could not get FlEdge") + return ctrl.Result{}, err + } + + if flEdge.Spec.Auth.Spire != nil { + err = r.setupSpireAuthentication(ctx, flEdge) + if err != nil { + log.Error(err, "Could not setup the spire authentication") + return ctrl.Result{}, err + } + } + + return ctrl.Result{}, nil +} + +func (r *FlEdgeReconciler) setupSpireAuthentication( + ctx context.Context, + flEdge *flv1alpha1.FlEdge, +) error { + log := logger.FromContext(ctx) + + namespacedName, err := r.setupSpireAgentNamespace(ctx, flEdge) + if err != nil { + log.Error(err, "Could not setup the spire-agent namespace") + return err + } + + err = r.setupSpireAgentRBAC(ctx, flEdge, namespacedName) + if err != nil { + log.Error(err, "Could not setup the spire-agent cluster-role binding") + return err + } + + configMapName, err := r.setupSpireAgentConfigMap(ctx, flEdge, namespacedName) + if err != nil { + log.Error(err, "Could not setup the spire-agent configmap") + return err + } + + err = r.setupSpireAgentDeployment(ctx, flEdge, namespacedName, configMapName) + if err != nil { + log.Error(err, "Could not setup the spire-agent pod") + return err + } + + flEdge.Status.CurrentConfigMapName = configMapName + err = r.Status().Update(ctx, flEdge) + if err != nil { + log.Error(err, "Could not update the fledge status") + return err + } + + return nil +} + +func (r *FlEdgeReconciler) setupSpireAgentNamespace( + ctx context.Context, + flEdge *flv1alpha1.FlEdge, +) (types.NamespacedName, error) { + log := logger.FromContext(ctx) + + namespace := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-spire-agent", flEdge.Name), + }, + } + + err := ctrl.SetControllerReference(flEdge, namespace, r.Scheme) + if err != nil { + log.Error(err, "Could not set controller reference in the spire-agent namespace") + return 
types.NamespacedName{}, err + } + + err = r.Patch(ctx, namespace, client.Apply, client.ForceOwnership, client.FieldOwner(flEdge.GetName())) + if err != nil { + log.Error(err, "Could not apply the spire-agent namespace") + return types.NamespacedName{}, err + } + + return types.NamespacedName{ + Name: namespace.Name, + Namespace: namespace.Name, + }, nil +} + +func (r *FlEdgeReconciler) setupSpireAgentRBAC( + ctx context.Context, + flEdge *flv1alpha1.FlEdge, + namespacedName types.NamespacedName, +) error { + log := logger.FromContext(ctx) + + serviceAccount := &corev1.ServiceAccount{ + TypeMeta: metav1.TypeMeta{ + Kind: "ServiceAccount", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: namespacedName.Name, + Namespace: namespacedName.Namespace, + }, + } + + err := ctrl.SetControllerReference(flEdge, serviceAccount, r.Scheme) + if err != nil { + log.Error(err, "Could not set controller reference in the spire-agent serviceaccount") + return err + } + + err = r.Patch(ctx, serviceAccount, client.Apply, client.ForceOwnership, client.FieldOwner(flEdge.GetName())) + if err != nil { + log.Error(err, "Could not apply the spire-agent serviceaccount") + return err + } + + clusterRoleBinding := &rbacv1.ClusterRoleBinding{ + TypeMeta: metav1.TypeMeta{ + Kind: "ClusterRoleBinding", + APIVersion: "rbac.authorization.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: namespacedName.Name, + }, + Subjects: []rbacv1.Subject{{Kind: "ServiceAccount", Name: serviceAccount.Name, Namespace: serviceAccount.Namespace}}, + RoleRef: rbacv1.RoleRef{ + Kind: "ClusterRole", + // Defined in the root of the component config/rbac/spire_agent_cluster_role.yaml. + // Expects the fl-operator- prefix to be added to the cluster role name. 
+ Name: "fl-operator-spire-agent", + APIGroup: "rbac.authorization.k8s.io", + }, + } + + err = ctrl.SetControllerReference(flEdge, clusterRoleBinding, r.Scheme) + if err != nil { + log.Error(err, "Could not set controller reference in the spire-agent cluster-role binding") + return err + } + + err = r.Patch(ctx, clusterRoleBinding, client.Apply, client.ForceOwnership, client.FieldOwner(flEdge.GetName())) + if err != nil { + log.Error(err, "Could not apply the spire-agent cluster-role binding") + return err + } + + return nil +} + +func (r *FlEdgeReconciler) setupSpireAgentConfigMap( + ctx context.Context, + flEdge *flv1alpha1.FlEdge, + configMapName types.NamespacedName, +) (string, error) { + log := logger.FromContext(ctx) + + configMapContent, err := resources.RenderSpireAgentConfig( + resources.SpireAgentConfigContext{ + ServerAddress: flEdge.Spec.Auth.Spire.ServerAddress, + ServerPort: flEdge.Spec.Auth.Spire.ServerPort, + TrustDomain: flEdge.Spec.Auth.Spire.TrustDomain, + JoinToken: flEdge.Spec.Auth.Spire.JoinToken, + SkipKubeletVerification: strconv.FormatBool(flEdge.Spec.Auth.Spire.SkipKubeletVerification), + }, + "../deployment/spire-agent.conf.tpl", + ) + if err != nil { + log.Error(err, "Could not render spire-agent config") + return "", err + } + + configMap := &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: configMapName.Name, + Namespace: configMapName.Namespace, + }, + Data: map[string]string{ + "agent.conf": configMapContent, + }, + } + hash, err := utils.HashConfigMap(configMap) + if err != nil { + log.Error(err, "Could not hash the spire-agent configmap name") + return "", err + } + configMap.ObjectMeta.Name = fmt.Sprintf("%s-%s", configMapName.Name, hash) + + err = ctrl.SetControllerReference(flEdge, configMap, r.Scheme) + if err != nil { + log.Error(err, "Could not set controller reference in the spire-agent configmap") + return "", err + } + + err = r.Patch(ctx, 
configMap, client.Apply, client.ForceOwnership, client.FieldOwner(flEdge.GetName())) + if err != nil { + log.Error(err, "Could not apply the spire-agent configmap") + return "", err + } + + return configMap.ObjectMeta.Name, nil +} + +func (r *FlEdgeReconciler) setupSpireAgentDeployment( + ctx context.Context, + flEdge *flv1alpha1.FlEdge, + namespaceName types.NamespacedName, + configMapName string, +) error { + log := logger.FromContext(ctx) + + configMapVolumeName := "spire-agent-config" + labels := map[string]string{ + "app": "spire-agent", + } + // join-tokens cannot be reused to authenticate multiple spire-agents hence the number of replicas set 1. + replicas := int32(1) + hostPathType := corev1.HostPathDirectoryOrCreate + deployment := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: namespaceName.Name, + Namespace: namespaceName.Namespace, + Labels: labels, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + HostPID: true, + HostNetwork: true, + DNSPolicy: corev1.DNSClusterFirstWithHostNet, + ServiceAccountName: namespaceName.Name, + Containers: []corev1.Container{ + { + Name: "spire-agent", + Image: "gcr.io/spiffe-io/spire-agent:1.3.0", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{"-config", "/run/spire/config/agent.conf"}, + VolumeMounts: []corev1.VolumeMount{ + { + Name: configMapVolumeName, + MountPath: "/run/spire/config", + ReadOnly: true, + }, + { + Name: "spire-agent-socket", + MountPath: "/run/spire/sockets", + ReadOnly: false, + }, + }, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/opt/spire/bin/spire-agent", "healthcheck", "-socketPath", "/run/spire/sockets/agent.sock"}, + }, + }, + 
FailureThreshold: 2, + InitialDelaySeconds: 15, + PeriodSeconds: 60, + TimeoutSeconds: 3, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/opt/spire/bin/spire-agent", "healthcheck", "-socketPath", "/run/spire/sockets/agent.sock", "--shallow"}, + }, + }, + InitialDelaySeconds: 5, + PeriodSeconds: 5, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: configMapVolumeName, + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: configMapName, + }, + }, + }, + }, + { + Name: "spire-agent-socket", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/run/spire/sockets", + Type: &hostPathType, + }, + }, + }, + }, + }, + }, + }, + } + + err := ctrl.SetControllerReference(flEdge, deployment, r.Scheme) + if err != nil { + log.Error(err, "Could not set controller reference in the spire-agent deployment") + return err + } + + err = r.Patch(ctx, deployment, client.Apply, client.ForceOwnership, client.FieldOwner(flEdge.GetName())) + if err != nil { + log.Error(err, "Could not apply spire-agent deployment") + return err + } + + return nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *FlEdgeReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&flv1alpha1.FlEdge{}). + Owns(&corev1.Namespace{}). + Owns(&corev1.ServiceAccount{}). + Owns(&corev1.ConfigMap{}). + Owns(&rbacv1.ClusterRoleBinding{}). + Owns(&appsv1.Deployment{}). + Complete(r) +} diff --git a/components/fl-operator/controllers/fledge_controller_test.go b/components/fl-operator/controllers/fledge_controller_test.go new file mode 100644 index 0000000..46d5b2f --- /dev/null +++ b/components/fl-operator/controllers/fledge_controller_test.go @@ -0,0 +1,199 @@ +package controllers + +import ( + "time" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + "github.com/katulu-io/fl-suite/fl-operator/api/v1alpha1" +) + +var _ = Describe("FlEdge Controller", func() { + const ( + timeout = time.Second * 10 + interval = time.Millisecond * 250 + ) + var flEdgeKey types.NamespacedName + var flEdge *v1alpha1.FlEdge + var ownerReference metav1.OwnerReference + + BeforeEach(func() { + flEdgeKey = types.NamespacedName{ + Name: "sample", + } + flEdge = &v1alpha1.FlEdge{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "fl.katulu.io/v1alpha1", + Kind: "FlEdge", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: flEdgeKey.Name, + }, + Spec: v1alpha1.FlEdgeSpec{ + Auth: v1alpha1.FlEdgeAuth{ + Spire: &v1alpha1.FlEdgeSpireAuth{ + ServerAddress: "test-server", + ServerPort: 8888, + TrustDomain: "example.com", + JoinToken: "test-join-token-goes-here", + SkipKubeletVerification: true, + }, + }, + }, + } + Expect(k8sClient.Create(ctx, flEdge)).Should(Succeed()) + + // Eventually the status is set + Eventually(func(g Gomega) { + err := k8sClient.Get(ctx, flEdgeKey, flEdge) + g.Expect(err).NotTo(HaveOccurred()) + + g.Expect(flEdge.Status.CurrentConfigMapName).NotTo(Equal("")) + }, timeout, interval).Should(Succeed()) + + blockOwnerDeletion := true + ownerReference = metav1.OwnerReference{ + APIVersion: "fl.katulu.io/v1alpha1", + Kind: "FlEdge", + UID: flEdge.UID, + Name: flEdge.Name, + Controller: &blockOwnerDeletion, + BlockOwnerDeletion: &blockOwnerDeletion, + } + }) + + AfterEach(func() { + Expect(k8sClient.Delete(ctx, flEdge)).Should(Succeed()) + }) + + Context("When creating a new fl-edge", func() { + It("should bootstrap a clusterrolebinding, deployment and a configmap", func() { + // We check that the owner reference is set on each resource to assert that the resource will be deleted because + // envtest does not run the gc controller in charge 
of deleting the resources. + // ref: https://book-v2.book.kubebuilder.io/reference/envtest.html#testing-considerations + + // Eventually a new namespace is created + Eventually(func(g Gomega) { + key := types.NamespacedName{ + Name: "sample-spire-agent", + } + c := &corev1.Namespace{} + err := k8sClient.Get(ctx, key, c) + + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(c.OwnerReferences).To(ContainElement(ownerReference)) + }, timeout, interval).Should(Succeed()) + + // Eventually the clusterrolebinding is created + Eventually(func(g Gomega) { + key := types.NamespacedName{ + Name: "sample-spire-agent", + } + c := &rbacv1.ClusterRoleBinding{} + err := k8sClient.Get(ctx, key, c) + + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(c.OwnerReferences).To(ContainElement(ownerReference)) + }, timeout, interval).Should(Succeed()) + + // Eventually the service-account is created + Eventually(func(g Gomega) { + key := types.NamespacedName{ + Name: "sample-spire-agent", + Namespace: "sample-spire-agent", + } + c := &corev1.ServiceAccount{} + err := k8sClient.Get(ctx, key, c) + + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(c.OwnerReferences).To(ContainElement(ownerReference)) + }, timeout, interval).Should(Succeed()) + + // Eventually the configmap is created + Eventually(func(g Gomega) { + key := types.NamespacedName{ + Name: flEdge.Status.CurrentConfigMapName, + Namespace: "sample-spire-agent", + } + c := &corev1.ConfigMap{} + err := k8sClient.Get(ctx, key, c) + + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(c.OwnerReferences).To(ContainElement(ownerReference)) + }, timeout, interval).Should(Succeed()) + + // Eventually the deployment is created + Eventually(func(g Gomega) { + key := types.NamespacedName{ + Name: "sample-spire-agent", + Namespace: "sample-spire-agent", + } + d := &appsv1.Deployment{} + err := k8sClient.Get(ctx, key, d) + + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(d.OwnerReferences).To(ContainElement(ownerReference)) + 
g.Expect(isDeploymentUsingConfigMap(d, flEdge.Status.CurrentConfigMapName)).To(BeTrue())
+				g.Expect(d.Spec.Template.Spec.ServiceAccountName).To(Equal("sample-spire-agent"))
+			}, timeout, interval).Should(Succeed())
+		})
+	})
+
+	Context("When updating a fl-edge", func() {
+		It("should create a new configmap and update the deployment", func() {
+			flEdge.Spec.Auth.Spire.ServerAddress = "spire-server"
+			flEdge.Spec.Auth.Spire.ServerPort = 8000
+			flEdge.Spec.Auth.Spire.TrustDomain = "another-domain.com"
+			Expect(k8sClient.Update(ctx, flEdge)).Should(Succeed())
+
+			// Eventually the FlEdge status is updated with a new configmap name
+			updatedFlEdge := &v1alpha1.FlEdge{}
+			Eventually(func(g Gomega) {
+				err := k8sClient.Get(ctx, flEdgeKey, updatedFlEdge)
+				g.Expect(err).NotTo(HaveOccurred())
+				g.Expect(flEdge.Status.CurrentConfigMapName).NotTo(Equal(updatedFlEdge.Status.CurrentConfigMapName))
+			}).Should(Succeed())
+
+			// Eventually a new configmap is created
+			Eventually(func(g Gomega) {
+				key := types.NamespacedName{
+					Name:      updatedFlEdge.Status.CurrentConfigMapName,
+					Namespace: "sample-spire-agent",
+				}
+				c := &corev1.ConfigMap{}
+				err := k8sClient.Get(ctx, key, c)
+				g.Expect(err).NotTo(HaveOccurred())
+			}, timeout, interval).Should(Succeed())
+
+			// Eventually the deployment is updated
+			Eventually(func(g Gomega) {
+				key := types.NamespacedName{
+					Name:      "sample-spire-agent",
+					Namespace: "sample-spire-agent",
+				}
+				d := &appsv1.Deployment{}
+				err := k8sClient.Get(ctx, key, d)
+
+				g.Expect(err).NotTo(HaveOccurred())
+				g.Expect(isDeploymentUsingConfigMap(d, updatedFlEdge.Status.CurrentConfigMapName)).To(BeTrue())
+			}, timeout, interval).Should(Succeed())
+		})
+	})
+})
+
+func isDeploymentUsingConfigMap(d *appsv1.Deployment, configMapName string) bool {
+	for v := range d.Spec.Template.Spec.Volumes {
+		configMap := d.Spec.Template.Spec.Volumes[v].VolumeSource.ConfigMap
+		if configMap != nil && configMap.Name == configMapName { // FIX: ConfigMap is nil for non-ConfigMap volumes (the pod also mounts a hostPath volume) — was a nil-pointer dereference
+			return true
+		}
+	}
+	return false
+}
diff --git 
a/components/fl-operator/controllers/suite_test.go b/components/fl-operator/controllers/suite_test.go index 0f58c6b..11a8705 100644 --- a/components/fl-operator/controllers/suite_test.go +++ b/components/fl-operator/controllers/suite_test.go @@ -89,6 +89,12 @@ var _ = BeforeSuite(func() { }).SetupWithManager(k8sManager) Expect(err).NotTo(HaveOccurred()) + err = (&FlEdgeReconciler{ + Client: k8sClient, + Scheme: k8sManager.GetScheme(), + }).SetupWithManager(k8sManager) + Expect(err).NotTo(HaveOccurred()) + go func() { defer GinkgoRecover() err := k8sManager.Start(ctx) diff --git a/components/fl-operator/deployment/spire-agent.conf.tpl b/components/fl-operator/deployment/spire-agent.conf.tpl new file mode 100644 index 0000000..930fd2d --- /dev/null +++ b/components/fl-operator/deployment/spire-agent.conf.tpl @@ -0,0 +1,27 @@ +agent { + data_dir = "/run/spire" + log_level = "DEBUG" + server_address = "{{.ServerAddress}}" + server_port = "{{.ServerPort}}" + socket_path = "/run/spire/sockets/agent.sock" + # TODO: Figure out how to not do an insecure bootstrap + insecure_bootstrap = true + trust_domain = "{{.TrustDomain}}" + join_token = "{{.JoinToken}}" +} + +plugins { + NodeAttestor "join_token" { + } + + KeyManager "memory" { + plugin_data { + } + } + + WorkloadAttestor "k8s" { + plugin_data { + skip_kubelet_verification = {{.SkipKubeletVerification}} + } + } +} diff --git a/components/fl-operator/main.go b/components/fl-operator/main.go index 465550b..c1cf476 100644 --- a/components/fl-operator/main.go +++ b/components/fl-operator/main.go @@ -87,6 +87,13 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "FlOperator") os.Exit(1) } + if err = (&controllers.FlEdgeReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "FlEdge") + os.Exit(1) + } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", 
healthz.Ping); err != nil { diff --git a/components/fl-operator/pkg/resources/resources.go b/components/fl-operator/pkg/resources/resources.go index c83fae5..bb59406 100644 --- a/components/fl-operator/pkg/resources/resources.go +++ b/components/fl-operator/pkg/resources/resources.go @@ -50,6 +50,35 @@ func RenderEnvoyproxyConfig(context EnvoyConfigContext, envoyConfigFile string) return out.String(), nil } +func RenderSpireAgentConfig(context SpireAgentConfigContext, envoyConfigFile string) (string, error) { + envoyConfigTemplate, err := ioutil.ReadFile(envoyConfigFile) + if err != nil { + return "", err + } + + t, err := template.New("spire-agent").Parse(string(envoyConfigTemplate)) + if err != nil { + return "", err + } + + out := &bytes.Buffer{} + + err = t.Execute(out, context) + if err != nil { + return "", err + } + + return out.String(), nil +} + +type SpireAgentConfigContext struct { + ServerAddress string + ServerPort int16 + TrustDomain string + SkipKubeletVerification string + JoinToken string +} + // Creates new pod for the flower-client func NewPod(task *pb.OrchestratorMessage_TaskSpec, name types.NamespacedName, envoyConfigName string) *corev1.Pod { shareProcessNamespace := true diff --git a/components/fl-operator/pkg/utils/utils.go b/components/fl-operator/pkg/utils/utils.go index 1af3739..4e907f1 100644 --- a/components/fl-operator/pkg/utils/utils.go +++ b/components/fl-operator/pkg/utils/utils.go @@ -1,6 +1,11 @@ package utils import ( + "crypto/sha256" + "encoding/json" + "fmt" + + corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" ) @@ -11,3 +16,60 @@ func Int32Ptr(i int32) *int32 { func HostPathTypePtr(t v1.HostPathType) *v1.HostPathType { return &t } + +// Inspired by kustomize: https://github.com/kubernetes-sigs/kustomize/blob/master/api/hasher/hasher.go#L60 +func HashConfigMap(cm *corev1.ConfigMap) (string, error) { + encoded, err := encodeConfigMap(cm) + if err != nil { + return "", err + } + return encode(hex256(encoded)) +} + +// 
Inspired by kustomize: https://github.com/kubernetes-sigs/kustomize/blob/master/api/hasher/hasher.go#L109 +func encodeConfigMap(cm *corev1.ConfigMap) (string, error) { + // get fields + m := map[string]interface{}{ + "kind": "ConfigMap", + "name": cm.GetName(), + "data": cm.Data, + } + + // json.Marshal sorts the keys in a stable order in the encoding + data, err := json.Marshal(m) + if err != nil { + return "", err + } + + return string(data), nil +} + +// Taken from kustomize: https://github.com/kubernetes-sigs/kustomize/blob/master/api/hasher/hasher.go#L28 +// Which in turn is taken from: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/kubectl/pkg/util/hash/hash.go#L105 +func encode(hex string) (string, error) { + if len(hex) < 10 { + return "", fmt.Errorf( + "input length must be at least 10") + } + enc := []rune(hex[:10]) + for i := range enc { + switch enc[i] { + case '0': + enc[i] = 'g' + case '1': + enc[i] = 'h' + case '3': + enc[i] = 'k' + case 'a': + enc[i] = 'm' + case 'e': + enc[i] = 't' + } + } + return string(enc), nil +} + +// Taken from kustomize: https://github.com/kubernetes-sigs/kustomize/blob/master/api/hasher/hasher.go#L52 +func hex256(data string) string { + return fmt.Sprintf("%x", sha256.Sum256([]byte(data))) +}