
Commit

Merge pull request #150 from oda-hub:138-add-support-for-annotations-of-workflow-access-to-resource-storage-and-compute

138 add support for annotations of workflow access to resource storage and compute
volodymyrss authored Mar 15, 2024
2 parents 55baff0 + 0f4db5c commit 25ae2e9
Showing 5 changed files with 228 additions and 40 deletions.
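For orientation before the diff: the new deployment code consumes per-notebook resource requests produced by oda_api's Ontology.get_requested_resources. A sketch of the expected shape, inferred from how the diff below uses it (the resource and environment-variable names here are purely illustrative):

# Hypothetical items yielded by Ontology.get_requested_resources(graph) for one notebook:
# each names a resource, says whether it is mandatory, and lists the environment
# variables through which the workflow expects to receive its credentials.
requested = [
    {"resource": "MyS3Storage", "required": True, "env_vars": {"S3_CREDENTIALS"}},
    {"resource": "ComputeCluster", "required": False, "env_vars": {"CLUSTER_TOKEN"}},
]

_extract_resource_requirements() (added in nb2workflow/deploy.py below) lowercases the resource name and, when several notebooks in the repository request the same resource, ORs the required flags and unions the env_vars sets.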
149 changes: 111 additions & 38 deletions nb2workflow/deploy.py
@@ -15,6 +15,10 @@
from textwrap import dedent
import uuid
from kubernetes import client, config
import glob
import rdflib
from oda_api.ontology_helper import Ontology
from nb2workflow.nbadapter import NotebookAdapter

logger = logging.getLogger(__name__)

@@ -26,6 +30,7 @@
"filename_pattern": '.*',
}

default_ontology_path = "https://odahub.io/ontology/ontology.ttl"

#TODO: probably want an option to really use the dir
def determine_origin(repo):
@@ -62,14 +67,16 @@ def build_container(git_origin,
engine="docker",
cleanup=False,
nb2wversion=version(),
ontology_path=default_ontology_path,
**kwargs):
if engine == "docker":
return _build_with_docker(git_origin=git_origin,
local=local,
run_tests=run_tests,
registry=registry,
build_timestamp=build_timestamp,
nb2wversion=nb2wversion)
nb2wversion=nb2wversion,
ontology_path=ontology_path)
elif engine == 'kaniko':
if run_tests == True:
logger.warning("KANIKO builder doesn't support run_tests . Will switch off")
@@ -78,7 +85,8 @@ def build_container(git_origin,
local=local,
build_timestamp=build_timestamp,
namespace=kwargs['namespace'],
nb2wversion=nb2wversion
nb2wversion=nb2wversion,
ontology_path=ontology_path
)
else:
raise NotImplementedError('Unknown container build engine: %s' % engine)
@@ -183,7 +191,8 @@ def _build_with_kaniko(git_origin,
build_timestamp=False,
namespace="oda-staging",
cleanup=True,
nb2wversion=version()):
nb2wversion=version(),
ontology_path=default_ontology_path):

#secret should be created beforehand https://github.com/GoogleContainerTools/kaniko#pushing-to-docker-hub

@@ -192,7 +201,8 @@
build_timestamp=build_timestamp,
dry_run=True,
source_from='git',
nb2wversion=nb2wversion)
nb2wversion=nb2wversion,
ontology_path=ontology_path)

dockerfile_content = container_metadata['dockerfile_content']

@@ -298,15 +308,36 @@ def _build_with_kaniko(git_origin,
return container_metadata


def _build_with_docker(git_origin,
def _extract_resource_requirements(local_repo_path, ontology_path=default_ontology_path):
ontology = Ontology(ontology_path)
resources = {}

search_pattern = os.path.join(local_repo_path,'**/*.ipynb')
for nb_file in glob.glob(search_pattern, recursive=True):
nba = NotebookAdapter(nb_file)
g = rdflib.Graph()
g.parse(data=nba.extra_ttl)
for r in ontology.get_requested_resources(g):
resource_name = r['resource'].lower()
if resource_name in resources:
resource_settings = resources[resource_name]
resource_settings['required'] = resource_settings['required'] or r['required']
resource_settings['env_vars'] = r['env_vars'].union(resource_settings['env_vars'])
else:
resources[resource_name] = r

return resources

def _build_with_docker(git_origin,
local=False,
run_tests=True,
registry="odahub",
build_timestamp=False,
dry_run=False,
source_from='localdir',
cleanup=False,
nb2wversion=version()):
nb2wversion=version(),
ontology_path=default_ontology_path):
if cleanup:
logger.warning('Post-build cleanup is not implemented for docker builds')

@@ -332,6 +363,8 @@ def _build_with_docker(git_origin,
["git", "log", "-1", "--pretty=format:'%ai'"], # could use all authors too, but it's inside anyway
cwd=local_repo_path ).decode().strip()

meta['resources'] = _extract_resource_requirements(local_repo_path, ontology_path)

dockerfile_content = _nb2w_dockerfile_gen(tmpdir, git_origin, source_from, meta, nb2wversion)

ts = '-' + time.strftime(r'%y%m%d%H%M%S') if build_timestamp else ''
@@ -376,7 +409,47 @@ def _build_with_docker(git_origin,
"last_change_time": meta['last_change_time'],
"workflow_dispatcher_signature": workflow_dispatcher_signature,
"workflow_nb_signature": workflow_nb_signature,
"dockerfile_content": dockerfile_content}
"dockerfile_content": dockerfile_content,
"resources": meta["resources"]}


def append_k8s_secrets(resources, container, namespace):
secrets = []
for name, resource in resources.items():
verify_resource_secret(name, resource['required'], namespace=namespace)
for env in resource['env_vars']:
secrets.append(
{
"name": env,
"valueFrom": {
"secretKeyRef": {"name": name, "key": "credentials"}
}
}
)
if len(secrets) > 0:
container["env"] = secrets


def get_k8s_secrets(namespace="oda-staging"):
json_data = sp.check_output(["kubectl", "get", "secrets", "-n", namespace, "-o", "json"])
items = json.loads(json_data)['items']
for secret in items:
yield secret['metadata']['name'], secret['data']


def verify_resource_secret(name, required, namespace="oda-staging"):
# credentials
for secret_name, secret in get_k8s_secrets(namespace=namespace):
if secret_name == name:
if 'credentials' not in secret:
raise NameError(f"No credentials defined for secret {name}")
return True
message = f"No secrets defined for {name}"
if required:
raise RuntimeError(message)
else:
logger.warning(message)


def deploy_k8s(container_info,
deployment_base_name,
@@ -385,38 +458,25 @@
check_live_through="oda-dispatcher"):

deployment_name = deployment_base_name + "-backend"
try:
sp.check_call(
["kubectl", "patch", "deployment", deployment_name, "-n", namespace,
container = {"name": deployment_name, "image": container_info['image']}
append_k8s_secrets(container_info['resources'], container, namespace)
patch_command = ["kubectl", "patch", "deployment", deployment_name, "-n", namespace,
"--type", "merge",
"-p",
"-p",
json.dumps(
{"spec":{"template":{"spec":{
"containers":[
{"name": deployment_name, "image": container_info['image']}
container
]}}}})
]
)

try:
sp.check_call(patch_command)
except sp.CalledProcessError:
sp.check_call(
["kubectl", "create", "deployment", deployment_name, "-n", namespace, "--image=" + container_info['image']]
)
time.sleep(5) # avoid race condition. time for deployment to be created in k8s

# needed to set the proper container name
sp.check_call(
["kubectl", "patch", "deployment", deployment_name, "-n", namespace,
"--type", "merge",
"-p",
json.dumps(
{"spec":{"template":{"spec":{
"containers":[
{"name": deployment_name, "image": container_info['image']}
]}}}})
]
)

time.sleep(5) # avoid race condition. time for deployment to be created in k8s
sp.check_call(patch_command) # needed to set the proper container name and env vars
finally:
sp.check_call(
["kubectl", "patch", "deployment", deployment_name, "-n", namespace,
@@ -433,15 +493,15 @@
]}}}})
]
)

# expose if service doesn't exist
try:
sp.check_call(["kubectl", "get", "service", deployment_name, "-n", namespace])
except sp.CalledProcessError:
except sp.CalledProcessError:
sp.check_call(
["kubectl", "expose", "deployment", deployment_name, "--name", deployment_name,
["kubectl", "expose", "deployment", deployment_name, "--name", deployment_name,
"--port", "8000", "-n", namespace])

if check_live:
logging.info("will check live")

@@ -506,21 +566,32 @@ def deploy(git_origin,
build_engine='docker',
build_timestamp=False,
cleanup=False,
nb2wversion=version()):
nb2wversion=version(),
ontology_path=default_ontology_path):

container = build_container(git_origin,
container = build_container(git_origin,
local=local,
run_tests=run_tests,
registry=registry,
engine=build_engine,
namespace=namespace,
build_timestamp=build_timestamp,
cleanup=cleanup,
nb2wversion=nb2wversion)
nb2wversion=nb2wversion,
ontology_path=ontology_path)

if local:
env_params = []
for name, resource in container['resources'].items():
for env in resource['env_vars']:
env_val = os.getenv(env)
if env_val:
env_params += ["-e", env]
elif resource['required']:
raise RuntimeError(f'Required environment variable {env} is missing')

sp.check_call( # cli is more stable than python API
["docker", "run", '-p', '8000:8000', container['image']])
["docker", "run", '-p', '8000:8000'] + env_params + [container['image']])
else:
deployment_info = deploy_k8s(container,
deployment_base_name,
@@ -538,6 +609,7 @@ def main():
parser.add_argument('--local', action="store_true", default=False)
parser.add_argument('--build-engine', metavar="build_engine", default="docker")
parser.add_argument('--nb2wversion', metavar="nb2wversion", default=version())
parser.add_argument('--ontology-path', metavar="ontology_path", default=default_ontology_path)

args = parser.parse_args()

@@ -548,7 +620,8 @@
namespace=args.namespace,
local=args.local,
build_engine=args.build_engine,
nb2wversion=args.nb2wversion)
nb2wversion=args.nb2wversion,
ontology_path=args.ontology_path)


if __name__ == "__main__":
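The secret handling added above implies a naming convention: each requested resource maps onto a Kubernetes secret with the same (lowercased) name carrying a "credentials" key, which append_k8s_secrets injects into the deployment as environment variables. A minimal sketch of preparing such a secret and of the env entry that results (resource and variable names are illustrative, and the token value is a placeholder):

import json
import subprocess as sp

namespace = "oda-staging"

# Create the secret that verify_resource_secret() looks up: it must be named
# after the lowercased resource and expose a "credentials" key.
sp.check_call([
    "kubectl", "create", "secret", "generic", "mys3storage",
    "--from-literal=credentials=PLACEHOLDER_TOKEN",
    "-n", namespace,
])

# append_k8s_secrets() then adds one entry per requested env var to the container spec:
env_entry = {
    "name": "S3_CREDENTIALS",  # taken from the resource's env_vars
    "valueFrom": {"secretKeyRef": {"name": "mys3storage", "key": "credentials"}},
}
print(json.dumps(env_entry, indent=2))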
1 change: 1 addition & 0 deletions nb2workflow/semantics.py
@@ -47,6 +47,7 @@ def understand_comment_references(comment, base_uri=None, fallback_type=None) ->
parse_failures = []

variations = [
f"{comment}",
f"{base_uri.n3()} a {comment} .",
f"{base_uri.n3()} {comment} .",
f"{base_uri.n3()} a {comment}",
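The one-line addition above prepends the raw comment to the list of candidate Turtle fragments, so an annotation that is already a complete statement parses without being anchored to the parameter's base_uri. A simplified sketch of the variation-trying logic this feeds into (prefix binding and error collection omitted; not the literal implementation):

import rdflib

def first_parsable_variation(comment, base_uri):
    variations = [
        f"{comment}",                      # new: the comment may already be complete Turtle
        f"{base_uri.n3()} a {comment} .",  # comment names a class for the parameter
        f"{base_uri.n3()} {comment} .",    # comment is a predicate-object fragment
    ]
    for v in variations:
        g = rdflib.Graph()
        try:
            g.parse(data=v, format="turtle")
            return v, g
        except Exception:
            continue
    return None, None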
31 changes: 31 additions & 0 deletions tests/conftest.py
@@ -5,6 +5,8 @@
import signal
import psutil
import subprocess
import tempfile
import requests

import nb2workflow.service

@@ -63,6 +65,35 @@ def kill_child_processes(parent_pid, sig=signal.SIGTERM):
return


def download_file(url, local_filename=None):
if local_filename is None:
local_filename = url.split('/')[-1]
# NOTE the stream=True parameter below
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(local_filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
# If the response is chunk-encoded, uncomment the check below
# and set chunk_size to None.
#if chunk:
f.write(chunk)
return local_filename


@pytest.fixture(scope="module")
def temp_dir(request):
with tempfile.TemporaryDirectory() as tmpdir:
yield tmpdir


@pytest.fixture(scope="module")
def ontology_path(temp_dir):
ontology_url = "https://raw.githubusercontent.com/oda-hub/ontology/main/ontology.ttl"
ontology_path = os.path.join(temp_dir, "ontology.ttl")
download_file(ontology_url, ontology_path)
# subprocess.check_call(["wget", ontology_url, "-O", ontology_path])
yield ontology_path

@pytest.fixture
def service_fixture(pytestconfig, test_notebook_repo):
import subprocess
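The module-scoped fixtures above give tests a locally downloaded copy of the ontology. A hypothetical test using them might look like this (the test body and the repository path are illustrative, not part of this commit):

from nb2workflow.deploy import _extract_resource_requirements

def test_extract_resources(ontology_path):
    # ontology_path is the ontology.ttl downloaded by the fixture above;
    # "tests/testrepo" stands in for a repository of annotated notebooks.
    resources = _extract_resource_requirements("tests/testrepo", ontology_path)
    assert isinstance(resources, dict)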
