Skip to content

Commit

Permalink
Merge pull request #296 from CybercentreCanada/persistent-service-update
Browse files Browse the repository at this point in the history
Persistent service update
  • Loading branch information
cccs-douglass authored Sep 14, 2021
2 parents 0d01b05 + 02c00f7 commit 6a7ca28
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 12 deletions.
40 changes: 30 additions & 10 deletions assemblyline_core/scaler/scaler_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class ServiceProfile:
This includes how the service should be run, and conditions related to the scaling of the service.
"""

def __init__(self, name: str, container_config: DockerConfig, config_hash:int=0, min_instances:int=0, max_instances:int=None,
def __init__(self, name: str, container_config: DockerConfig, config_blob:str='', min_instances:int=0, max_instances:int=None,
growth: float = 600, shrink: Optional[float] = None, backlog:int=500, queue=None, shutdown_seconds:int=30):
"""
:param name: Name of the service to manage
Expand All @@ -127,7 +127,7 @@ def __init__(self, name: str, container_config: DockerConfig, config_hash:int=0,
self.high_duty_cycle = 0.7
self.low_duty_cycle = 0.5
self.shutdown_seconds = shutdown_seconds
self.config_hash = config_hash
self.config_blob = config_blob

# How many instances we want, and can have
self.min_instances: int = max(0, int(min_instances))
Expand Down Expand Up @@ -211,7 +211,7 @@ def __deepcopy__(self, memodict=None):
prof = ServiceProfile(
name=self.name,
container_config=DockerConfig(self.container_config.as_primitives()),
config_hash=self.config_hash,
config_blob=self.config_blob,
min_instances=self.min_instances,
max_instances=self.max_instances,
growth=self.growth_threshold,
Expand Down Expand Up @@ -378,7 +378,7 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:
# noinspection PyBroadException
try:
if service.enabled and (stage == ServiceStage.Off or name not in self.profiles):
# Enable this service's dependencies
# Enable this service's dependencies before trying to launch the service containers
self.controller.prepare_network(service.name, service.docker_config.allow_internet_access)
for _n, dependency in service.dependencies.items():
dependency.container = prepare_container(dependency.container)
Expand All @@ -401,14 +401,22 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:

# Check that all enabled services are enabled
if service.enabled and stage == ServiceStage.Running:
# Compute a hash of service properties not include in the docker config, that
# Compute a blob of service properties not include in the docker config, that
# should still result in a service being restarted when changed
config_hash = hash(str(sorted(service.config.items())))
config_hash = hash((config_hash, str(service.submission_params)))
config_blob = str(sorted(service.config.items()))
config_blob += str(service.submission_params)

# Build the docker config for the service, we are going to either create it or
# update it so we need to know what the current configuration is either way
docker_config = prepare_container(service.docker_config)
config_blob += str(docker_config)

# Build the docker config for the dependencies.
dependency_config = {}
for _n, dependency in service.dependencies.items():
dependency.container = prepare_container(dependency.container)
dependency_config[_n] = dependency
config_blob += str(sorted(dependency_config.items()))

# Add the service to the list of services being scaled
with self.profiles_lock:
Expand All @@ -419,7 +427,7 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:
min_instances=default_settings.min_instances,
growth=default_settings.growth,
shrink=default_settings.shrink,
config_hash=config_hash,
config_blob=config_blob,
backlog=default_settings.backlog,
max_instances=service.licence_count,
container_config=docker_config,
Expand All @@ -436,13 +444,25 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:
else:
profile._max_instances = service.licence_count

if profile.container_config != docker_config or profile.config_hash != config_hash:
if profile.config_blob != config_blob:
self.log.info(f"Updating deployment information for {name}")
# Update the dependencies. Should do nothing if container spec is the same.
# let kubernetes decide if anything needs to change though.
for _n, dependency in dependency_config.items():
self.controller.start_stateful_container(
service_name=service.name,
container_name=_n,
spec=dependency,
labels={'dependency_for': service.name}
)

# Update the service itself
profile.container_config = docker_config
profile.config_hash = config_hash
profile.config_blob = config_blob
self.controller.restart(profile)
self.log.info(f"Deployment information for {name} replaced")


except Exception:
self.log.exception(f"Error applying service settings from: {service.name}")
self.handle_service_error(service.name)
Expand Down
4 changes: 2 additions & 2 deletions assemblyline_core/server_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import sys
import io
import os
from typing import cast, Callable, TYPE_CHECKING
from typing import Callable, TYPE_CHECKING

from assemblyline.remote.datatypes import get_client
from assemblyline.remote.datatypes.hash import Hash
Expand Down Expand Up @@ -208,7 +208,7 @@ def __init__(self, component_name: str, logger: logging.Logger = None,
)

# Create a cached service data object, and access to the service status
self.service_info = cast(dict[str, Service], forge.CachedObject(self._get_services))
self.service_info: dict[str, Service] = forge.CachedObject(self._get_services)
self._service_stage_hash = get_service_stage_hash(self.redis)

def _get_services(self):
Expand Down

0 comments on commit 6a7ca28

Please sign in to comment.