Skip to content

Commit

Permalink
[#19] allow passing custom configurations for appliance-level schedul…
Browse files Browse the repository at this point in the history
…er; decoupled iRODS configuration from PIVOT framework; bug fix
  • Loading branch information
dcvan24 committed Jul 5, 2018
1 parent f07b2aa commit 451e749
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 59 deletions.
13 changes: 8 additions & 5 deletions appliance/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import swagger

from container.base import Container, get_short_ids
from schedule.base import Scheduler


@swagger.model
Expand Down Expand Up @@ -45,10 +46,11 @@ def parse(cls, data):
return 200, app, None

def __init__(self, id, containers=[],
scheduler='schedule.local.DefaultApplianceScheduler', **kwargs):
scheduler=Scheduler(name='schedule.local.DefaultApplianceScheduler'),
**kwargs):
self.__id = id
self.__containers = list(containers)
self.__scheduler = scheduler
self.__scheduler = scheduler if isinstance(scheduler, Scheduler) else Scheduler(**scheduler)

@property
@swagger.property
Expand Down Expand Up @@ -85,7 +87,7 @@ def scheduler(self):
Appliance-level scheduler for the appliance
---
type: str
type: Scheduler
"""
return self.__scheduler
Expand All @@ -95,11 +97,12 @@ def containers(self, contrs):
self.__containers = list(contrs)

def to_render(self):
return dict(id=self.id, scheduler=self.scheduler,
return dict(id=self.id,
scheduler=self.scheduler.to_render(),
containers=[c.to_render() for c in self.containers])

def to_save(self):
return dict(id=self.id, scheduler=self.scheduler)
return dict(id=self.id, scheduler=self.scheduler.to_save())

def __str__(self):
return self.id
Expand Down
8 changes: 4 additions & 4 deletions appliance/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ def validate_dependencies(contrs):

return validate_dependencies(app.containers)

def _get_scheduler(self, scheduler_name):
def _get_scheduler(self, sched):
try:
sched_mod = '.'.join(scheduler_name.split('.')[:-1])
sched_class = scheduler_name.split('.')[-1]
return getattr(importlib.import_module(sched_mod), sched_class)()
sched_mod = '.'.join(sched.name.split('.')[:-1])
sched_class = sched.name.split('.')[-1]
return getattr(importlib.import_module(sched_mod), sched_class)(sched.config)
except Exception as e:
self.logger.error(str(e))
return schedule.local.DefaultApplianceScheduler()
Expand Down
3 changes: 1 addition & 2 deletions commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ async def _fetch(self, host, port, endpoint, method, body, is_https=False, **hea
except (ConnectionRefusedError, ConnectionResetError):
self.logger.warning('Connection refused/reset, retry after 3 seconds')
time.sleep(3)
await self._fetch(host, port, endpoint, method, body, is_https, **headers)

return await self._fetch(host, port, endpoint, method, body, is_https, **headers)


class Manager(Loggable, metaclass=Singleton):
Expand Down
16 changes: 2 additions & 14 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,6 @@ def __init__(self, port=8181, *args, **kwargs):
super(ExhibitorAPI, self).__init__(*args, **kwargs)


class iRODSAPI(API):

def __init__(self, port=0, *args, **kwargs):
kwargs.update(port=port, endpoint='/v1')
super(iRODSAPI, self).__init__(*args, **kwargs)


class GeneralConfig:

Expand Down Expand Up @@ -145,18 +139,16 @@ def read_config(cls, cfg_file_path):
mesos=MesosAPI(**cfg.get('mesos', {})),
marathon=MarathonAPI(**cfg.get('marathon', {})),
chronos=ChronosAPI(**cfg.get('chronos', {})),
exhibitor=ExhibitorAPI(**cfg.get('exhibitor', {})),
irods=iRODSAPI(**cfg.get('irods', {})))
exhibitor=ExhibitorAPI(**cfg.get('exhibitor', {})))

def __init__(self, pivot, db, mesos=None, marathon=None, chronos=None,
exhibitor=None, irods=None, *args, **kwargs):
exhibitor=None, *args, **kwargs):
self.__pivot = pivot
self.__db = db
self.__mesos = mesos
self.__marathon = marathon
self.__chronos = chronos
self.__exhibitor = exhibitor
self.__irods = irods

@property
def pivot(self):
Expand All @@ -182,10 +174,6 @@ def chronos(self):
def exhibitor(self):
return self.__exhibitor

@property
def irods(self):
return self.__irods


config = Configuration.read_config('%s/config.yml'%dirname(__file__))

Expand Down
3 changes: 0 additions & 3 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,5 @@ chronos:
port: 9090
exhibitor:
port: 8181
irods:
host: 54.236.240.212
port: 8080


2 changes: 2 additions & 0 deletions container/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,7 @@ def cloud(self):
---
type: str
read_only: true
example: aws
"""
return self.__cloud
Expand All @@ -861,6 +862,7 @@ def host(self):
---
type: str
read_only: true
example: 10.52.0.32
"""
return self.__host
Expand Down
14 changes: 4 additions & 10 deletions docker/pivot/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

def parse_args():
parser = ArgumentParser(description="Launch PIVOT")
parser.add_argument('--master', dest='master', type=str, default='zk-1.zk',
parser.add_argument('--master', dest='master', type=str, required=True,
help='Mesos/Marathon master host')
parser.add_argument('--port', dest='port', type=int, default=9090,
help='PIVOT listen port')
Expand All @@ -26,23 +26,17 @@ def parse_args():
help='MongoDB listen port')
parser.add_argument('--db_name', dest='db_name', type=str, default='pivot',
help='Database name')
parser.add_argument('--ha', action='store_true', help='Turn on HA mode')
parser.add_argument('--irods_api_host', dest='irods_api_host',
type=str, help='iRODS API host')
parser.add_argument('--irods_api_port', dest='irods_api_port',
type=int, help='iRODS API port')

return parser.parse_args()


def create_pivot_config(args):
pivot_cfg_f = '/opt/pivot/config.yml'
pivot_cfg = yaml.load(open(pivot_cfg_f))
pivot_cfg['pivot'].update(master=args.master, port=args.port,
n_parallel=args.n_parallel, ha=args.ha)
pivot_cfg['pivot'].update(master=args.master,
port=args.port,
n_parallel=args.n_parallel)
pivot_cfg['db'] = dict(host=args.db_host, port=args.db_port, name=args.db_name)
if args.irods_api_host and args.irods_api_port:
pivot_cfg['irods'] = dict(host=args.irods_api_host, port=args.irods_api_port)
yaml.dump(pivot_cfg, open(pivot_cfg_f, 'w'), default_flow_style=False)


Expand Down
40 changes: 40 additions & 0 deletions schedule/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import swagger


@swagger.model
class Scheduler:
    # Describes an appliance-level scheduler: the dotted path of the scheduler
    # class plus a free-form configuration mapping for it.
    #
    # NOTE: the property docstrings below carry a YAML section after `---`
    # (presumably consumed by the `swagger` decorators -- confirm), so
    # explanatory notes live in `#` comments and the docstring text is left
    # untouched.

    def __init__(self, name, config=None):
        # `None` is accepted and treated as "no configuration": this avoids a
        # mutable default argument and keeps a null `config` from failing in
        # `dict(None)` with a TypeError.
        self.__name = name
        # Defensive copy so later changes to the caller's mapping do not leak
        # into this object.
        self.__config = {} if config is None else dict(config)

    @property
    @swagger.property
    def name(self):
        """
        Scheduler module name
        ---
        type: str
        example: schedule.local.DefaultApplianceScheduler
        """
        return self.__name

    @property
    @swagger.property
    def config(self):
        """
        Scheduler configurations
        ---
        type: dict
        example:
            cfg_key: cfg_val
        """
        # Return a copy so callers cannot mutate the stored configuration.
        return dict(self.__config)

    def to_render(self):
        """
        Return a plain dict holding the scheduler name and a copy of its config
        """
        return dict(name=self.name, config=self.config)

    def to_save(self):
        """
        Return a plain dict holding the scheduler name and a copy of its config
        (currently the same shape as `to_render`)
        """
        return dict(name=self.name, config=self.config)
7 changes: 7 additions & 0 deletions schedule/local/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ async def _execute(self, sched):

class ApplianceScheduler(Loggable, metaclass=ABCMeta):

def __init__(self, config=None):
    """
    Keep a private copy of the scheduler configuration.

    :param config: mapping of configuration options; `None` (the default)
        is treated as an empty configuration
    """
    # `None` replaces the former mutable `{}` default and is tolerated as
    # input, so a missing configuration never reaches `dict(None)`.
    self.__config = {} if config is None else dict(config)

@property
def config(self):
    """
    Configuration held by this scheduler, handed out as a fresh dict copy
    """
    snapshot = dict(self.__config)
    return snapshot

async def schedule(self, app, agents):
"""
Caution: the parameters should not be overridden by schedulers that extend
Expand Down
81 changes: 60 additions & 21 deletions schedule/plugin/local/location_aware.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,41 @@

