Skip to content
This repository has been archived by the owner on Jul 20, 2023. It is now read-only.

Commit

Permalink
Merge pull request #26 from katulu-io/GerardoGR/edge-controller
Browse files Browse the repository at this point in the history
Implementing new FLEdge CRD to manage and authenticate the edge devices
  • Loading branch information
GerardoGR authored Aug 10, 2022
2 parents 9189bdf + e49daa3 commit 2fe0ac3
Show file tree
Hide file tree
Showing 33 changed files with 1,860 additions and 30 deletions.
1 change: 1 addition & 0 deletions components/edge-identity/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ spire-sdk:
--python_out=. --grpc_python_out=. \
-I ../../vendor/spire-api-sdk/proto/ \
../../vendor/spire-api-sdk/proto/spire/api/server/agent/v1/agent.proto \
../../vendor/spire-api-sdk/proto/spire/api/server/entry/v1/entry.proto \
../../vendor/spire-api-sdk/proto/spire/api/types/*.proto
.PHONY: spire-sdk

Expand Down
30 changes: 29 additions & 1 deletion components/edge-identity/backend/apps/default/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
import os
from typing import Optional

import kubeflow.kubeflow.crud_backend as base
from kubeflow.kubeflow.crud_backend import config, logging
Expand All @@ -9,7 +11,9 @@
log = logging.getLogger(__name__)


def create_app(name=__name__, cfg: config.Config = None):
def create_app(
name=__name__, cfg: config.Config = None, fl_suite_config_path: Optional[str] = None
):
cfg = config.Config() if cfg is None else cfg

# Properly set the static serving directory
Expand All @@ -21,9 +25,33 @@ def create_app(name=__name__, cfg: config.Config = None):

log.info("Setting STATIC_DIR to: " + static_dir)
app.config["STATIC_DIR"] = static_dir
if fl_suite_config_path is None:
app.config["FL_SUITE_CONFIG"] = DEFAULT_FL_SUITE_CONFIG
else:
with open(fl_suite_config_path, "r") as f:
app.config["FL_SUITE_CONFIG"] = json.load(f)

# Register the app's blueprints
app.register_blueprint(routes_bp)
db.init_app(app)

return app


DEFAULT_FL_SUITE_CONFIG = {
"fl_edge": {
"auth": {
"spire": {
"server_address": "spire-server.spire.svc.cluster.local",
"server_port": 8081,
"trust_domain": "katulu.io",
"skip_kubelet_verification": True,
}
}
},
"fl_operator": {
"orchestrator_url": "istio-ingressgateway.istio-system.svc.cluster.local",
"orchestrator_port": 443,
"orchestrator_sni": "fl-orchestrator.fl-suite",
},
}
40 changes: 40 additions & 0 deletions components/edge-identity/backend/apps/default/routes/delete.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import grpc
from flask import current_app
from kubeflow.kubeflow.crud_backend import api, logging
from spire.api.server.entry.v1 import entry_pb2, entry_pb2_grpc
from spire.api.types import spiffeid_pb2

from ..db import get_db
from . import bp
Expand All @@ -13,6 +17,13 @@ def delete_edge(edge_name, namespace):
"""
log.info("Deregistering Edge %s/%s...", namespace, edge_name)

deregister_spire_workloads(
current_app.config["FL_SUITE_CONFIG"]["fl_edge"]["auth"]["spire"][
"trust_domain"
],
edge_name,
)

# TODO: Refactor into model
db_connection = get_db()
cur = db_connection.cursor()
Expand All @@ -26,3 +37,32 @@ def delete_edge(edge_name, namespace):
return api.success_response(
"message", "Edge %s successfully deregistered." % edge_name
)


def deregister_spire_workloads(trust_domain: str, edge_name: str):
# Hard-coded to use spire-server's unix socket only. Used to get admin access.
with grpc.insecure_channel("unix:///tmp/spire-server/private/api.sock") as channel:
stub = entry_pb2_grpc.EntryStub(channel)

edge_spiffe_id = spiffeid_pb2.SPIFFEID(
trust_domain=trust_domain, path=f"/{edge_name}"
)

workload_list_entries_response = stub.ListEntries(
entry_pb2.ListEntriesRequest(
filter=entry_pb2.ListEntriesRequest.Filter(by_parent_id=edge_spiffe_id)
)
)
workloads_ids = [entry.id for entry in workload_list_entries_response.entries]

edge_list_entries_response = stub.ListEntries(
entry_pb2.ListEntriesRequest(
filter=entry_pb2.ListEntriesRequest.Filter(by_spiffe_id=edge_spiffe_id)
)
)
edge_ids = [entry.id for entry in edge_list_entries_response.entries]

batch_delete_request = entry_pb2.BatchDeleteEntryRequest(
ids=workloads_ids + edge_ids
)
stub.BatchDeleteEntry(batch_delete_request)
24 changes: 14 additions & 10 deletions components/edge-identity/backend/apps/default/routes/get.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from flask import current_app
from kubeflow.kubeflow.crud_backend import api, helpers, logging, status

from ..db import query_db
Expand All @@ -18,17 +19,20 @@ def get_edges(namespace):
"WHERE e.namespace = ?;",
(namespace,),
)
spire_config = current_app.config["FL_SUITE_CONFIG"]["fl_edge"]["auth"]["spire"]
fl_operator_config = current_app.config["FL_SUITE_CONFIG"]["fl_operator"]
if results is not None:
for edge in results:
edge_list.append(
{
"name": edge["name"],
"namespace": edge["namespace"],
# The status is hardcoded because it is only used for presentation purposes
"status": status.create_status(status.STATUS_PHASE.READY, "Ready"),
"age": helpers.get_uptime(edge["created_at"]),
"join_token": edge["join_token"],
}
)
edge_dict = {
"name": edge["name"],
"namespace": edge["namespace"],
# The status is hardcoded because it is only used for presentation purposes
"status": status.create_status(status.STATUS_PHASE.READY, "Ready"),
"age": helpers.get_uptime(edge["created_at"]),
"join_token": edge["join_token"],
}
edge_dict.update(spire_config)
edge_dict.update(fl_operator_config)
edge_list.append(edge_dict)

return api.success_response("edges", edge_list)
66 changes: 57 additions & 9 deletions components/edge-identity/backend/apps/default/routes/post.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from datetime import datetime

import grpc
from flask import request
from flask import current_app, request
from kubeflow.kubeflow.crud_backend import (api, decorators, helpers, logging,
status)
from spire.api.server.agent.v1 import agent_pb2, agent_pb2_grpc
from spire.api.server.entry.v1 import entry_pb2, entry_pb2_grpc
from spire.api.types import entry_pb2 as entry_type
from spire.api.types import selector_pb2, spiffeid_pb2

from ..db import get_db, query_db
from . import bp
Expand All @@ -23,12 +26,11 @@ def post_edge(namespace):
edge_name = body["name"]
created_at = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")

# Hard-coded to use spire-server's unix socket only. Used to get admin access.
with grpc.insecure_channel("unix:///tmp/spire-server/private/api.sock") as channel:
stub = agent_pb2_grpc.AgentStub(channel)

join_token_request = agent_pb2.CreateJoinTokenRequest(ttl=600)
join_token = stub.CreateJoinToken(join_token_request)
spire_config = current_app.config["FL_SUITE_CONFIG"]["fl_edge"]["auth"]["spire"]
join_token = register_spire_workloads(
spire_config["trust_domain"],
edge_name,
)

db_connection = get_db()
cur = db_connection.cursor()
Expand All @@ -47,19 +49,65 @@ def post_edge(namespace):
insert_join_token_auth_sql = (
"INSERT INTO JoinTokenAuth(edge_id, join_token) VALUES(?, ?)"
)
cur.execute(insert_join_token_auth_sql, (new_edge_id, join_token.value))
cur.execute(insert_join_token_auth_sql, (new_edge_id, join_token))
db_connection.commit()

fl_operator_config = current_app.config["FL_SUITE_CONFIG"]["fl_operator"]
# TODO: Refactor into model
new_edge = {
"name": edge_name,
"namespace": namespace,
# The status is hardcoded because it is only used for presentation purposes
"status": status.create_status(status.STATUS_PHASE.READY, "Ready"),
"age": helpers.get_uptime(created_at),
"join_token": join_token.value,
"join_token": join_token,
}
new_edge.update(spire_config)
new_edge.update(fl_operator_config)

log.info("Successfully created Edge %s/%s", namespace, edge_name)

return api.success_response("edge", new_edge)


def register_spire_workloads(trust_domain: str, edge_name: str) -> str:
# Hard-coded to use spire-server's unix socket only. Used to get admin access.
with grpc.insecure_channel("unix:///tmp/spire-server/private/api.sock") as channel:
stub = agent_pb2_grpc.AgentStub(channel)

edge_spiffe_id = spiffeid_pb2.SPIFFEID(
trust_domain=trust_domain, path=f"/{edge_name}"
)
join_token_request = agent_pb2.CreateJoinTokenRequest(
ttl=600, agent_id=edge_spiffe_id
)
join_token = stub.CreateJoinToken(join_token_request)

fl_operator_entry = entry_type.Entry(
parent_id=edge_spiffe_id,
spiffe_id=spiffeid_pb2.SPIFFEID(
trust_domain=trust_domain, path="/fl-operator"
),
selectors=[
selector_pb2.Selector(
type="k8s", value="pod-label:app:fl-operator-envoyproxy"
)
],
)
flower_client_entry = entry_type.Entry(
parent_id=edge_spiffe_id,
spiffe_id=spiffeid_pb2.SPIFFEID(
trust_domain=trust_domain,
path="/flower-client",
),
selectors=[
selector_pb2.Selector(type="k8s", value="pod-label:app:flower-client")
],
)
entry_stub = entry_pb2_grpc.EntryStub(channel)
entry_batch_request = entry_pb2.BatchCreateEntryRequest(
entries=[flower_client_entry, fl_operator_entry]
)
entry_stub.BatchCreateEntry(entry_batch_request)

return join_token.value
3 changes: 2 additions & 1 deletion components/edge-identity/backend/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ def get_config(mode):

BACKEND_MODE = os.environ.get("BACKEND_MODE", config.BackendMode.PRODUCTION.value)
PREFIX = os.environ.get("APP_PREFIX", "/")
FL_SUITE_CONFIG_PATH = os.environ.get("FL_SUITE_CONFIG_PATH")

cfg = get_config(BACKEND_MODE)
cfg.PREFIX = PREFIX

app = default.create_app("Edge identity", cfg)
app = default.create_app("Edge identity", cfg, FL_SUITE_CONFIG_PATH)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit 2fe0ac3

Please sign in to comment.