From a34d725c4102f87ffb17c8ae8c7f20538cf3a1ec Mon Sep 17 00:00:00 2001 From: dcvan Date: Tue, 12 Jun 2018 16:40:33 -0400 Subject: [PATCH] [#19] restructured the scheduler code and made it pluggable --- appliance/manager.py | 51 +++++----- commons.py | 2 +- schedule/__init__.py | 93 +++++++++++++++++++ {scheduler => schedule}/appliance/__init__.py | 0 {scheduler => schedule}/appliance/base.py | 0 {scheduler => schedule}/appliance/manager.py | 0 .../appliance/plugin/location/__init__.py | 0 schedule/default.py | 27 ++++++ 8 files changed, 144 insertions(+), 29 deletions(-) create mode 100644 schedule/__init__.py rename {scheduler => schedule}/appliance/__init__.py (100%) rename {scheduler => schedule}/appliance/base.py (100%) rename {scheduler => schedule}/appliance/manager.py (100%) rename {scheduler => schedule}/appliance/plugin/location/__init__.py (100%) create mode 100644 schedule/default.py diff --git a/appliance/manager.py b/appliance/manager.py index 43ab07e..adb6c76 100644 --- a/appliance/manager.py +++ b/appliance/manager.py @@ -1,23 +1,9 @@ -import sys -import importlib - from config import config from commons import MongoClient, AutonomousMonitor from commons import Manager, APIManager from appliance.base import Appliance from container.manager import ContainerManager -from scheduler.appliance.manager import ApplianceDAGDBManager - - -def get_scheduler(): - try: - sched_mod = '.'.join(config.pivot.scheduler.split('.')[:-1]) - sched_class = config.pivot.scheduler.split('.')[-1] - return getattr(importlib.import_module(sched_mod), sched_class) - except Exception as e: - sys.stderr.write(str(e) + '\n') - from scheduler import DefaultApplianceScheduler - return DefaultApplianceScheduler +from schedule import ApplianceScheduleNegotiator class ApplianceManager(Manager): @@ -26,8 +12,6 @@ def __init__(self): self.__app_api = ApplianceAPIManager() self.__contr_mgr = ContainerManager() self.__app_db = ApplianceDBManager() - self.__sched_class = get_scheduler() - self.logger.info('Scheduler: %s'%self.__sched_class.__name__) async def get_appliance(self, app_id): status, app, err = await self.__app_db.get_appliance(app_id) @@ -59,18 +43,15 @@ async def create_appliance(self, data): if err: self.logger.error(err) return status, None, err + err = self._validate_appliance(app) + if err: + return err status, msg, err = await self.save_appliance(app, True) if err: self.logger.error(err) return status, None, err self.logger.info(msg) - self.logger.info("Start monitoring appliance '%s'"%app) - scheduler = self.__sched_class(app) - status, msg, err = await scheduler.initialize() - if status != 200: - self.logger.error(err) - return status, msg, err - scheduler.start() + ApplianceScheduleNegotiator(app.id, 3000).start() return 201, app, None async def delete_appliance(self, app_id): @@ -94,8 +75,24 @@ async def delete_appliance(self, app_id): async def save_appliance(self, app, upsert=True): return await self.__app_db.save_appliance(app, upsert) - async def restore_appliance(self, app_id): - raise NotImplemented + def _validate_appliance(self, app): + + def validate_dependencies(contrs): + contrs = {c.id: c for c in contrs} + parents = {} + for c in contrs.values(): + nonexist = list(filter(lambda c: c not in contrs, c.dependencies)) + if nonexist: + return 422, None, "Dependencies '%s' do not exist in this appliance"%nonexist + parents.setdefault(c.id, set()).update(c.dependencies) + for c, p in parents.items(): + cycles = ['%s<->%s'%(c, pp) + for pp in filter(lambda x: c in parents.get(x, set()), p)] + if cycles: + return 422, None, "Cycle(s) found: %s"%cycles + + return validate_dependencies(app.containers) + class ApplianceAPIManager(APIManager): @@ -147,7 +144,6 @@ def __init__(self, app_id): self.__app_id = app_id self.__app_api = ApplianceAPIManager() self.__app_db = ApplianceDBManager() - self.__dag_db = ApplianceDAGDBManager() async def callback(self): status, _, err = await self.__app_api.get_appliance(self.__app_id) @@ -158,7 +154,6 @@ async def callback(self): if status == 404: self.logger.info("Delete appliance '%s' from database"%self.__app_id) await self.__app_db.delete_appliance(self.__app_id) - await self.__dag_db.delete_appliance_dag(self.__app_id) elif status != 200: self.logger.error(err) self.stop() diff --git a/commons.py b/commons.py index 007a34d..da0c10b 100644 --- a/commons.py +++ b/commons.py @@ -6,7 +6,7 @@ from abc import ABCMeta, abstractmethod from tornado.httpclient import AsyncHTTPClient, HTTPError from tornado.ioloop import PeriodicCallback -from motor.motor_tornado import MotorClient, MotorDatabase +from motor.motor_tornado import MotorClient from util import error, dirname from config import config diff --git a/schedule/__init__.py b/schedule/__init__.py new file mode 100644 index 0000000..ecbc3c2 --- /dev/null +++ b/schedule/__init__.py @@ -0,0 +1,93 @@ +import sys +import importlib +import appliance + +from abc import ABCMeta + +from commons import AutonomousMonitor, Singleton, Loggable +from container.manager import ContainerManager +from config import config + + +def get_scheduler(): + try: + sched_mod = '.'.join(config.pivot.scheduler.split('.')[:-1]) + sched_class = config.pivot.scheduler.split('.')[-1] + return getattr(importlib.import_module(sched_mod), sched_class) + except Exception as e: + sys.stderr.write(str(e) + '\n') + from schedule.default import DefaultApplianceScheduler + return DefaultApplianceScheduler + + +class ApplianceScheduleNegotiator(AutonomousMonitor): + + def __init__(self, app_id, interval=3000): + super(ApplianceScheduleNegotiator, self).__init__(interval) + self.__app_id = app_id + self.__executor = ApplianceScheduleExecutor() + self.__scheduler = get_scheduler()() + self.__app_mgr = appliance.manager.ApplianceManager() + + async def callback(self): + # read appliance from DB + status, app, err = await self.__app_mgr.get_appliance(self.__app_id) + if not app: + if status == 404: + self.logger.info('Appliance %s no longer exists'%self.__app_id) + else: + self.logger.error(err) + self.stop() + return + # contact the scheduler for new schedule + sched = self.__scheduler.schedule(app) + self.logger.info('Containers to be scheduled: %s'%[c.id for c in sched.containers]) + # if the scheduling is done + if sched.done: + self.logger.info('Scheduling is done for appliance %s'%self.__app_id) + self.stop() + return + # execute the new schedule + await self.__executor.execute(sched) + + +class ApplianceScheduleExecutor(Loggable, metaclass=Singleton): + + def __init__(self): + self.__contr_mgr = ContainerManager() + + async def execute(self, sched): + for c in sched.containers: + _, msg, err = await self.__contr_mgr.provision_container(c) + if err: + self.logger.error(err) + self.logger.info('Container %s is being provisioned'%c.id) + + +class Schedule: + + def __init__(self, done=False, containers=[]): + self.__done = done + self.__containers = list(containers) + + @property + def done(self): + return self.__done + + @property + def containers(self): + return list(self.__containers) + + @done.setter + def done(self, done): + self.__done = done + + def add_containers(self, contrs): + self.__containers += list(contrs) + + +class ApplianceScheduler(Loggable, metaclass=ABCMeta): + + def schedule(self, app): + raise NotImplemented + diff --git a/scheduler/appliance/__init__.py b/schedule/appliance/__init__.py similarity index 100% rename from scheduler/appliance/__init__.py rename to schedule/appliance/__init__.py diff --git a/scheduler/appliance/base.py b/schedule/appliance/base.py similarity index 100% rename from scheduler/appliance/base.py rename to schedule/appliance/base.py diff --git a/scheduler/appliance/manager.py b/schedule/appliance/manager.py similarity index 100% rename from scheduler/appliance/manager.py rename to schedule/appliance/manager.py diff --git a/scheduler/appliance/plugin/location/__init__.py b/schedule/appliance/plugin/location/__init__.py similarity index 100% rename from scheduler/appliance/plugin/location/__init__.py rename to schedule/appliance/plugin/location/__init__.py diff --git a/schedule/default.py b/schedule/default.py new file mode 100644 index 0000000..d158ffa --- /dev/null +++ b/schedule/default.py @@ -0,0 +1,27 @@ +from container.base import ContainerType, ContainerState +from schedule import ApplianceScheduler, Schedule + + +class DefaultApplianceScheduler(ApplianceScheduler): + + def schedule(self, app): + sched = Schedule() + free_contrs = self._resolve_dependencies(app) + self.logger.info('Free containers: %s'%[c.id for c in free_contrs]) + if not free_contrs: + sched.done = True + return sched + sched.add_containers([c for c in free_contrs if c.state in + (ContainerState.SUBMITTED, ContainerState.FAILED)]) + return sched + + def _resolve_dependencies(self, app): + contrs = {c.id: c for c in app.containers + if (c.type == ContainerType.JOB and c.state != ContainerState.SUCCESS) + or (c.type == ContainerType.SERVICE and c.state != ContainerState.RUNNING)} + parents = {} + for c in contrs.values(): + parents.setdefault(c.id, set()).update([d for d in c.dependencies if d in contrs]) + return [contrs[k] for k, v in parents.items() if not v] + +