class LocationAwareApplianceScheduler(DefaultApplianceScheduler):

def __init__(self):
super(LocationAwareApplianceScheduler, self).__init__()
self.__api = iRODSAPIManager()
def __init__(self, config):
super(LocationAwareApplianceScheduler, self).__init__(config)
self._validate_scheduler_config()
self.__api = iRODSAPIManager(config['irods']['host'],
config['irods']['port'],
config['irods']['endpoint'])

async def schedule(self, app, agents):
sched = await super(LocationAwareApplianceScheduler, self).schedule(app, list(agents))
if sched.done:
return sched
if not config.irods.host or not config.irods.port:
self.logger.info('iRODS API is not properly set. Fallback to default scheduler')
return sched
files, data_objs, resources = set(), {}, {}
for c in sched.containers:
if not c.data or not c.data.input: continue
files.update([lfn for lfn in c.data.input])
if files:
data_objs = {d['path']: d for d in await self.__api.get_data_objects(files)}
resource_names = set([repl['resource_name'] for d in data_objs.values()
for repl in d['replicas']])
resources = {r['name']: r for r in await self.__api.get_resources(resource_names)}
for c in sched.containers:
if not c.data or not c.data.input: continue
regions = {}
for lfn in c.data.input:
data_obj = await self._get_data_object(lfn)
data_obj = data_objs.get(lfn)
if not data_obj: continue
for r in await self.__api.get_replica_regions(data_obj.get('replicas', [])):
regions[r] = regions.setdefault(r, 0) + data_obj.get('size', 0)
for repl in data_obj['replicas']:
r = resources.get(repl['resource_name'])
if r:
regions[r['region']] = regions.setdefault(r['region'], 0) + data_obj['size']
matched = [a for a in agents
if a.attributes.get('region') in regions.keys()
and a.resources.cpus >= c.resources.cpus
and a.resources.mem >= c.resources.mem
and a.resources.disk >= c.resources.disk]
if a.attributes.get('region') in regions]
if not matched:
self.logger.info("No matched agents found for '%s'"%c)
return sched
continue
self.logger.info('Candidate regions:')
for a in matched:
region, cloud = a.attributes.get('region'), a.attributes.get('cloud')
Expand All @@ -51,27 +59,58 @@ async def _get_data_object(self, lfn):
self.logger.error(err)
return data_obj

