Skip to content

Commit

Permalink
[#19] restructured the scheduler code and made it pluggable
Browse files Browse the repository at this point in the history
  • Loading branch information
dcvan24 committed Jun 12, 2018
1 parent 1bbe453 commit a34d725
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 29 deletions.
51 changes: 23 additions & 28 deletions appliance/manager.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,9 @@
import sys
import importlib

from config import config
from commons import MongoClient, AutonomousMonitor
from commons import Manager, APIManager
from appliance.base import Appliance
from container.manager import ContainerManager
from scheduler.appliance.manager import ApplianceDAGDBManager


def get_scheduler():
    """Return the scheduler class named by ``config.pivot.scheduler``.

    The setting is read as a dotted path whose last component is the class
    name and whose leading components form the module to import. If anything
    goes wrong while resolving it, the error is written to stderr and
    ``DefaultApplianceScheduler`` is returned instead.
    """
    try:
        # 'pkg.mod.Class' -> ('pkg.mod', '.', 'Class'); a path without a dot
        # yields an empty module name, exactly like the split/join form.
        module_path, _, class_name = config.pivot.scheduler.rpartition('.')
        module = importlib.import_module(module_path)
        return getattr(module, class_name)
    except Exception as exc:
        # Best-effort fallback: report the problem and use the default.
        sys.stderr.write(str(exc) + '\n')
        from scheduler import DefaultApplianceScheduler
        return DefaultApplianceScheduler
from schedule import ApplianceScheduleNegotiator


class ApplianceManager(Manager):
Expand All @@ -26,8 +12,6 @@ def __init__(self):
self.__app_api = ApplianceAPIManager()
self.__contr_mgr = ContainerManager()
self.__app_db = ApplianceDBManager()
self.__sched_class = get_scheduler()
self.logger.info('Scheduler: %s'%self.__sched_class.__name__)

async def get_appliance(self, app_id):
status, app, err = await self.__app_db.get_appliance(app_id)
Expand Down Expand Up @@ -59,18 +43,15 @@ async def create_appliance(self, data):
if err:
self.logger.error(err)
return status, None, err
err = self._validate_appliance(app)
if err:
return err
status, msg, err = await self.save_appliance(app, True)
if err:
self.logger.error(err)
return status, None, err
self.logger.info(msg)
self.logger.info("Start monitoring appliance '%s'"%app)
scheduler = self.__sched_class(app)
status, msg, err = await scheduler.initialize()
if status != 200:
self.logger.error(err)
return status, msg, err
scheduler.start()
ApplianceScheduleNegotiator(app.id, 3000).start()
return 201, app, None

async def delete_appliance(self, app_id):
Expand All @@ -94,8 +75,24 @@ async def delete_appliance(self, app_id):
async def save_appliance(self, app, upsert=True):
    """Store ``app`` through the appliance DB manager.

    Args:
        app: the appliance to store.
        upsert: forwarded unchanged to the DB manager's ``save_appliance``.

    Returns:
        Whatever the DB manager's ``save_appliance`` returns (callers in this
        class unpack it as ``status, msg, err``).
    """
    outcome = await self.__app_db.save_appliance(app, upsert)
    return outcome

async def restore_appliance(self, app_id):
    """Restore the appliance identified by ``app_id`` (not implemented yet).

    Raises:
        NotImplementedError: always. ``NotImplemented`` is a comparison
            sentinel, not an exception, so raising it would surface as an
            unrelated ``TypeError``.
    """
    raise NotImplementedError('restore_appliance is not implemented')
def _validate_appliance(self, app):

def validate_dependencies(contrs):
contrs = {c.id: c for c in contrs}
parents = {}
for c in contrs.values():
nonexist = list(filter(lambda c: c not in contrs, c.dependencies))
if nonexist:
return 422, None, "Dependencies '%s' do not exist in this appliance"%nonexist
parents.setdefault(c.id, set()).update(c.dependencies)
for c, p in parents.items():
cycles = ['%s<->%s'%(c, pp)
for pp in filter(lambda x: c in parents.get(x, set()), p)]
if cycles:
return 422, None, "Cycle(s) found: %s"%cycles

return validate_dependencies(app.containers)



class ApplianceAPIManager(APIManager):
Expand Down Expand Up @@ -147,7 +144,6 @@ def __init__(self, app_id):
self.__app_id = app_id
self.__app_api = ApplianceAPIManager()
self.__app_db = ApplianceDBManager()
self.__dag_db = ApplianceDAGDBManager()

async def callback(self):
status, _, err = await self.__app_api.get_appliance(self.__app_id)
Expand All @@ -158,7 +154,6 @@ async def callback(self):
if status == 404:
self.logger.info("Delete appliance '%s' from database"%self.__app_id)
await self.__app_db.delete_appliance(self.__app_id)
await self.__dag_db.delete_appliance_dag(self.__app_id)
elif status != 200:
self.logger.error(err)
self.stop()
2 changes: 1 addition & 1 deletion commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from abc import ABCMeta, abstractmethod
from tornado.httpclient import AsyncHTTPClient, HTTPError
from tornado.ioloop import PeriodicCallback
from motor.motor_tornado import MotorClient, MotorDatabase
from motor.motor_tornado import MotorClient

from util import error, dirname
from config import config
Expand Down
93 changes: 93 additions & 0 deletions schedule/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import sys
import importlib
import appliance

from abc import ABCMeta

from commons import AutonomousMonitor, Singleton, Loggable
from container.manager import ContainerManager
from config import config


def get_scheduler():
    """Resolve the scheduler class configured in ``config.pivot.scheduler``.

    The configured value is treated as ``<module path>.<class name>``. When
    it cannot be resolved for any reason, the error text is written to
    stderr and ``schedule.default.DefaultApplianceScheduler`` is returned.
    """
    try:
        # Split on the last dot only: everything before it is the module.
        module_path, _, class_name = config.pivot.scheduler.rpartition('.')
        module = importlib.import_module(module_path)
        return getattr(module, class_name)
    except Exception as exc:
        # Best-effort fallback: report the problem and use the default.
        sys.stderr.write(str(exc) + '\n')
        from schedule.default import DefaultApplianceScheduler
        return DefaultApplianceScheduler


class ApplianceScheduleNegotiator(AutonomousMonitor):
    """Monitor that repeatedly asks the scheduler what to run for one appliance.

    On each ``callback`` it loads the appliance, obtains a schedule from the
    configured scheduler and hands it to the executor. It stops itself once
    the appliance can no longer be loaded or the schedule reports it is done.
    """

    def __init__(self, app_id, interval=3000):
        """
        Args:
            app_id: id of the appliance to schedule.
            interval: forwarded to ``AutonomousMonitor.__init__``
                (presumably the polling period; confirm against that class).
        """
        super().__init__(interval)
        self.__app_id = app_id
        self.__executor = ApplianceScheduleExecutor()
        # get_scheduler() returns a class; instantiate it here.
        self.__scheduler = get_scheduler()()
        self.__app_mgr = appliance.manager.ApplianceManager()

    async def callback(self):
        """Run one negotiation round for the appliance."""
        # Load the current appliance through the appliance manager.
        status, app, err = await self.__app_mgr.get_appliance(self.__app_id)
        if not app:
            if status != 404:
                self.logger.error(err)
            else:
                self.logger.info('Appliance %s no longer exists'%self.__app_id)
            self.stop()
            return

        # Ask the scheduler which containers should be handled now.
        sched = self.__scheduler.schedule(app)
        scheduled_ids = [c.id for c in sched.containers]
        self.logger.info('Containers to be scheduled: %s'%scheduled_ids)

        if not sched.done:
            # There is still work to do: carry out this round's schedule.
            await self.__executor.execute(sched)
            return

        self.logger.info('Scheduling is done for appliance %s'%self.__app_id)
        self.stop()


class ApplianceScheduleExecutor(Loggable, metaclass=Singleton):
    """Carries out a schedule by provisioning each of its containers."""

    def __init__(self):
        self.__contr_mgr = ContainerManager()

    async def execute(self, sched):
        """Provision every container listed in ``sched.containers``.

        Containers are provisioned one at a time, in order. A failure for one
        container is logged and does not prevent the remaining containers
        from being attempted.

        Args:
            sched: object exposing a ``containers`` iterable (e.g. ``Schedule``).
        """
        for c in sched.containers:
            _, _, err = await self.__contr_mgr.provision_container(c)
            if err:
                self.logger.error(err)
                # Do not claim the container is being provisioned when the
                # provisioning request just failed.
                continue
            self.logger.info('Container %s is being provisioned'%c.id)


class Schedule:
    """Result of one scheduling round.

    Holds a ``done`` flag and the list of containers selected for this
    round.
    """

    def __init__(self, done=False, containers=()):
        """
        Args:
            done: initial value of the ``done`` flag.
            containers: iterable of containers to start with; it is copied,
                so later changes to the caller's collection are not seen.
                The default is an immutable empty tuple rather than a shared
                mutable list.
        """
        self.__done = done
        self.__containers = list(containers)

    @property
    def done(self):
        """Current value of the ``done`` flag."""
        return self.__done

    @done.setter
    def done(self, done):
        self.__done = done

    @property
    def containers(self):
        """A fresh copy of the container list (mutating it has no effect here)."""
        return list(self.__containers)

    def add_containers(self, contrs):
        """Append every item of the iterable ``contrs`` to the schedule."""
        self.__containers.extend(contrs)


class ApplianceScheduler(Loggable, metaclass=ABCMeta):
    """Base class for pluggable appliance schedulers.

    Subclasses override ``schedule``.
    """

    def schedule(self, app):
        """Compute the next schedule for ``app``; must be overridden.

        Args:
            app: the appliance to schedule.

        Returns:
            In subclasses, an object exposing ``done`` and ``containers``
            (``ApplianceScheduleNegotiator.callback`` reads both).

        Raises:
            NotImplementedError: always, in this base class.
                ``NotImplemented`` is a comparison sentinel, not an
                exception, so raising it would produce a ``TypeError``.
        """
        raise NotImplementedError

File renamed without changes.
File renamed without changes.
File renamed without changes.
27 changes: 27 additions & 0 deletions schedule/default.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from container.base import ContainerType, ContainerState
from schedule import ApplianceScheduler, Schedule


class DefaultApplianceScheduler(ApplianceScheduler):
    """Scheduler that releases containers once their dependencies are finished."""

    def schedule(self, app):
        """Build the schedule for the current state of ``app``.

        Returns:
            A ``Schedule`` marked done when no unfinished container is free
            of unfinished dependencies; otherwise a ``Schedule`` holding the
            free containers whose state is SUBMITTED or FAILED.
        """
        free_contrs = self._resolve_dependencies(app)
        self.logger.info('Free containers: %s'%[c.id for c in free_contrs])
        sched = Schedule()
        if free_contrs:
            runnable_states = (ContainerState.SUBMITTED, ContainerState.FAILED)
            sched.add_containers(
                contr for contr in free_contrs if contr.state in runnable_states)
        else:
            sched.done = True
        return sched

    def _resolve_dependencies(self, app):
        """Return unfinished containers that have no unfinished dependency.

        A JOB counts as unfinished until its state is SUCCESS, a SERVICE
        until its state is RUNNING; containers of any other type are ignored.
        """
        pending = {}
        for contr in app.containers:
            if contr.type == ContainerType.JOB and contr.state != ContainerState.SUCCESS:
                pending[contr.id] = contr
            elif contr.type == ContainerType.SERVICE and contr.state != ContainerState.RUNNING:
                pending[contr.id] = contr
        # A container is free when none of its dependencies is still pending.
        return [contr for contr in pending.values()
                if not any(dep in pending for dep in contr.dependencies)]


0 comments on commit a34d725

Please sign in to comment.