diff --git a/scripts/beanstalk_queue.py b/scripts/beanstalk_queue.py
new file mode 100644
index 0000000000..a8a0661ecf
--- /dev/null
+++ b/scripts/beanstalk_queue.py
@@ -0,0 +1,36 @@
+import docopt
+
+import teuthology.config
+import teuthology.queue.beanstalk
+
+doc = """
+usage: teuthology-beanstalk-queue -h
+       teuthology-beanstalk-queue [-s|-d|-f] -m MACHINE_TYPE
+       teuthology-beanstalk-queue [-r] -m MACHINE_TYPE
+       teuthology-beanstalk-queue -m MACHINE_TYPE -D PATTERN
+       teuthology-beanstalk-queue -p SECONDS [-m MACHINE_TYPE]
+
+List jobs in the queue.
+If -D is passed, then jobs with PATTERN in the job name are deleted from the
+queue.
+
+Arguments:
+    -m, --machine_type MACHINE_TYPE [default: multi]
+                          Which machine type queue to work on.
+
+optional arguments:
+    -h, --help            Show this help message and exit
+    -D, --delete PATTERN  Delete jobs with PATTERN in their name
+    -d, --description     Show job descriptions
+    -r, --runs            Only show run names
+    -f, --full            Print the entire job config. Use with caution.
+    -s, --status          Print the status of the queue
+    -p, --pause SECONDS   Pause queues for a number of seconds. A value of 0
+                          will unpause. If -m is passed, pause that queue,
+                          otherwise pause all queues.
+"""
+
+
+def main():
+    args = docopt.docopt(doc)
+    teuthology.queue.beanstalk.main(args)
diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py
index 45dd61b264..63779f1246 100644
--- a/scripts/dispatcher.py
+++ b/scripts/dispatcher.py
@@ -1,4 +1,31 @@
+"""
+usage: teuthology-dispatcher --help
+       teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR
+       teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE --queue-backend BACKEND
+
+Start a dispatcher for the specified machine type. Grab jobs from a
+paddles/beanstalk queue and run the teuthology tests they describe as
+subprocesses. The subprocess invoked is a teuthology-dispatcher command run
+in supervisor mode.
+
+Supervisor mode: Supervise the job run described by its config. Reimage
+target machines and invoke the teuthology command. Unlock the target
+machines at the end of the run.
+
+standard arguments:
+  -h, --help                   show this help message and exit
+  -v, --verbose                be more verbose
+  -l, --log-dir LOG_DIR        path in which to store logs
+  -a DIR, --archive-dir DIR    path to archive results in
+  --machine-type MACHINE_TYPE  the machine type for the job
+  --supervisor                 run dispatcher in job supervisor mode
+  --bin-path BIN_PATH          teuthology bin path
+  --job-config CONFIG          file descriptor of job's config file
+  --exit-on-empty-queue        if the queue is empty, exit
+  --queue-backend BACKEND      choose between paddles and beanstalk
+"""
 import argparse
+
 import sys
 
 import teuthology.dispatcher.supervisor
diff --git a/scripts/kill.py b/scripts/kill.py
index 31acc8b1a4..e2a1a4ef09 100644
--- a/scripts/kill.py
+++ b/scripts/kill.py
@@ -12,7 +12,7 @@
 teuthology-kill [-p] -o OWNER -m MACHINE_TYPE -r RUN
 
 Kill running teuthology jobs:
-1. Removes any queued jobs from the beanstalk queue
+1. Removes any queued jobs from the paddles queue
 2. Kills any running jobs
 3. Nukes any machines involved
 
diff --git a/scripts/queue.py b/scripts/queue.py
index 8ea5ca5c2c..1d9112c22e 100644
--- a/scripts/queue.py
+++ b/scripts/queue.py
@@ -1,14 +1,16 @@
 import docopt
 
-import teuthology.config
-import teuthology.beanstalk
+import teuthology.queue.beanstalk
+import teuthology.queue.paddles
+from teuthology.config import config
 
 doc = """
 usage: teuthology-queue -h
-       teuthology-queue [-s|-d|-f] -m MACHINE_TYPE
+       teuthology-queue [-s|-d|-f] -m MACHINE_TYPE
        teuthology-queue [-r] -m MACHINE_TYPE
        teuthology-queue -m MACHINE_TYPE -D PATTERN
-       teuthology-queue -p SECONDS [-m MACHINE_TYPE]
+       teuthology-queue -p SECONDS [-m MACHINE_TYPE] [-U USER]
+       teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
 
 List Jobs in queue.
 If -D is passed, then jobs with PATTERN in the job name are deleted from the
@@ -28,9 +30,17 @@
     -p, --pause SECONDS   Pause queues for a number of seconds. A value of 0
                           will unpause. If -m is passed, pause that queue,
                           otherwise pause all queues.
+    -P, --priority PRIORITY
+                          Change priority of queued jobs (only in paddles queues)
+    -U, --user USER       User who owns the jobs
+    -R, --run-name RUN_NAME
+                          Used to change priority of all jobs in the run.
 """
 
 
 def main():
     args = docopt.docopt(doc)
-    teuthology.beanstalk.main(args)
+    if config.backend == 'beanstalk':
+        teuthology.queue.beanstalk.main(args)
+    else:
+        teuthology.queue.paddles.main(args)
diff --git a/scripts/schedule.py b/scripts/schedule.py
index 58f7a46249..e9f0c1f5ff 100644
--- a/scripts/schedule.py
+++ b/scripts/schedule.py
@@ -21,7 +21,7 @@
                           Queue backend name, use prefix '@' to append job
                           config to the given file path as yaml.
-                          [default: beanstalk]
+                          [default: paddles]
  -n , --name              Name of suite run the job is part of
  -d , --description       Job description
  -o , --owner              Job owner
diff --git a/teuthology/beanstalk.py b/teuthology/beanstalk.py
deleted file mode 100644
index 76bc2c97ae..0000000000
--- a/teuthology/beanstalk.py
+++ /dev/null
@@ -1,215 +0,0 @@
-import beanstalkc
-import json
-import yaml
-import logging
-import pprint
-import sys
-from collections import OrderedDict
-
-from teuthology import report
-from teuthology.config import config
-
-log = logging.getLogger(__name__)
-
-
-def connect():
-    host = config.queue_host
-    port = config.queue_port
-    if host is None or port is None:
-        raise RuntimeError(
-            'Beanstalk queue information not found in {conf_path}'.format(
-                conf_path=config.teuthology_yaml))
-    return beanstalkc.Connection(host=host, port=port, parse_yaml=yaml.safe_load)
-
-
-def watch_tube(connection, tube_name):
-    """
-    Watch a given tube, potentially correcting to 'multi' if necessary. Returns
-    the tube_name that was actually used.
-    """
-    if ',' in tube_name:
-        log.debug("Correcting tube name to 'multi'")
-        tube_name = 'multi'
-    connection.watch(tube_name)
-    connection.ignore('default')
-    return tube_name
-
-
-def walk_jobs(connection, tube_name, processor, pattern=None):
-    """
-    def callback(jobs_dict)
-    """
-    log.info("Checking Beanstalk Queue...")
-    job_count = connection.stats_tube(tube_name)['current-jobs-ready']
-    if job_count == 0:
-        log.info('No jobs in Beanstalk Queue')
-        return
-
-    # Try to figure out a sane timeout based on how many jobs are in the queue
-    timeout = job_count / 2000.0 * 60
-    for i in range(1, job_count + 1):
-        print_progress(i, job_count, "Loading")
-        job = connection.reserve(timeout=timeout)
-        if job is None or job.body is None:
-            continue
-        job_config = yaml.safe_load(job.body)
-        job_name = job_config['name']
-        job_id = job.stats()['id']
-        if pattern is not None and pattern not in job_name:
-            continue
-        processor.add_job(job_id, job_config, job)
-    end_progress()
-    processor.complete()
-
-
-def print_progress(index, total, message=None):
-    msg = "{m} ".format(m=message) if message else ''
-    sys.stderr.write("{msg}{i}/{total}\r".format(
-        msg=msg, i=index, total=total))
-    sys.stderr.flush()
-
-
-def end_progress():
-    sys.stderr.write('\n')
-    sys.stderr.flush()
-
-
-class JobProcessor(object):
-    def __init__(self):
-        self.jobs = OrderedDict()
-
-    def add_job(self, job_id, job_config, job_obj=None):
-        job_id = str(job_id)
-
-        job_dict = dict(
-            index=(len(self.jobs) + 1),
-            job_config=job_config,
-        )
-        if job_obj:
-            job_dict['job_obj'] = job_obj
-        self.jobs[job_id] = job_dict
-
-        self.process_job(job_id)
-
-    def process_job(self, job_id):
-        pass
-
-    def complete(self):
-        pass
-
-
-class JobPrinter(JobProcessor):
-    def __init__(self, show_desc=False, full=False):
-        super(JobPrinter, self).__init__()
-        self.show_desc = show_desc
-        self.full = full
-
-    def process_job(self, job_id):
-        job_config = self.jobs[job_id]['job_config']
-        job_index = self.jobs[job_id]['index']
-        job_priority = job_config['priority']
-        job_name = job_config['name']
-        job_desc = job_config['description']
-        print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
-            i=job_index,
-            pri=job_priority,
-            job_id=job_id,
-            job_name=job_name,
-            ))
-        if self.full:
-            pprint.pprint(job_config)
-        elif job_desc and self.show_desc:
-            for desc in job_desc.split():
-                print('\t {}'.format(desc))
-
-
-class RunPrinter(JobProcessor):
-    def __init__(self):
-        super(RunPrinter, self).__init__()
-        self.runs = list()
-
-    def process_job(self, job_id):
-        run = self.jobs[job_id]['job_config']['name']
-        if run not in self.runs:
-            self.runs.append(run)
-            print(run)
-
-
-class JobDeleter(JobProcessor):
-    def __init__(self, pattern):
-        self.pattern = pattern
-        super(JobDeleter, self).__init__()
-
-    def add_job(self, job_id, job_config, job_obj=None):
-        job_name = job_config['name']
-        if self.pattern in job_name:
-            super(JobDeleter, self).add_job(job_id, job_config, job_obj)
-
-    def process_job(self, job_id):
-        job_config = self.jobs[job_id]['job_config']
-        job_name = job_config['name']
-        print('Deleting {job_name}/{job_id}'.format(
-            job_id=job_id,
-            job_name=job_name,
-            ))
-        job_obj = self.jobs[job_id].get('job_obj')
-        if job_obj:
-            job_obj.delete()
-        report.try_delete_jobs(job_name, job_id)
-
-
-def pause_tube(connection, tube, duration):
-    duration = int(duration)
-    if not tube:
-        tubes = sorted(connection.tubes())
-    else:
-        tubes = [tube]
-
-    prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
-    templ = prefix + ": {tubes}"
-    log.info(templ.format(dur=duration, tubes=tubes))
-    for tube in tubes:
-        connection.pause_tube(tube, duration)
-
-
-def stats_tube(connection, tube):
-    stats = connection.stats_tube(tube)
-    result = dict(
-        name=tube,
-        count=stats['current-jobs-ready'],
-        paused=(stats['pause'] != 0),
-    )
-    return result
-
-
-def main(args):
-    machine_type = args['--machine_type']
-    status = args['--status']
-    delete = args['--delete']
-    runs = args['--runs']
-    show_desc = args['--description']
-    full = args['--full']
-    pause_duration = args['--pause']
-    try:
-        connection = connect()
-        if machine_type and not pause_duration:
-            # watch_tube needs to be run before we inspect individual jobs;
-            # it is not needed for pausing tubes
-            watch_tube(connection, machine_type)
-        if status:
-            print(json.dumps(stats_tube(connection, machine_type)))
-        elif pause_duration:
-            pause_tube(connection, machine_type, pause_duration)
-        elif delete:
-            walk_jobs(connection, machine_type,
-                      JobDeleter(delete))
-        elif runs:
-            walk_jobs(connection, machine_type,
-                      RunPrinter())
-        else:
-            walk_jobs(connection, machine_type,
-                      JobPrinter(show_desc=show_desc, full=full))
-    except KeyboardInterrupt:
-        log.info("Interrupted.")
-    finally:
-        connection.close()
diff --git a/teuthology/config.py b/teuthology/config.py
index 30204aa466..d9216b24ab 100644
--- a/teuthology/config.py
+++ b/teuthology/config.py
@@ -144,6 +144,7 @@ class TeuthologyConfig(YamlConfig):
         'archive_upload_key': None,
         'archive_upload_url': None,
         'automated_scheduling': False,
+        'backend': 'beanstalk',
         'reserve_machines': 5,
         'ceph_git_base_url': 'https://github.com/ceph/',
         'ceph_git_url': None,
diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py
index 59f8ae3279..3ec1153fdf 100644
--- a/teuthology/dispatcher/__init__.py
+++ b/teuthology/dispatcher/__init__.py
@@ -7,6 +7,7 @@
 import yaml
 
 from typing import Dict, List
+from time import sleep
 
 from teuthology import (
     # non-modules
@@ -64,6 +65,16 @@ def load_config(archive_dir=None):
         else:
             teuth_config.archive_base = archive_dir
 
+
+def clean_config(config):
+    """Strip out None values and the transient 'status' key from a job dict."""
+    result = {}
+    for key in config:
+        if key == 'status':
+            continue
+        if config[key] is not None:
+            result[key] = config[key]
+    return result
 
 def main(args):
     archive_dir = args.archive_dir or teuth_config.archive_base
@@ -75,7 +84,16 @@ def main(args):
             "There is already a teuthology-dispatcher process running:"
             f" {procs}"
         )
+    machine_type = args["--machine-type"]
+    archive_dir = args["--archive-dir"]
+    exit_on_empty_queue = args["--exit-on-empty-queue"]
+    backend = args['--queue-backend']
+
+    if archive_dir is None:
+        archive_dir = teuth_config.archive_base
+    if machine_type is None and teuth_config.machine_type is None:
+        return
 
     # setup logging for dispatcher in {log_dir}
     loglevel = logging.INFO
     if args.verbose:
@@ -88,8 +106,12 @@ def main(args):
 
     load_config(archive_dir=archive_dir)
 
-    connection = beanstalk.connect()
-    beanstalk.watch_tube(connection, args.tube)
+    if backend == 'beanstalk':
+        connection = beanstalk.connect()
+        beanstalk.watch_tube(connection, machine_type)
+    elif backend == 'paddles':
+        report.create_machine_type_queue(machine_type)
+
     result_proc = None
 
     if teuth_config.teuthology_path is None:
@@ -118,20 +140,31 @@ def main(args):
             if rc is not None:
                 worst_returncode = max([worst_returncode, rc])
                 job_procs.remove(proc)
-        job = connection.reserve(timeout=60)
-        if job is None:
-            if args.exit_on_empty_queue and not job_procs:
-                log.info("Queue is empty and no supervisor processes running; exiting!")
-                break
-            continue
-
-        # bury the job so it won't be re-run if it fails
-        job.bury()
-        job_id = job.jid
-        log.info('Reserved job %d', job_id)
-        log.info('Config is: %s', job.body)
-        job_config = yaml.safe_load(job.body)
-        job_config['job_id'] = str(job_id)
+        if backend == 'beanstalk':
+            job = connection.reserve(timeout=60)
+            if job is None:
+                if exit_on_empty_queue and not job_procs:
+                    log.info("Queue is empty and no supervisor processes running; exiting!")
+                    break
+                continue
+            # bury the job so it won't be re-run if it fails
+            job.bury()
+            job_config = yaml.safe_load(job.body)
+            job_id = job_config.get('job_id')
+            log.info('Reserved job %s', job_id)
+            log.info('Config is: %s', job.body)
+        else:
+            job = report.get_queued_job(machine_type)
+            if job is None:
+                if exit_on_empty_queue and not job_procs:
+                    log.info("Queue is empty and no supervisor processes running; exiting!")
+                    break
+                continue
+            job = clean_config(job)
+            report.try_push_job_info(job, dict(status='running'))
+            job_id = job.get('job_id')
+            log.info('Reserved job %s', job_id)
+            log.info('Config is: %s', job)
+            job_config = job
 
         if job_config.get('stop_worker'):
             keep_running = False
@@ -194,10 +224,11 @@ def main(args):
 
         # This try/except block is to keep the worker from dying when
         # beanstalkc throws a SocketError
-        try:
-            job.delete()
-        except Exception:
-            log.exception("Saw exception while trying to delete job")
+        if backend == 'beanstalk':
+            try:
+                job.delete()
+            except Exception:
+                log.exception("Saw exception while trying to delete job")
 
     return worst_returncode
@@ -363,3 +394,15 @@ def create_job_archive(job_name, job_archive_path, archive_dir):
     if not os.path.exists(run_archive):
         safepath.makedirs('/', run_archive)
     safepath.makedirs('/', job_archive_path)
+
+
+def pause_queue(machine_type, paused, paused_by, pause_duration=None):
+    if paused:
+        report.pause_queue(machine_type, paused_by, pause_duration)
+        # If a pause duration was specified, un-pause the queue once
+        # that much time has elapsed.
+        if pause_duration is not None:
+            sleep(int(pause_duration))
+            report.pause_queue(machine_type, paused_by)
+    else:
+        report.pause_queue(machine_type, paused_by)
diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py
index 83e6d997c5..5dd0e4e4ca 100644
--- a/teuthology/dispatcher/supervisor.py
+++ b/teuthology/dispatcher/supervisor.py
@@ -79,6 +79,11 @@ def main(args):
 def run_job(job_config, teuth_bin_path, archive_dir, verbose):
     safe_archive = safepath.munge(job_config['name'])
     if job_config.get('first_in_suite') or job_config.get('last_in_suite'):
+        if teuth_config.results_server:
+            try:
+                report.try_delete_jobs(job_config['name'], job_config['job_id'])
+            except Exception:
+                log.exception("Unable to delete job %s", job_config['job_id'])
         job_archive = os.path.join(archive_dir, safe_archive)
         args = [
             os.path.join(teuth_bin_path, 'teuthology-results'),
@@ -136,7 +141,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose):
             '--archive', job_config['archive_path'],
             '--name', job_config['name'],
         ])
-        if job_config['description'] is not None:
+        if job_config.get('description') is not None:
             arg.extend(['--description', job_config['description']])
         job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml')
         arg.extend(['--', job_archive])
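For illustration, this is how the pause helper added to teuthology/dispatcher/__init__.py above is meant to be driven; a minimal sketch, not part of the patch, assuming a configured paddles results server, with placeholder machine type and user:

    from teuthology.dispatcher import pause_queue

    # Pause the 'smithi' queue for five minutes, recording who paused it.
    # Note that the helper itself sleeps for the duration and then
    # un-pauses the queue.
    pause_queue('smithi', paused=True, paused_by='qa-admin', pause_duration=300)

    # Un-pause immediately.
    pause_queue('smithi', paused=False, paused_by='qa-admin')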
diff --git a/teuthology/kill.py b/teuthology/kill.py
index 137e49080e..2e464d6713 100755
--- a/teuthology/kill.py
+++ b/teuthology/kill.py
@@ -11,10 +11,10 @@
 
 import teuthology.exporter
 
-from teuthology import beanstalk
+from teuthology.queue import beanstalk
 from teuthology import report
-from teuthology.config import config
 from teuthology.lock import ops as lock_ops
+from teuthology.config import config
 
 log = logging.getLogger(__name__)
@@ -123,7 +123,8 @@ def find_run_info(serializer, run_name):
         if not os.path.isdir(job_dir):
             continue
         job_num += 1
-        beanstalk.print_progress(job_num, job_total, 'Reading Job: ')
+        if config.backend == 'beanstalk':
+            beanstalk.print_progress(job_num, job_total, 'Reading Job: ')
         job_info = serializer.job_info(run_name, job_id, simple=True)
         for key in job_info.keys():
             if key in run_info_fields and key not in run_info:
@@ -163,7 +164,7 @@ def remove_beanstalk_jobs(run_name, tube_name):
             continue
         job_config = yaml.safe_load(job.body)
         if run_name == job_config['name']:
-            job_id = job.stats()['id']
+            job_id = job_config['job_id']
             msg = "Deleting job from queue. ID: " + \
                   "{id} Name: {name} Desc: {desc}".format(
                       id=str(job_id),
diff --git a/teuthology/orchestra/run.py b/teuthology/orchestra/run.py
index bf6a069533..23bed6d170 100644
--- a/teuthology/orchestra/run.py
+++ b/teuthology/orchestra/run.py
@@ -182,7 +182,6 @@ def _raise_for_status(self):
                 command=self.command, exitstatus=self.returncode,
                 node=self.hostname, label=self.label
             )
-
     def _get_exitstatus(self):
         """
         :returns: the remote command's exit status (return code). Note that
diff --git a/teuthology/queue/__init__.py b/teuthology/queue/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/teuthology/queue/beanstalk.py b/teuthology/queue/beanstalk.py
new file mode 100644
index 0000000000..c668e4f6bc
--- /dev/null
+++ b/teuthology/queue/beanstalk.py
@@ -0,0 +1,117 @@
+import beanstalkc
+import json
+import yaml
+import logging
+
+from teuthology.config import config
+from teuthology.queue import util
+
+
+log = logging.getLogger(__name__)
+
+
+def connect():
+    host = config.queue_host
+    port = config.queue_port
+    if host is None or port is None:
+        raise RuntimeError(
+            'Beanstalk queue information not found in {conf_path}'.format(
+                conf_path=config.teuthology_yaml))
+    return beanstalkc.Connection(host=host, port=port, parse_yaml=yaml.safe_load)
+
+
+def watch_tube(connection, tube_name):
+    """
+    Watch a given tube, potentially correcting to 'multi' if necessary. Returns
+    the tube_name that was actually used.
+    """
+    if ',' in tube_name:
+        log.debug("Correcting tube name to 'multi'")
+        tube_name = 'multi'
+    connection.watch(tube_name)
+    connection.ignore('default')
+    return tube_name
+
+
+def walk_jobs(connection, tube_name, processor, pattern=None):
+    """
+    def callback(jobs_dict)
+    """
+    log.info("Checking Beanstalk Queue...")
+    job_count = connection.stats_tube(tube_name)['current-jobs-ready']
+    if job_count == 0:
+        log.info('No jobs in Beanstalk Queue')
+        return
+
+    # Try to figure out a sane timeout based on how many jobs are in the queue
+    timeout = job_count / 2000.0 * 60
+    for i in range(1, job_count + 1):
+        util.print_progress(i, job_count, "Loading")
+        job = connection.reserve(timeout=timeout)
+        if job is None or job.body is None:
+            continue
+        job_config = yaml.safe_load(job.body)
+        job_name = job_config['name']
+        job_id = job.stats()['id']
+        if pattern is not None and pattern not in job_name:
+            continue
+        processor.add_job(job_id, job_config, job)
+    util.end_progress()
+    processor.complete()
+
+
+def pause_tube(connection, tube, duration):
+    duration = int(duration)
+    if not tube:
+        tubes = sorted(connection.tubes())
+    else:
+        tubes = [tube]
+
+    prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
+    templ = prefix + ": {tubes}"
+    log.info(templ.format(dur=duration, tubes=tubes))
+    for tube in tubes:
+        connection.pause_tube(tube, duration)
+
+
+def stats_tube(connection, tube):
+    stats = connection.stats_tube(tube)
+    result = dict(
+        name=tube,
+        count=stats['current-jobs-ready'],
+        paused=(stats['pause'] != 0),
+    )
+    return result
+
+
+def main(args):
+    machine_type = args['--machine_type']
+    status = args['--status']
+    delete = args['--delete']
+    runs = args['--runs']
+    show_desc = args['--description']
+    full = args['--full']
+    pause_duration = args['--pause']
+    try:
+        connection = connect()
+        if machine_type and not pause_duration:
+            # watch_tube needs to be run before we inspect individual jobs;
+            # it is not needed for pausing tubes
+            watch_tube(connection, machine_type)
+        if status:
+            print(json.dumps(stats_tube(connection, machine_type)))
+        elif pause_duration:
+            pause_tube(connection, machine_type, pause_duration)
+        elif delete:
+            walk_jobs(connection, machine_type,
+                      util.JobDeleter(delete))
+        elif runs:
+            walk_jobs(connection, machine_type,
+                      util.RunPrinter())
+        else:
+            walk_jobs(connection, machine_type,
+                      util.JobPrinter(show_desc=show_desc, full=full))
+    except KeyboardInterrupt:
+        log.info("Interrupted.")
+    finally:
+        connection.close()
diff --git a/teuthology/queue/paddles.py b/teuthology/queue/paddles.py
new file mode 100644
index 0000000000..489d638e2f
--- /dev/null
+++ b/teuthology/queue/paddles.py
@@ -0,0 +1,82 @@
+import logging
+
+from teuthology import report
+from teuthology.queue import util
+
+log = logging.getLogger(__name__)
+
+
+def stats_queue(machine_type):
+    stats = report.get_queue_stats(machine_type)
+    if stats['paused'] is None:
+        log.info("%s queue is currently running with %s jobs queued",
+                 stats['queue'],
+                 stats['queued_jobs'])
+    else:
+        log.info("%s queue is paused with %s jobs queued",
+                 stats['queue'],
+                 stats['queued_jobs'])
+
+
+def update_priority(machine_type, priority, run_name=None):
+    if run_name is not None:
+        jobs = report.get_jobs_by_run(machine_type, run_name)
+        for job in jobs:
+            job['priority'] = priority
+            report.try_push_job_info(job)
+
+
+def walk_jobs(machine_type, processor, user):
+    log.info("Checking paddles queue...")
+    jobs = report.get_user_jobs_queue(machine_type, user)
+    if not jobs:
+        log.info('No jobs in Paddles queue')
+        return
+
+    # The queue stats can overcount when filtering by user, so iterate
+    # over the jobs actually returned.
+    job_count = len(jobs)
+    for i in range(1, job_count + 1):
+        util.print_progress(i, job_count, "Loading")
+        job = jobs[i-1]
+        if job is None:
+            continue
+        job_id = job['job_id']
+        processor.add_job(job_id, job)
+    util.end_progress()
+    processor.complete()
+
+
+def main(args):
+    machine_type = args['--machine_type']
+    user = args['--user']
+    run_name = args['--run-name']
+    status = args['--status']
+    delete = args['--delete']
+    runs = args['--runs']
+    show_desc = args['--description']
+    full = args['--full']
+    pause_duration = args['--pause']
+    priority = args['--priority']
+    try:
+        if status:
+            stats_queue(machine_type)
+        elif pause_duration:
+            if not user:
+                log.error('Please enter a user (-U) to pause a paddles queue')
+                return
+            report.pause_queue(machine_type, user, pause_duration)
+        elif priority:
+            update_priority(machine_type, priority, run_name)
+        elif delete:
+            walk_jobs(machine_type,
+                      util.JobDeleter(delete), user)
+        elif runs:
+            walk_jobs(machine_type,
+                      util.RunPrinter(), user)
+        else:
+            walk_jobs(machine_type,
+                      util.JobPrinter(show_desc=show_desc, full=full),
+                      user)
+    except KeyboardInterrupt:
+        log.info("Interrupted.")
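For illustration, a minimal sketch (not part of the patch) of how the new teuthology-queue options reach teuthology/queue/paddles.py above; the args dict mirrors what docopt would produce from scripts/queue.py, with illustrative values (docopt yields strings):

    from teuthology.queue import paddles

    # Equivalent of: teuthology-queue -m smithi -P 50 -R my-run-name
    args = {
        '--machine_type': 'smithi',
        '--user': None,
        '--run-name': 'my-run-name',
        '--status': False,
        '--delete': None,
        '--runs': False,
        '--description': False,
        '--full': False,
        '--pause': None,
        '--priority': '50',
    }
    paddles.main(args)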
diff --git a/teuthology/queue/util.py b/teuthology/queue/util.py
new file mode 100644
index 0000000000..2a7642e726
--- /dev/null
+++ b/teuthology/queue/util.py
@@ -0,0 +1,101 @@
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology import report
+
+log = logging.getLogger(__name__)
+
+
+def print_progress(index, total, message=None):
+    msg = "{m} ".format(m=message) if message else ''
+    sys.stderr.write("{msg}{i}/{total}\r".format(
+        msg=msg, i=index, total=total))
+    sys.stderr.flush()
+
+
+def end_progress():
+    sys.stderr.write('\n')
+    sys.stderr.flush()
+
+
+class JobProcessor(object):
+    def __init__(self):
+        self.jobs = OrderedDict()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_id = str(job_id)
+
+        job_dict = dict(
+            index=(len(self.jobs) + 1),
+            job_config=job_config,
+        )
+        if job_obj:
+            job_dict['job_obj'] = job_obj
+        self.jobs[job_id] = job_dict
+
+        self.process_job(job_id)
+
+    def process_job(self, job_id):
+        pass
+
+    def complete(self):
+        pass
+
+
+class JobPrinter(JobProcessor):
+    def __init__(self, show_desc=False, full=False):
+        super(JobPrinter, self).__init__()
+        self.show_desc = show_desc
+        self.full = full
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_index = self.jobs[job_id]['index']
+        job_priority = job_config['priority']
+        job_name = job_config['name']
+        job_desc = job_config['description']
+        print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
+            i=job_index,
+            pri=job_priority,
+            job_id=job_id,
+            job_name=job_name,
+            ))
+        if self.full:
+            pprint.pprint(job_config)
+        elif job_desc and self.show_desc:
+            for desc in job_desc.split():
+                print('\t {}'.format(desc))
+
+
+class RunPrinter(JobProcessor):
+    def __init__(self):
+        super(RunPrinter, self).__init__()
+        self.runs = list()
+
+    def process_job(self, job_id):
+        run = self.jobs[job_id]['job_config']['name']
+        if run not in self.runs:
+            self.runs.append(run)
+            print(run)
+
+
+class JobDeleter(JobProcessor):
+    def __init__(self, pattern):
+        self.pattern = pattern
+        super(JobDeleter, self).__init__()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_name = job_config['name']
+        if self.pattern in job_name:
+            super(JobDeleter, self).add_job(job_id, job_config, job_obj)
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_name = job_config['name']
+        print('Deleting {job_name}/{job_id}'.format(
+            job_id=job_id,
+            job_name=job_name,
+            ))
+        report.try_delete_jobs(job_name, job_id)
diff --git a/teuthology/report.py b/teuthology/report.py
index f0a4472017..079b561f44 100644
--- a/teuthology/report.py
+++ b/teuthology/report.py
@@ -263,6 +263,41 @@ def report_run(self, run_name, dead=False):
             self.log.debug("    no jobs; skipped")
         return len(jobs)
 
+    def write_new_job(self, run_name, job_info):
+        """
+        Report a new job to the results server.
+
+        :param run_name: The name of the run. The run must already exist.
+        :param job_info: The job's info dict. Must be present since this is
+                         a new job.
+        """
+        if not isinstance(job_info, dict):
+            raise TypeError("Job info must be a dict")
+        run_uri = "{base}/runs/{name}/jobs/".format(
+            base=self.base_uri, name=run_name,
+        )
+        job_json = json.dumps(job_info)
+        headers = {'content-type': 'application/json'}
+
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc,
+                action=f'write job for {run_name}') as proceed:
+            while proceed():
+                response = self.session.post(run_uri, data=job_json, headers=headers)
+                if response.status_code == 200:
+                    resp_json = response.json()
+                    return resp_json['job_id']
+                msg = response.text
+                self.log.error(
+                    "POST to {uri} failed with status {status}: {msg}".format(
+                        uri=run_uri,
+                        status=response.status_code,
+                        msg=msg,
+                    ))
+            response.raise_for_status()
+        return None
+
     def report_jobs(self, run_name, job_ids, dead=False):
         """
         Report several jobs to the results server.
@@ -292,12 +330,13 @@ def report_job(self, run_name, job_id, job_info=None, dead=False):
             set_status(job_info, 'dead')
         job_json = json.dumps(job_info)
         headers = {'content-type': 'application/json'}
+        job_uri = os.path.join(run_uri, job_id, '')
 
         inc = random.uniform(0, 1)
         with safe_while(
                 sleep=1, increment=inc, action=f'report job {job_id}') as proceed:
             while proceed():
-                response = self.session.post(run_uri, data=job_json, headers=headers)
+                response = self.session.put(job_uri, data=job_json, headers=headers)
 
                 if response.status_code == 200:
                     return
@@ -314,15 +353,9 @@ def report_job(self, run_name, job_id, job_info=None, dead=False):
                 else:
                     msg = response.text
 
-                if msg and msg.endswith('already exists'):
-                    job_uri = os.path.join(run_uri, job_id, '')
-                    response = self.session.put(job_uri, data=job_json,
-                                                headers=headers)
-                    if response.status_code == 200:
-                        return
-                elif msg:
+                if msg:
                     self.log.error(
-                        "POST to {uri} failed with status {status}: {msg}".format(
-                            uri=run_uri,
+                        "PUT to {uri} failed with status {status}: {msg}".format(
+                            uri=job_uri,
                             status=response.status_code,
                             msg=msg,
                         ))
@@ -352,6 +385,20 @@ def last_run(self):
             self.__last_run = None
             if os.path.exists(self.last_run_file):
                 os.remove(self.last_run_file)
+
+    def get_top_job(self, queue):
+        uri = "{base}/queue/pop_queue?queue={queue}".format(
+            base=self.base_uri, queue=queue)
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc,
+                action=f'get job from {queue}') as proceed:
+            while proceed():
+                response = self.session.get(uri)
+                if response.status_code == 200:
+                    return response.json()
+                response.raise_for_status()
+
 
     def get_jobs(self, run_name, job_id=None, fields=None):
         """
@@ -457,6 +504,201 @@ def delete_run(self, run_name):
         response = self.session.delete(uri)
         response.raise_for_status()
 
+    def create_queue(self, queue):
+        """
+        Create a queue on the results server
+
+        :param queue: The queue specified for the job
+        """
+        uri = "{base}/queue/".format(
+            base=self.base_uri
+        )
+        queue_info = {'queue': queue}
+        queue_json = json.dumps(queue_info)
+        headers = {'content-type': 'application/json'}
+
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc,
+                action=f'creating queue {queue}') as proceed:
+            while proceed():
+                response = self.session.post(uri, data=queue_json, headers=headers)
+
+                if response.status_code == 200:
+                    self.log.info("Successfully created queue {queue}".format(
+                        queue=queue,
+                    ))
+                    return
+                else:
+                    resp_json = response.json()
+                    if resp_json:
+                        msg = resp_json.get('message', '')
+                    else:
+                        msg = response.text
+                    if msg and msg.endswith('already exists'):
+                        return
+                    self.log.error(
+                        "POST to {uri} failed with status {status}: {msg}".format(
+                            uri=uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
+
+            response.raise_for_status()
+
+    def update_queue(self, queue, paused_by, pause_duration=None):
+        uri = "{base}/queue/".format(
+            base=self.base_uri
+        )
+        if pause_duration is not None:
+            pause_duration = int(pause_duration)
+        queue_info = {'queue': queue, 'paused_by': paused_by,
+                      'pause_duration': pause_duration}
+        queue_json = json.dumps(queue_info)
+        headers = {'content-type': 'application/json'}
+
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc,
+                action=f'updating queue {queue}') as proceed:
+            while proceed():
+                response = self.session.put(uri, data=queue_json, headers=headers)
+
+                if response.status_code == 200:
+                    self.log.info("Successfully updated queue {queue}".format(
+                        queue=queue,
+                    ))
+                    return
+                else:
+                    msg = response.text
+                    self.log.error(
+                        "PUT to {uri} failed with status {status}: {msg}".format(
+                            uri=uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
+
+            response.raise_for_status()
+
+    def queue_stats(self, queue):
+        uri = "{base}/queue/stats/".format(
+            base=self.base_uri
+        )
+        queue_info = {'queue': queue}
+        queue_json = json.dumps(queue_info)
+
+        headers = {'content-type': 'application/json'}
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc,
+                action=f'stats for queue {queue}') as proceed:
+            while proceed():
+                response = self.session.post(uri, data=queue_json, headers=headers)
+
+                if response.status_code == 200:
+                    return response.json()
+                else:
+                    msg = response.text
+                    self.log.error(
+                        "POST to {uri} failed with status {status}: {msg}".format(
+                            uri=uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
+                response.raise_for_status()
+
+    def queued_jobs(self, queue, user, run_name):
+        uri = "{base}/queue/queued_jobs/".format(
+            base=self.base_uri
+        )
+        request_info = {'queue': queue}
+        filter_field = queue
+        if run_name is not None:
+            filter_field = run_name
+            uri += "?run_name=" + str(run_name)
+        elif user is not None:
+            filter_field = user
+            uri += "?user=" + str(user)
+
+        request_json = json.dumps(request_info)
+        headers = {'content-type': 'application/json'}
+        inc = random.uniform(0, 1)
+        with safe_while(
+                sleep=1, increment=inc,
+                action=f'get queued jobs {filter_field}') as proceed:
+            while proceed():
+                response = self.session.post(uri, data=request_json, headers=headers)
+
+                if response.status_code == 200:
+                    self.log.info("Successfully retrieved jobs for {filter_field}".format(
+                        filter_field=filter_field,
+                    ))
+                    return response.json()
+                else:
+                    msg = response.text
+                    self.log.error(
+                        "POST to {uri} failed with status {status}: {msg}".format(
+                            uri=uri,
+                            status=response.status_code,
+                            msg=msg,
+                        ))
+                response.raise_for_status()
+
+
+def create_machine_type_queue(queue):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    if ',' in queue:
+        queue = 'multi'
+    reporter.create_queue(queue)
+    return queue
+
+
+def get_all_jobs_in_queue(queue, user=None, run_name=None):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    if ',' in queue:
+        queue = 'multi'
+    return reporter.queued_jobs(queue, user, run_name)
+
+
+def get_user_jobs_queue(queue, user, run_name=None):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    return reporter.queued_jobs(queue, user, run_name)
+
+
+def get_jobs_by_run(queue, run_name):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    return reporter.queued_jobs(queue, None, run_name)
+
+
+def pause_queue(queue, paused_by, pause_duration=None):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    reporter.update_queue(queue, paused_by, pause_duration)
+
+
+def is_queue_paused(queue):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    stats = reporter.queue_stats(queue)
+    if stats['paused'] is not None and stats['paused'] != 0:
+        return True
+    return False
+
+
+def get_queue_stats(queue):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    return reporter.queue_stats(queue)
+
 
 def push_job_info(run_name, job_id, job_info, base_uri=None):
     """
@@ -480,6 +720,24 @@ def push_job_info(run_name, job_id, job_info, base_uri=None):
     )
 
 
+def get_queued_job(machine_type):
+    """
+    Retrieve the highest-priority queued job, unless the queue is paused.
+    """
+    log = init_logging()
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    if ',' in machine_type:
+        queue = 'multi'
+    else:
+        queue = machine_type
+    if is_queue_paused(queue):
+        log.info("Teuthology queue %s is currently paused", queue)
+        return None
+    return reporter.get_top_job(queue)
+
+
 def try_push_job_info(job_config, extra_info=None):
     """
     Wrap push_job_info, gracefully doing nothing if:
@@ -519,6 +780,36 @@ def try_push_job_info(job_config, extra_info=None):
                       config.results_server)
 
 
+def try_create_job(job_config, extra_info=None):
+    log = init_logging()
+
+    if not config.results_server:
+        log.warning('No results_server in config; not reporting results')
+        return
+
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+
+    run_name = job_config['name']
+
+    if extra_info is not None:
+        job_info = extra_info.copy()
+        job_info.update(job_config)
+    else:
+        job_info = job_config
+
+    try:
+        log.debug("Writing job info to %s", config.results_server)
+        job_id = reporter.write_new_job(run_name, job_info)
+        log.info("Job ID: %s", job_id)
+        if job_id is not None:
+            return job_id
+    except report_exceptions:
+        log.exception("Could not report results to %s",
+                      config.results_server)
+
+
 def try_delete_jobs(run_name, job_ids, delete_empty_run=True):
     """
     Using the same error checking and retry mechanism as try_push_job_info(),
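For illustration, the queue endpoints added to ResultsReporter above amount to a small REST surface on paddles; a minimal client-side sketch (not part of the patch), assuming a reachable results_server and using 'smithi' as a placeholder queue name:

    from teuthology.report import ResultsReporter

    reporter = ResultsReporter()
    reporter.create_queue('smithi')                    # POST /queue/
    stats = reporter.queue_stats('smithi')             # POST /queue/stats/
    jobs = reporter.queued_jobs('smithi', None, None)  # POST /queue/queued_jobs/
    job = reporter.get_top_job('smithi')               # GET  /queue/pop_queue?queue=smithi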
diff --git a/teuthology/schedule.py b/teuthology/schedule.py
index d9af64efc4..3a370e86db 100644
--- a/teuthology/schedule.py
+++ b/teuthology/schedule.py
@@ -1,7 +1,7 @@
 import os
 import yaml
 
-import teuthology.beanstalk
+import teuthology.queue.beanstalk
 from teuthology.misc import get_user, merge_configs
 from teuthology import report
 
@@ -23,11 +23,6 @@ def main(args):
             if args[opt]:
                 raise ValueError(msg_fmt.format(opt=opt))
 
-    if args['--first-in-suite'] or args['--last-in-suite']:
-        report_status = False
-    else:
-        report_status = True
-
     name = args['--name']
     if not name or name.isdigit():
         raise ValueError("Please use a more descriptive value for --name")
@@ -35,13 +30,15 @@ def main(args):
 
     backend = args['--queue-backend']
     if args['--dry-run']:
         print('---\n' + yaml.safe_dump(job_config))
-    elif backend == 'beanstalk':
-        schedule_job(job_config, args['--num'], report_status)
     elif backend.startswith('@'):
         dump_job_to_file(backend.lstrip('@'), job_config, args['--num'])
+    elif backend == 'paddles':
+        paddles_schedule_job(job_config, args['--num'])
+    elif backend == 'beanstalk':
+        beanstalk_schedule_job(job_config, args['--num'])
     else:
         raise ValueError("Provided schedule backend '%s' is not supported. "
-                         "Try 'beanstalk' or '@path-to-a-file" % backend)
+                         "Try 'paddles', 'beanstalk' or '@path-to-a-file'" % backend)
 
 
 def build_config(args):
@@ -87,29 +84,48 @@ def build_config(args):
     return job_config
 
 
-def schedule_job(job_config, num=1, report_status=True):
+def paddles_schedule_job(job_config, num=1):
     """
-    Schedule a job.
+    Schedule a job with paddles as the queue backend.
 
     :param job_config: The complete job dict
     :param num:        The number of times to schedule the job
     """
     num = int(num)
+    # Make sure the machine-type queue exists in the paddles DB before
+    # any jobs are queued on it.
+    queue = report.create_machine_type_queue(job_config['machine_type'])
+    job_config['queue'] = queue
+    while num > 0:
+        job_id = report.try_create_job(job_config, dict(status='queued'))
+        print('Job scheduled in paddles with name {name} and ID {job_id}'.format(
+            name=job_config['name'], job_id=job_id))
+        job_config['job_id'] = str(job_id)
+        num -= 1
+
+
+def beanstalk_schedule_job(job_config, num=1):
+    """
+    Schedule a job with beanstalk as the queue backend.
+
+    :param job_config: The complete job dict
+    :param num:        The number of times to schedule the job
+    """
+    num = int(num)
     tube = job_config.pop('tube')
-    beanstalk = teuthology.beanstalk.connect()
+    beanstalk = teuthology.queue.beanstalk.connect()
     beanstalk.use(tube)
+    queue = report.create_machine_type_queue(job_config['machine_type'])
+    job_config['queue'] = queue
     while num > 0:
-        jid = beanstalk.put(
+        job_id = report.try_create_job(job_config, dict(status='queued'))
+        job_config['job_id'] = str(job_id)
+        job = yaml.safe_dump(job_config)
+        beanstalk.put(
             job,
             ttr=60 * 60 * 24,
             priority=job_config['priority'],
         )
-        print('Job scheduled with name {name} and ID {jid}'.format(
-            name=job_config['name'], jid=jid))
-        job_config['job_id'] = str(jid)
-        if report_status:
-            report.try_push_job_info(job_config, dict(status='queued'))
+        print('Job scheduled in beanstalk with name {name} and ID {job_id}'.format(
+            name=job_config['name'], job_id=job_id))
         num -= 1
@@ -140,4 +160,3 @@ def dump_job_to_file(path, job_config, num=1):
         num -= 1
     with open(count_file_path, 'w') as f:
         f.write(str(jid))
-
diff --git a/teuthology/test/test_dispatcher.py b/teuthology/test/test_dispatcher.py
new file mode 100644
index 0000000000..16c61618a0
--- /dev/null
+++ b/teuthology/test/test_dispatcher.py
@@ -0,0 +1,101 @@
+from unittest.mock import patch, Mock
+import unittest
+
+from teuthology import dispatcher
+from teuthology import report
+
+
+class TestDispatcher(unittest.TestCase):
+
+    def test_mock_get_queue_job(self):
+        mock_get_patcher = patch('teuthology.dispatcher.report.get_queued_job')
+        machine_type = 'test_queue'
+        job_config = {
+            'job_id': '1',
+            'description': 'DESC',
+            'email': 'EMAIL',
+            'first_in_suite': False,
+            'last_in_suite': True,
+            'machine_type': 'test_queue',
+            'name': 'NAME',
+            'owner': 'OWNER',
+            'priority': 99,
+            'results_timeout': '6',
+            'verbose': False,
+        }
+
+        mock_get = mock_get_patcher.start()
+        mock_get.return_value = Mock(status_code=200)
+        mock_get.return_value.json.return_value = job_config
+
+        response = report.get_queued_job(machine_type)
+
+        mock_get_patcher.stop()
+
+        self.assertEqual(response.status_code, 200)
+        self.assertEqual(response.json(), job_config)
+
+    @patch("teuthology.worker.fetch_teuthology")
+    @patch("teuthology.dispatcher.fetch_qa_suite")
+    @patch("teuthology.worker.fetch_qa_suite")
+    @patch("teuthology.dispatcher.report.get_queued_job")
+    @patch("teuthology.dispatcher.report.try_push_job_info")
+    @patch("teuthology.dispatcher.setup_log_file")
+    @patch("os.path.isdir")
+    @patch("os.getpid")
+    @patch("teuthology.dispatcher.teuth_config")
+    @patch("subprocess.Popen")
+    @patch("os.path.join")
+    @patch("teuthology.dispatcher.create_job_archive")
+    @patch("yaml.safe_dump")
+    def test_dispatcher_main(self, m_yaml_dump, m_create_archive, m_join,
+                             m_popen, m_t_config, m_getpid, m_isdir,
+                             m_setup_log, m_try_push_job_info,
+                             m_get_queued_job, m_worker_fetch_qa_suite,
+                             m_fetch_qa_suite, m_fetch_teuthology):
+        # Note: mock arguments are listed bottom-up to match the order in
+        # which the @patch decorators are applied.
+        args = {
+            '--owner': 'the_owner',
+            '--archive-dir': '/archive/dir',
+            '--log-dir': '/worker/log',
+            '--name': 'the_name',
+            '--description': 'the_description',
+            '--machine-type': 'test_queue',
+            '--supervisor': False,
+            '--verbose': False,
+            '--queue-backend': 'paddles',
+            '--exit-on-empty-queue': False
+        }
+
+        job = {
+            'job_id': '1',
+            'description': 'DESC',
+            'email': 'EMAIL',
+            'first_in_suite': False,
+            'last_in_suite': True,
+            'machine_type': 'test_queue',
+            'name': 'NAME',
+            'owner': 'OWNER',
+            'priority': 99,
+            'results_timeout': '6',
+            'verbose': False,
+            'stop_worker': True,
+            'archive_path': '/archive/dir/NAME/1'
+        }
+
+        m_fetch_teuthology.return_value = '/teuth/path'
+        m_fetch_qa_suite.return_value = '/suite/path'
+        m_isdir.return_value = True
+        mock_get_patcher = patch('teuthology.dispatcher.report.get_queued_job')
+        mock_get = mock_get_patcher.start()
+        mock_get.return_value = job
+
+        mock_prep_job_patcher = patch('teuthology.dispatcher.prep_job')
+        mock_prep_job = mock_prep_job_patcher.start()
+        mock_prep_job.return_value = (job, '/teuth/bin/path')
+        m_yaml_dump.return_value = ''
+
+        dispatcher.main(args)
+        m_try_push_job_info.assert_any_call(job, dict(status='running'))
+        mock_get_patcher.stop()
+        mock_prep_job_patcher.stop()
diff --git a/teuthology/test/test_schedule.py b/teuthology/test/test_schedule.py
index dd0a68f845..1add70620a 100644
--- a/teuthology/test/test_schedule.py
+++ b/teuthology/test/test_schedule.py
@@ -1,8 +1,14 @@
 from teuthology.schedule import build_config
 from teuthology.misc import get_user
+from unittest.mock import patch, Mock
+from teuthology import report
+from teuthology import schedule
+import unittest
+import os
+
 
 
-class TestSchedule(object):
+class TestSchedule(unittest.TestCase):
     basic_args = {
         '--verbose': False,
         '--owner': 'OWNER',
@@ -43,3 +49,36 @@ def test_owner(self):
         job_dict = build_config(self.basic_args)
 
         assert job_dict['owner'] == 'scheduled_%s' % get_user()
+
+    def test_dump_job_to_file(self):
+        path = 'teuthology/test/job'
+        job_config = {
+            'description': 'DESC',
+            'email': 'EMAIL',
+            'first_in_suite': False,
+            'last_in_suite': True,
+            'machine_type': 'tala',
+            'name': 'NAME',
+            'owner': 'OWNER',
+            'priority': 99,
+            'results_timeout': '6',
+            'verbose': False,
+            'tube': 'tala',
+        }
+        schedule.dump_job_to_file(path, job_config)
+
+        count_file_path = path + '.count'
+        assert os.path.exists(count_file_path)
+
+    def test_mock_create_queue(self):
+        mock_get_patcher = patch('teuthology.schedule.report.create_machine_type_queue')
+        machine_type = 'test_queue'
+
+        mock_get = mock_get_patcher.start()
+        mock_get.return_value = Mock(status_code=200)
+
+        response = report.create_machine_type_queue(machine_type)
+
+        mock_get_patcher.stop()
+
+        self.assertEqual(response.status_code, 200)
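For illustration, a minimal end-to-end sketch (not part of the patch) of the paddles-backed flow this change introduces, using only helpers defined above; it assumes a configured results_server, and the job dict fields are placeholders:

    from teuthology import report

    def schedule_then_dispatch(job_config):
        # Scheduler side (teuthology/schedule.py): make sure the
        # machine-type queue exists, then create the job as 'queued'.
        queue = report.create_machine_type_queue(job_config['machine_type'])
        job_config['queue'] = queue
        job_id = report.try_create_job(job_config, dict(status='queued'))
        job_config['job_id'] = str(job_id)

        # Dispatcher side (teuthology/dispatcher/__init__.py): pop the
        # highest-priority job, unless the queue is paused, and mark it
        # running before handing it to the supervisor.
        job = report.get_queued_job(job_config['machine_type'])
        if job is not None:
            report.try_push_job_info(job, dict(status='running'))
        return job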