def _validate_scheduler_config(self):
    """
    Check that the scheduler configuration carries usable iRODS settings.

    The `irods` entry must be a dict with a str `host`, an int `port` and a
    str `endpoint`; an `Exception` is raised otherwise. A `scale` entry that
    is not a bool is then reset to `False`.
    """
    irods_cfg = self.config.get('irods')
    expected_types = (('host', str), ('port', int), ('endpoint', str))
    # Checked in order and short-circuited, so `.get` is only ever called on
    # a value already known to be a dict.
    well_formed = isinstance(irods_cfg, dict) and all(
        isinstance(irods_cfg.get(key), kind) for key, kind in expected_types)
    if not well_formed:
        raise Exception('iRODS is not properly configured, fall back to default scheduler')
    if not isinstance(self.config.get('scale'), bool):
        # NOTE(review): if `self.config` is the copying property defined on
        # ApplianceScheduler (it returns `dict(...)`), this assignment lands
        # on a temporary copy and the default is never persisted -- confirm
        # and, if so, normalise `scale` where the config is stored.
        self.config['scale'] = False


class iRODSAPIManager(APIManager):

def __init__(self):
def __init__(self, host, port, endpoint):
super(iRODSAPIManager, self).__init__()
self.__host = host
self.__port = port
self.__endpoint = endpoint

async def get_data_object(self, lfn):
api = config.irods
endpoint = '%s/getDataObject?filename=%s'%(api.endpoint, url_escape(lfn))
status, data_obj, err = await self.http_cli.get(api.host, api.port, endpoint)
endpoint = '%s/getDataObject?filename=%s'%(self.__endpoint, url_escape(lfn))
status, data_obj, err = await self.http_cli.get(self.__host, self.__port, endpoint)
if status != 200:
return status, None, err
return status, data_obj, None

async def get_data_objects(self, filenames):
    """
    Fetch several data objects from the iRODS API in one request.

    :param filenames: iterable of file names, sent comma-separated in the
        `filenames` query parameter
    :return: the payload returned by the HTTP client, or an empty list when
        the client reports an error (the error is logged)
    """
    base = self.__endpoint
    query = url_escape(','.join(filenames))
    endpoint = '%s/getDataObjects?filenames=%s'%(base, query)
    # Only the error slot decides the outcome; the status code is not inspected.
    _, data_objs, err = await self.http_cli.get(self.__host, self.__port, endpoint)
    if not err:
        return data_objs
    self.logger.error(err)
    return []

async def get_resources(self, resource_names):
    """
    Fetch metadata for several iRODS resources in one request.

    :param resource_names: iterable of resource names, sent comma-separated
        in the `resource_names` query parameter
    :return: the payload returned by the HTTP client, or an empty list when
        the client reports an error (the error is logged)
    """
    joined_names = ','.join(resource_names)
    base = self.__endpoint
    endpoint = '%s/getResourcesMetadata?resource_names=%s'%(base, url_escape(joined_names))
    # Only the error slot decides the outcome; the status code is not inspected.
    _, resources, err = await self.http_cli.get(self.__host, self.__port, endpoint)
    if not err:
        return resources
    self.logger.error(err)
    return []

async def get_replica_regions(self, replicas):
api = config.irods
locations = []
for r in replicas:
endpoint = '%s/getResourceMetadata?resource_name=%s'%(api.endpoint,
endpoint = '%s/getResourceMetadata?resource_name=%s'%(self.__endpoint,
url_escape(r['resource_name']))
status, resc, err = await self.http_cli.get(api.host, api.port, endpoint)
status, resc, err = await self.http_cli.get(self.__host, self.__port, endpoint)
if status != 200:
self.logger.error(err)
continue
Expand Down

0 comments on commit 451e749

Please sign in to comment.