diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fc73503 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +cotea.egg-info/ +__pycache__/ +dist/ + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..b0f0765 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["setuptools>=42"] +build-backend = "setuptools.build_meta" diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..28b2e25 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,25 @@ +[metadata] +name = cotea +version = 1.3 +author = David Badalyan +author_email = dbadalyan@ispras.ru +description = Tool that provides Python API to run Ansible programmatically. +long_description = file: README.md +long_description_content_type = text/markdown +url = https://github.com/ispras/cotea +project_urls = + Bug Tracker = https://github.com/ispras/cotea/issues +classifiers = + Programming Language :: Python :: 3 + License :: OSI Approved :: Apache Software License + Operating System :: OS Independent + +[options] +package_dir = + = src +packages = find: +python_requires = >=3.6 + +[options.packages.find] +where = src + diff --git a/src/__pycache__/ansible_execution_sync.cpython-38.pyc b/src/__pycache__/ansible_execution_sync.cpython-38.pyc deleted file mode 100644 index 53f35e4..0000000 Binary files a/src/__pycache__/ansible_execution_sync.cpython-38.pyc and /dev/null differ diff --git a/src/__pycache__/arguments_maker.cpython-38.pyc b/src/__pycache__/arguments_maker.cpython-38.pyc deleted file mode 100644 index 4122fb7..0000000 Binary files a/src/__pycache__/arguments_maker.cpython-38.pyc and /dev/null differ diff --git a/src/__pycache__/runner.cpython-38.pyc b/src/__pycache__/runner.cpython-38.pyc deleted file mode 100644 index 01c43ba..0000000 Binary files a/src/__pycache__/runner.cpython-38.pyc and /dev/null differ diff --git a/src/cotea/__init__.py b/src/cotea/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ansible_execution_sync.py b/src/cotea/ansible_execution_sync.py similarity index 54% rename from src/ansible_execution_sync.py rename to src/cotea/ansible_execution_sync.py index 2469049..6092202 100644 --- a/src/ansible_execution_sync.py +++ b/src/cotea/ansible_execution_sync.py @@ -6,39 +6,41 @@ def __init__(self, logger): self.runner_event = threading.Event() self.ansible_event = threading.Event() self.logger = logger + self.curr_breakpoint_label = None def status(self): - self.logger.info("Runner event status: %s", self.runner_event.is_set()) - self.logger.info("Ansible event status: %s", + self.logger.debug("Runner event status: %s", self.runner_event.is_set()) + self.logger.debug("Ansible event status: %s", self.ansible_event.is_set()) def runner_just_wait(self): - self.logger.info("runner: waiting...") + #self.logger.debug("runner: waiting...") self.runner_event.wait() self.runner_event.clear() def ansible_just_wait(self): - self.logger.info("ansible: waiting...") + #self.logger.debug("ansible: waiting...") self.ansible_event.wait() self.ansible_event.clear() - def continue_runner_with_stop(self): - self.logger.info("ansible: resume runner work and wait") + def continue_runner_with_stop(self, curr_breakpoint_label): + #self.logger.debug("ansible: resume runner work and wait") + self.curr_breakpoint_label = curr_breakpoint_label self.runner_event.set() self.ansible_event.wait() self.ansible_event.clear() def continue_ansible_with_stop(self): - self.logger.info("runner: resume ansible work and wait") + #self.logger.debug("runner: 
resume ansible work and wait") self.ansible_event.set() self.runner_event.wait() self.runner_event.clear() - self.logger.debug("runner: ANSIBLE WAKED ME UP") + #self.logger.debug("runner: ANSIBLE WAKED ME UP") def continue_runner(self): - self.logger.info("ansible: resume runner work") + #self.logger.debug("ansible: resume runner work") self.runner_event.set() def continue_ansible(self): - self.logger.info("runner: resume ansible work") + #self.logger.debug("runner: resume ansible work") self.ansible_event.set() diff --git a/src/arguments_maker.py b/src/cotea/arguments_maker.py similarity index 100% rename from src/arguments_maker.py rename to src/cotea/arguments_maker.py diff --git a/src/cotea/runner.py b/src/cotea/runner.py new file mode 100644 index 0000000..0444b7e --- /dev/null +++ b/src/cotea/runner.py @@ -0,0 +1,290 @@ +import logging +import threading + +from ansible.cli import CLI +from ansible.plugins.strategy.linear import StrategyModule +from ansible.plugins.strategy import StrategyBase +from ansible.cli.playbook import PlaybookCLI +from ansible.parsing.yaml.objects import AnsibleUnicode + +from cotea.ansible_execution_sync import ans_sync +from cotea.task_result import TaskResult +from cotea.wrappers.pbcli_run_wrapper import pbcli_run_wrapper +from cotea.wrappers.strategy_run_wrapper import strategy_run_wrapper +from cotea.wrappers.get_next_task_wrapper import get_next_task_wrapper +from cotea.wrappers.update_active_conn_wrapper import update_active_conn_wrapper +from cotea.wrappers.play_prereqs_wrapper import play_prereqs_wrapper + + +class runner: + def __init__(self, pb_path, arg_maker, debug_mod=None): + logging_lvl = logging.INFO + if debug_mod: + logging_lvl= logging.DEBUG + + logging.basicConfig(format="%(name)s %(asctime)s %(message)s", \ + datefmt="%H:%M:%S", level=logging_lvl) + + self.pb_path = pb_path + self.arg_maker = arg_maker + + self.logger = logging.getLogger("RUNNER") + + log_sync = logging.getLogger("SYNC") + self.sync_obj = ans_sync(log_sync) + + self.breakpoint_labeles = {} + self.breakpoint_labeles["before_playbook"] = "before_playbook_run" + self.breakpoint_labeles["after_playbook"] = "after_playbook_run" + self.breakpoint_labeles["before_play"] = "before_play_run" + self.breakpoint_labeles["after_play"] = "after_play_run" + self.breakpoint_labeles["before_task"] = "before_task_run" + self.breakpoint_labeles["after_task"] = "after_task_run" + + self._set_wrappers() + start_ok = self._start_ansible() + self.logger.debug("Ansible start ok: %s", start_ok) + + + def _set_wrappers(self): + wrp_lgr = logging.getLogger("WRPR") + + self.pbcli_run_wrp = pbcli_run_wrapper(PlaybookCLI.run, self.sync_obj, wrp_lgr, + self.breakpoint_labeles["before_playbook"], + self.breakpoint_labeles["after_playbook"]) + PlaybookCLI.run = self.pbcli_run_wrp + + self.play_wrp = strategy_run_wrapper(StrategyModule.run, self.sync_obj, wrp_lgr, + self.breakpoint_labeles["before_play"], + self.breakpoint_labeles["after_play"]) + StrategyModule.run = self.play_wrp + + self.task_wrp = get_next_task_wrapper(StrategyModule._get_next_task_lockstep, + self.sync_obj, wrp_lgr, + self.breakpoint_labeles["before_task"]) + StrategyModule._get_next_task_lockstep = self.task_wrp + + self.update_conn_wrapper = update_active_conn_wrapper(StrategyBase.update_active_connections, + self.sync_obj, wrp_lgr, + self.breakpoint_labeles["after_task"]) + StrategyBase.update_active_connections = self.update_conn_wrapper + + self.play_prereqs_wrp = play_prereqs_wrapper(CLI._play_prereqs, + self.sync_obj, 
wrp_lgr) + CLI._play_prereqs = self.play_prereqs_wrp + + + def _start_ansible(self): + args = self.arg_maker.args + args.insert(0, "/usr/local/bin/ansible-playbook") + args.insert(1, self.pb_path) + + self.pbCLI = PlaybookCLI(args) + + self.ansible_thread = threading.Thread(target=self.pbCLI.run) + self.ansible_thread.start() + self.sync_obj.runner_just_wait() + + if self.sync_obj.curr_breakpoint_label == self.breakpoint_labeles["before_playbook"]: + return True + + return False + + + def has_next_play(self): + if self.sync_obj.curr_breakpoint_label == self.breakpoint_labeles["after_playbook"]: + return False + + self.sync_obj.continue_ansible_with_stop() + current_bp_label = self.sync_obj.curr_breakpoint_label + self.logger.debug("has_next_play: %s", current_bp_label) + + if current_bp_label == self.breakpoint_labeles["before_play"]: + return True + + return False + + + def has_next_task(self): + if self.sync_obj.curr_breakpoint_label == self.breakpoint_labeles["after_playbook"]: + return False + + self.sync_obj.continue_ansible_with_stop() + current_bp_label = self.sync_obj.curr_breakpoint_label + self.logger.debug("has_next_task: %s", current_bp_label) + + if current_bp_label == self.breakpoint_labeles["before_task"]: + return True + + return False + + + def run_next_task(self): + res = [] + + if self.sync_obj.curr_breakpoint_label == self.breakpoint_labeles["after_playbook"]: + return res + + self.sync_obj.continue_ansible_with_stop() + current_bp_label = self.sync_obj.curr_breakpoint_label + self.logger.debug("run_next_task: %s", current_bp_label) + + if current_bp_label != self.breakpoint_labeles["after_task"]: + self.logger.debug("run_next_task() did not reach the 'after_task' breakpoint") + + for task_result_ansible_obj in self.update_conn_wrapper.current_results: + res.append(TaskResult(task_result_ansible_obj)) + + return res + + + def schedule_last_task_again(self): + self.task_wrp.run_last_one = True + + + def finish_ansible(self): + while self.sync_obj.curr_breakpoint_label != self.breakpoint_labeles["after_playbook"]: + self.sync_obj.continue_ansible_with_stop() + + self.sync_obj.continue_ansible() + + + def get_cur_play_name(self): + return self.play_wrp.current_play_name + + + def get_next_task(self): + return self.task_wrp.get_next_task() + + + def get_next_task_name(self): + return self.task_wrp.get_next_task_name() + + + def get_prev_task(self): + return self.task_wrp.get_prev_task() + + + def get_prev_task_name(self): + return self.task_wrp.get_prev_task_name() + + + def get_last_task_result(self): + res = [] + + for task_result_ansible_obj in self.update_conn_wrapper.current_results: + res.append(TaskResult(task_result_ansible_obj)) + + return res + + + # returns True if there was a non-ignored error + def was_error(self): + return self.play_wrp.was_error + + + # returns a list with all errors, including the ignored ones + def get_all_error_msgs(self): + return self.update_conn_wrapper.error_msgs + + + # returns the last error msg that wasn't ignored + def get_error_msg(self): + res = "" + + # there was an error without 'ignore_errors' + if self.was_error(): + errors_count = len(self.update_conn_wrapper.error_msgs) + + if errors_count > 0: + res = self.update_conn_wrapper.error_msgs[errors_count - 1] + + return res + + + def get_all_vars(self): + variable_manager = self.play_wrp.variable_manager + cur_play = self.play_wrp.iterator._play + hosts = self.play_wrp.hosts + hosts_all = self.play_wrp.hosts_all + + res = variable_manager.get_vars(play=cur_play, + _hosts=hosts, + 
_hosts_all=hosts_all) + + return res + + + def get_all_facts(self): + return self.play_prereqs_wrp.variable_manager._fact_cache.copy() + + + def get_variable(self, var_name): + if var_name == "ansible_facts": + return self.get_all_facts() + + all_vars = self.get_all_vars() + + if var_name in all_vars: + return all_vars[var_name] + + # check groups + if "groups" in all_vars: + if var_name in all_vars["groups"]: + return all_vars["groups"][var_name] + + result = {} + + # check hostvars + if "hostvars" in all_vars: + for host in all_vars["hostvars"]: + for key in all_vars["hostvars"][host]: + if key == var_name: + result[host] = {key: all_vars["hostvars"][host][key]} + + if result: + return result + + facts = self.get_all_facts() + for host_key in facts: + if var_name in facts[host_key]: + result[host_key] = facts[host_key][var_name] + + if result: + return result + + self.logger.info("There is no variable with name %s", var_name) + + return None + + + def add_var_as_extra_var(self, new_var_name, value): + variable_manager = self.play_wrp.variable_manager + + ansible_way_var = AnsibleUnicode(new_var_name) + variable_manager._extra_vars[ansible_way_var] = value + + + def _getIP(self): + var_name = "openstack_servers" + host_name = "localhost" + ip1_field_name = "interface_ip" + ip2_field_name = "private_v4" + + res = "" + ostack_var = self.get_variable(var_name) + + try: + if ip1_field_name in ostack_var[host_name][0]: + res = str(ostack_var[host_name][0][ip1_field_name]) + elif ip2_field_name in ostack_var[host_name][0]: + res = str(ostack_var[host_name][0][ip2_field_name]) + except Exception as e: + self.logger.info("An error occurred during the runner._getIP() call. We skipped it.") + self.logger.info("Error is:\n%s", e) + + self.logger.debug("get_ip res = %s", res) + self.logger.debug(type(res)) + + return res + diff --git a/src/cotea/task_result.py b/src/cotea/task_result.py new file mode 100644 index 0000000..847ab9b --- /dev/null +++ b/src/cotea/task_result.py @@ -0,0 +1,14 @@ +from ansible.executor.task_result import TaskResult as TR + + +class TaskResult: + def __init__(self, ansible_task_result: TR): + self.result = ansible_task_result._result.copy() + self.task_name = ansible_task_result.task_name + self.task_ansible_object = ansible_task_result._task.copy() + self.task_fields = ansible_task_result._task_fields.copy() + + self.is_changed = ansible_task_result.is_changed() + self.is_failed = ansible_task_result.is_failed() + self.is_skipped = ansible_task_result.is_skipped() + self.is_unreachable = ansible_task_result.is_unreachable() diff --git a/src/cotea/wrappers/__init__.py b/src/cotea/wrappers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cotea/wrappers/ansi_breakpoint.py b/src/cotea/wrappers/ansi_breakpoint.py new file mode 100644 index 0000000..ea0b5c3 --- /dev/null +++ b/src/cotea/wrappers/ansi_breakpoint.py @@ -0,0 +1,7 @@ +class ansi_breakpoint: + def __init__(self, sync_obj, label): + self.sync_obj = sync_obj + self.label = label + + def stop(self): + self.sync_obj.continue_runner_with_stop(self.label) \ No newline at end of file diff --git a/src/cotea/wrappers/get_next_task_wrapper.py b/src/cotea/wrappers/get_next_task_wrapper.py new file mode 100644 index 0000000..ec9e2aa --- /dev/null +++ b/src/cotea/wrappers/get_next_task_wrapper.py @@ -0,0 +1,112 @@ +from ansible.inventory.host import Host + +from cotea.wrappers.wrapper_base import wrapper_base +from cotea.wrappers.ansi_breakpoint import ansi_breakpoint + +HOST_IND = 0 +TASK_IND = 1 + + +# wraps 
ansible.plugins.strategy.linear.StrategyModule._get_next_task_lockstep() +class get_next_task_wrapper(wrapper_base): + def __init__(self, func, sync_obj, logger, bp_label): + super().__init__(func, sync_obj, logger) + self.before_task_bp = ansi_breakpoint(sync_obj, bp_label) + self.prev_tasks = [] + self.next_tasks = [] + self.run_last_one = False + + + def __call__(self, real_obj, hosts_left, iterator): + result = None + + if self.run_last_one: + result = self.prev_tasks + self.run_last_one = False + else: + result = self.func(real_obj, hosts_left, iterator) + + self._copy_prev_tasks(result) + self.next_tasks = result + self.before_task_bp.stop() + + ''' + self.logger.info("task run") + #print(dir(result[0][1])) + #print(dir(result[0][1]._name)) + if result[0][1] != None: + if result[0][1].name == "Execute bad command": + self.zapas_task = result[0][1].copy() + else: + if self.zapas_task != None: + print("aaaaa") + return [(result[0][0], self.zapas_task)] + #self.sync_obj.continue_runner_with_stop() + + print("zapas:", type(self.zapas_task)) + ''' + + return result + + # all remaining tasks are meta -> run through them without stopping and + # without returning control + # the next one is a normal task -> return control, answering true + # no tasks left -> return control, answering false + + + def _copy_prev_tasks(self, hosttasks): + self.prev_tasks = [] + + for hosttask in hosttasks: + temp_host = None + temp_task = None + + if hosttask[HOST_IND]: + temp_host = Host() + temp_host.deserialize(hosttask[HOST_IND].serialize()) + + if hosttask[TASK_IND]: + temp_task = hosttask[TASK_IND].copy() + + if temp_host: + self.prev_tasks.append( (temp_host, temp_task) ) + + + def get_next_task(self): + res = None + + if len(self.next_tasks) > 0: + if len(self.next_tasks[0]) >= 2: + res = self.next_tasks[0][TASK_IND] + + return res + + + def get_next_task_name(self): + res = None + next_task = self.get_next_task() + + if next_task: + res = next_task.get_name() + + return res + + + def get_prev_task(self): + res = None + + if len(self.prev_tasks) > 0: + if len(self.prev_tasks[0]) >= 2: + res = self.prev_tasks[0][TASK_IND] + + return res + + + def get_prev_task_name(self): + res = None + prev_task = self.get_prev_task() + + if prev_task: + res = prev_task.get_name() + + return res \ No newline at end of file diff --git a/src/cotea/wrappers/pbcli_run_wrapper.py b/src/cotea/wrappers/pbcli_run_wrapper.py new file mode 100644 index 0000000..27b2771 --- /dev/null +++ b/src/cotea/wrappers/pbcli_run_wrapper.py @@ -0,0 +1,25 @@ +from cotea.wrappers.wrapper_base import wrapper_base +from cotea.wrappers.ansi_breakpoint import ansi_breakpoint + + +# wraps ansible.cli.playbook.PlaybookCLI.run() +class pbcli_run_wrapper(wrapper_base): + def __init__(self, func, sync_obj, logger, before_bp_label, after_bp_label): + super().__init__(func, sync_obj, logger) + + #self.parse_error = False + self.before_playbook_bp = ansi_breakpoint(sync_obj, before_bp_label) + self.after_playbook_bp = ansi_breakpoint(sync_obj, after_bp_label) + + + # call of PlaybookCLI.run() + def __call__(self, real_obj): + self.logger.debug("before playbook") + self.before_playbook_bp.stop() + + result = self.func(real_obj) + + self.logger.debug("after playbook") + self.after_playbook_bp.stop() + + return result diff --git a/src/wrappers/play_prereqs_wrapper.py b/src/cotea/wrappers/play_prereqs_wrapper.py similarity index 56% rename from src/wrappers/play_prereqs_wrapper.py rename to src/cotea/wrappers/play_prereqs_wrapper.py index 787d308..008129c 100644 --- 
a/src/wrappers/play_prereqs_wrapper.py +++ b/src/cotea/wrappers/play_prereqs_wrapper.py @@ -1,4 +1,4 @@ -from cotea.src.wrappers.wrapper_base import wrapper_base +from cotea.wrappers.wrapper_base import wrapper_base # wraps from ansible.cli.CLI._play_prereqs() @@ -12,15 +12,10 @@ def __init__(self, func, sync_obj, logger): def __call__(self, real_obj): loader, inventory, variable_manager = self.func() - self.logger.debug("_pre_reqs call") + self.logger.debug("_play_prereqs() call") - if not self.loader: - self.loader = loader - - if not self.inventory: - self.inventory = inventory - - if not self.variable_manager: - self.variable_manager = variable_manager + self.loader = loader + self.inventory = inventory + self.variable_manager = variable_manager return loader, inventory, variable_manager diff --git a/src/cotea/wrappers/strategy_run_wrapper.py b/src/cotea/wrappers/strategy_run_wrapper.py new file mode 100644 index 0000000..7e61c39 --- /dev/null +++ b/src/cotea/wrappers/strategy_run_wrapper.py @@ -0,0 +1,50 @@ +from cotea.wrappers.wrapper_base import wrapper_base +from cotea.wrappers.ansi_breakpoint import ansi_breakpoint + + +# wraps ansible.plugins.strategy.linear.StrategyModule.run() +class strategy_run_wrapper(wrapper_base): + def __init__(self, func, sync_obj, logger, before_bp_label, after_bp_label): + super().__init__(func, sync_obj, logger) + + self.before_play_bp = ansi_breakpoint(sync_obj, before_bp_label) + self.after_play_bp = ansi_breakpoint(sync_obj, after_bp_label) + self.current_play_name = None + self.iterator = None + self.play_context = None + self.variable_manager = None + self.hosts = None + self.hosts_all = None + self.was_error = False + + + def __call__(self, real_obj, iterator, play_context): + self.logger.debug("play run") + + self.was_error = False + self.iterator = iterator + self.play_context = play_context + self.variable_manager = real_obj._variable_manager + self.hosts = real_obj._hosts_cache + self.hosts_all = real_obj._hosts_cache_all + try: + self.current_play_name = iterator._play.get_name() + except: + pass + + self.before_play_bp.stop() + + result = self.func(real_obj, iterator, play_context) + + self.iterator = None + self.play_context = None + self.current_play_name = None + + self.logger.debug("play end") + + self.after_play_bp.stop() + if result != real_obj._tqm.RUN_OK: + self.was_error = True + + return result + diff --git a/src/cotea/wrappers/update_active_conn_wrapper.py b/src/cotea/wrappers/update_active_conn_wrapper.py new file mode 100644 index 0000000..5433ca2 --- /dev/null +++ b/src/cotea/wrappers/update_active_conn_wrapper.py @@ -0,0 +1,40 @@ +from cotea.wrappers.wrapper_base import wrapper_base +from cotea.wrappers.ansi_breakpoint import ansi_breakpoint + + +# wraps ansible.plugins.strategy.StrategyBase.update_active_connections() +class update_active_conn_wrapper(wrapper_base): + def __init__(self, func, sync_obj, logger, bp_label): + super().__init__(func, sync_obj, logger) + + self.after_task_bp = ansi_breakpoint(sync_obj, bp_label) + self.current_results = None + self.error_msgs = [] + + def __call__(self, real_obj, results): + result = self.func(real_obj, results) + self.current_results = results + self._save_error_msgs(results) + self.after_task_bp.stop() + + ''' + self.logger.info("task end, result:") + if len(results) > 0: + #self.logger.info(results[0].is_failed()) + #self.logger.info(dir(results[0])) + if results[0].is_failed(): + self.logger.info(results[0]._result["stderr_lines"]) + #self.sync_obj.continue_runner_with_stop() 
+ ''' + + return result + + + def _save_error_msgs(self, results): + for res in results: + if res.is_failed() or res.is_unreachable(): + if hasattr(res, "_result"): + if "stderr" in res._result: + self.error_msgs.append(res._result["stderr"]) + elif "msg" in res._result: + self.error_msgs.append(res._result["msg"]) diff --git a/src/wrappers/wrapper_base.py b/src/cotea/wrappers/wrapper_base.py similarity index 100% rename from src/wrappers/wrapper_base.py rename to src/cotea/wrappers/wrapper_base.py diff --git a/src/runner.py b/src/runner.py deleted file mode 100644 index 775414f..0000000 --- a/src/runner.py +++ /dev/null @@ -1,412 +0,0 @@ -import threading -import logging -import sys - -from ansible.cli import CLI -from ansible.cli.playbook import PlaybookCLI -from ansible.playbook import Playbook, Play -from ansible.executor.playbook_executor import PlaybookExecutor -from ansible.inventory.manager import InventoryManager -from ansible.vars.manager import VariableManager -from ansible.parsing.dataloader import DataLoader -from ansible import context -from ansible.plugins.strategy import StrategyBase -from ansible.plugins.strategy.linear import StrategyModule -from ansible.executor.task_queue_manager import TaskQueueManager - -from cotea.src.ansible_execution_sync import ans_sync -from cotea.src.arguments_maker import argument_maker -from cotea.src.wrappers.get_batches_wrapper import get_batches_wrapper -from cotea.src.wrappers.strategy_run_wrapper import strategy_run_wrapper -from cotea.src.wrappers.get_next_task_wrapper import get_next_task_wrapper -from cotea.src.wrappers.wait_pending_wrapper import wait_pending_wrapper -from cotea.src.wrappers.tqm_run_wrapper import tqm_run_wrapper -from cotea.src.wrappers.get_batch_len_wrapper import get_batch_len_wrapper -from cotea.src.wrappers.pbcli_run_wrapper import pbcli_run_wrapper -from cotea.src.wrappers.play_prereqs_wrapper import play_prereqs_wrapper - - -class runner: - def __init__(self, pb_path, arg_maker=None, debug_lvl=None, log_f=None): - # logger configuration - if not debug_lvl: - self.logging_lvl = logging.WARNING # %(levelname)s: - else: - if debug_lvl == "DEBUG": - self.logging_lvl = logging.DEBUG - elif debug_lvl == "INFO": - self.logging_lvl = logging.INFO - else: - self.logging_lvl = logging.WARNING - - if log_f is not None: - logging.basicConfig(format="%(name)s %(asctime)s %(message)s", - datefmt="%H:%M:%S", level=self.logging_lvl, - filename=log_f, filemode="a") - self.log_file = open(log_f, "a") - self.log_file.truncate(0) # clear file - sys.stdout = self.log_file - sys.stderr = self.log_file - else: - logging.basicConfig(format="%(name)s %(asctime)s %(message)s", - datefmt="%H:%M:%S", level=self.logging_lvl) - - self.logger = logging.getLogger("runner") - self.logger.debug("runner - INITED") - self.pb_path = pb_path - self.arg_maker = arg_maker - if not arg_maker: - self.arg_maker = argument_maker() - else: - self.logger.debug("ARGMAKER - OK") - - self.wrapper_set = False - self.setup_play_called_once = False - self.ansible_thread_run = False - self.there_was_an_error = False - self.play_not_loaded = -1 - self.play_ind_in_cur_pb = self.play_not_loaded - self.sync_obj = ans_sync(logging.getLogger("sync_obj")) - - self._check_pb_and_inv_path() - self.cur_plays_len = len(self.playbook.get_plays()) - - self._set_wrappers() - - args = self.arg_maker.args - args.insert(0, "/usr/local/bin/ansible-playbook") - args.insert(1, self.pb_path) - - # getting run string(the same as the cli command) - self.run_str = "" - for run_elem in 
args: - self.run_str += run_elem + " " - - self.pbCLI = PlaybookCLI(args) - - def _check_pb_and_inv_path(self): - inv_hosts = ("", ) - loader = DataLoader() - loader.set_basedir(".") - - passwords = {} - - inventory = InventoryManager(loader, sources=inv_hosts) - variable_manager = VariableManager(loader=loader, - inventory=inventory) - - # getting playbook - try: - pb = Playbook.load(self.pb_path, - variable_manager=variable_manager, - loader=loader) - except Exception as e: - error_msg = "There is something wrong with playbook path or " - error_msg += "parser has found error in playbook\nError:" - self.logger.error(error_msg) - self.logger.error(self.pb_path) - raise e - - self.playbook = pb # maybe this will be usable in future - - def _set_wrappers(self): - wrp_logger = logging.getLogger("ansible_wrappers") - - # setting wrapper on - # ansible.plugins.strategy.linear.StrategyModule.run() - # this method is called to run play - self._run_play_wrp = strategy_run_wrapper(StrategyModule.run, - self.sync_obj, wrp_logger) - StrategyModule.run = self._run_play_wrp - - # setting wrapper on - # ansible.plugins.strategy.linear.StrategyModule._get_next_task_lockstep() - self._get_next_task_wrp = get_next_task_wrapper(StrategyModule._get_next_task_lockstep, - self.sync_obj, wrp_logger) - StrategyModule._get_next_task_lockstep = self._get_next_task_wrp - - # setting wrapper on ansible.plugins.strategy.StrategyBase._wait_on_pending_results() - # this method is called while waiting for the task ending - # in most cases this method wait for the same task on different hosts - self._wait_res_wrp = wait_pending_wrapper(StrategyBase._wait_on_pending_results, - self.sync_obj, wrp_logger) - StrategyBase._wait_on_pending_results = self._wait_res_wrp - - # setting wrapper on ansible.executor.playbook_executor._get_serialized_batches() - self._get_batches_wrp = get_batches_wrapper(PlaybookExecutor._get_serialized_batches, - self.sync_obj, wrp_logger) - PlaybookExecutor._get_serialized_batches = self._get_batches_wrp - - # setting wrapper on ansible.executor.task_queue_manager.run() - self._tqm_run_wrp = tqm_run_wrapper(TaskQueueManager.run, - self.sync_obj, wrp_logger) - TaskQueueManager.run = self._tqm_run_wrp - - # setting wrapper on ansible.inventory.manager.InventoryManager.restrict_to_hosts() - self._get_batches_len_wrp = get_batch_len_wrapper(InventoryManager.restrict_to_hosts, - self.sync_obj, wrp_logger) - InventoryManager.restrict_to_hosts = self._get_batches_len_wrp - - # setting wrapper on ansible.cli.CLI._play_prereqs() - self._play_prereqs_wrp = play_prereqs_wrapper(CLI._play_prereqs, - self.sync_obj, wrp_logger) - CLI._play_prereqs = self._play_prereqs_wrp - - # setting wrapper on ansible.cli.playbook.PlaybookCLI.run() - ''' - self._pbcli_run_wrp = pbcli_run_wrapper(PlaybookCLI.run, - self.sync_obj, wrp_logger) - PlaybookCLI.run = self._pbcli_run_wrp - ''' - - def finish_ansible(self): - self.logger.debug("we are finishing") - - was_error = self.was_error() - - if was_error or not self._tqm_run_wrp.was_called: - if was_error: - self.logger.debug("finishing with errors") - else: - self.logger.debug("finishing without errors") - self.sync_obj.continue_ansible() - else: - self.logger.debug("finishing without errors") - self.sync_obj.continue_ansible_with_stop() - self.sync_obj.continue_ansible() - - def has_next_play(self): - self.logger.debug("HAS_NEXT_PLAY CALL") - # Ansible will stop his work anyway if there was failed task - # and there was no 'ignore_errors' - - result = True - - if 
self.there_was_an_error: - self.logger.debug("THERE WAS FAILED TASK") - result = False - - # so strange because self.play_ind_in_cur_pb initial value is -1 - if self.play_ind_in_cur_pb + 1 > self.cur_plays_len - 1: - result = False - self.logger.debug("HAS_NEXT_PLAY from python: result = %s" % result) - self.logger.debug("HAS_NEXT_PLAY from python: cur_play_ind = %d", - self.play_ind_in_cur_pb + 1) - self.logger.debug("HAS_NEXT_PLAY from python: cur_play_ind = %d", - self.cur_plays_len - 1) - - return result - - def setup_play_for_run(self): - self.logger.debug("SETUP_PLAY_FOR_RUN CALL") - - self.play_ind_in_cur_pb += 1 - if self.play_ind_in_cur_pb > self.cur_plays_len - 1: - return False - - if not self.ansible_thread_run: - try: - self.ansible_thread_run = True - ansible_thread = threading.Thread(target=self.pbCLI.run) - ansible_thread.start() - except Exception as e: - self.logger.error("Ansible launch was failed\n") - self.logger.error("Error:") - raise e - - if self.play_ind_in_cur_pb == 0: # first play in pb - self.sync_obj.runner_just_wait() - else: - self.sync_obj.continue_ansible_with_stop() - - # this is not first play - if self.setup_play_called_once and self._tqm_run_wrp.was_called: - # there was an error - if self.was_error(): - self.there_was_an_error = True - return False - - self.sync_obj.continue_ansible_with_stop() - - if self._get_batches_wrp.empty_play: - return False - - self.sync_obj.continue_ansible_with_stop() - - self._wait_res_wrp.ansible_iterator = self._run_play_wrp.ansible_iterator - self._wait_res_wrp.hosts_left = self._run_play_wrp.hosts_left - - self.setup_play_called_once = True - - return True - - def has_next_task(self): - self.logger.debug("HAS_NEXT_TASK CALL") - - self.sync_obj.continue_ansible_with_stop() - - ''' - if self.some_task_was_failed: - self.logger.debug("THERE WAS FAILED TASK") - return False - ''' - - res = self._get_next_task_wrp.has_next_task - self.logger.debug("HAS_NEXT_TASK RES = %s", res) - return res - - def run_next_task(self): - self.logger.debug("RUN_NEXT_TASK CALL") - self.sync_obj.continue_ansible_with_stop() - if self._wait_res_wrp.failed_task: - self.some_task_was_failed = True - return False - - return True - - # returns tasks of current play as a dict, where each host - # is a key and its value is list of its tasks - def get_cur_play_tasks(self): - self.logger.info(self._run_play_wrp.cur_play_hosts_tasks) - return self._run_play_wrp.cur_play_hosts_tasks - - # returns all gotten task results - # each result is an object of ansible.executor.task_result class - def get_results(self): - return self._wait_res_wrp.results - - # returns results of last pass(last run task on each host) - def get_last_results(self): - res = self._wait_res_wrp.results - return res[len(res) - 1] - - def make_results_pretty_print(self): - self._wait_res_wrp.print_results() - - # returns list of next tasks for each host of the play - def get_next_task_name(self): - next_tasks = self._wait_res_wrp.get_next_task() - result_name = "" - - try: - result_name = str(next_tasks[0].name) - except Exception: - pass - # self.logger.info(next_task) - return result_name - - def get_cur_play_name(self): - return self._tqm_run_wrp.play_name - - # will skip next task on every host - def skip_next_task(self): - self._wait_res_wrp.skip_next_task() - - def get_all_vars(self): - all_vars = self._play_prereqs_wrp.variable_manager.get_vars() - return all_vars - - def get_all_facts(self): - facts = self._play_prereqs_wrp.variable_manager._fact_cache.copy() - return facts - - 
def get_variable(self, var_name): - if var_name == "ansible_facts": - facts = self.get_all_facts() - return facts - - all_vars = self._play_prereqs_wrp.variable_manager.get_vars() - - if var_name in all_vars: - return all_vars[var_name] - - # check groups - if "groups" in all_vars: - if var_name in all_vars["groups"]: - return all_vars["groups"][var_name] - - result = {} - # check hostvars - if "hostvars" in all_vars: - for host in all_vars["hostvars"]: - for key in all_vars["hostvars"][host]: - if key == var_name: - result[host] = {key: all_vars["hostvars"][host][key]} - - if result: - return result - - facts = self.get_all_facts() - for host_key in facts: - if var_name in facts[host_key]: - result[host_key] = facts[host_key][var_name] - - if result: - return result - - self.logger.info("There is no variable with name " + str(var_name)) - return None - - def was_error(self): - if self._tqm_run_wrp.failed_tasks_hosts: - return True - - if self._get_batches_wrp.batches_count: - compare_with = self._get_batches_wrp.batches_count - if self._get_batches_len_wrp.batch_len: - compare_with = self._get_batches_len_wrp.batch_len - - self.logger.debug("From restrict: %d", - self._get_batches_len_wrp.batch_len) - self.logger.debug("From batches: %d", - self._get_batches_wrp.batches_count) - - if self._tqm_run_wrp.failed_hosts_count == compare_with: - return True - - return False - - def get_error_msg(self): - # this is object of ansible.executor.TaskResult class - last_task_result = self.get_last_results() - - msg = "" - - if len(last_task_result) > 0: - if last_task_result[0]._result: - if "msg" in last_task_result[0]._result: - try: - msg = str(last_task_result[0]._result["msg"]) - except Exception as e: - self.logger.debug(e) - msg = "" - - self.logger.debug("get_error_msg res = %s", msg) - self.logger.debug(type(msg)) - return msg - - def _getIP(self): - var_name = "openstack_servers" - host_name = "localhost" - ip1_field_name = "interface_ip" - ip2_field_name = "private_v4" - - res = "" - - ostack_var = self.get_variable(var_name) - try: - if ip1_field_name in ostack_var[host_name][0]: - res = str(ostack_var[host_name][0][ip1_field_name]) - elif ip2_field_name in ostack_var[host_name][0]: - res = str(ostack_var[host_name][0][ip2_field_name]) - except Exception as e: - self.logger.debug("During runner._getIP() call error was occured. 
We skipped it.") - self.logger.debug("Error is:\n%s", e) - - self.logger.debug("get_ip res = %s", res) - self.logger.debug(type(res)) - return res - - def get_run_string(self): - self.logger.debug("get_run_string res = %s", self.run_str) - return str(self.run_str) diff --git a/src/wrappers/__pycache__/get_batch_len_wrapper.cpython-38.pyc b/src/wrappers/__pycache__/get_batch_len_wrapper.cpython-38.pyc deleted file mode 100644 index 6454538..0000000 Binary files a/src/wrappers/__pycache__/get_batch_len_wrapper.cpython-38.pyc and /dev/null differ diff --git a/src/wrappers/__pycache__/get_batches_wrapper.cpython-38.pyc b/src/wrappers/__pycache__/get_batches_wrapper.cpython-38.pyc deleted file mode 100644 index 46c93fe..0000000 Binary files a/src/wrappers/__pycache__/get_batches_wrapper.cpython-38.pyc and /dev/null differ diff --git a/src/wrappers/__pycache__/get_next_task_wrapper.cpython-38.pyc b/src/wrappers/__pycache__/get_next_task_wrapper.cpython-38.pyc deleted file mode 100644 index eef1769..0000000 Binary files a/src/wrappers/__pycache__/get_next_task_wrapper.cpython-38.pyc and /dev/null differ diff --git a/src/wrappers/__pycache__/pbcli_run_wrapper.cpython-38.pyc b/src/wrappers/__pycache__/pbcli_run_wrapper.cpython-38.pyc deleted file mode 100644 index ef2eac1..0000000 Binary files a/src/wrappers/__pycache__/pbcli_run_wrapper.cpython-38.pyc and /dev/null differ diff --git a/src/wrappers/__pycache__/play_prereqs_wrapper.cpython-38.pyc b/src/wrappers/__pycache__/play_prereqs_wrapper.cpython-38.pyc deleted file mode 100644 index c6c6d6e..0000000 Binary files a/src/wrappers/__pycache__/play_prereqs_wrapper.cpython-38.pyc and /dev/null differ diff --git a/src/wrappers/__pycache__/strategy_run_wrapper.cpython-38.pyc b/src/wrappers/__pycache__/strategy_run_wrapper.cpython-38.pyc deleted file mode 100644 index 25f65c5..0000000 Binary files a/src/wrappers/__pycache__/strategy_run_wrapper.cpython-38.pyc and /dev/null differ diff --git a/src/wrappers/__pycache__/tqm_run_wrapper.cpython-38.pyc b/src/wrappers/__pycache__/tqm_run_wrapper.cpython-38.pyc deleted file mode 100644 index bbc34df..0000000 Binary files a/src/wrappers/__pycache__/tqm_run_wrapper.cpython-38.pyc and /dev/null differ diff --git a/src/wrappers/__pycache__/wait_pending_wrapper.cpython-38.pyc b/src/wrappers/__pycache__/wait_pending_wrapper.cpython-38.pyc deleted file mode 100644 index 33a937f..0000000 Binary files a/src/wrappers/__pycache__/wait_pending_wrapper.cpython-38.pyc and /dev/null differ diff --git a/src/wrappers/__pycache__/wrapper_base.cpython-38.pyc b/src/wrappers/__pycache__/wrapper_base.cpython-38.pyc deleted file mode 100644 index 0a396fd..0000000 Binary files a/src/wrappers/__pycache__/wrapper_base.cpython-38.pyc and /dev/null differ diff --git a/src/wrappers/get_batch_len_wrapper.py b/src/wrappers/get_batch_len_wrapper.py deleted file mode 100644 index d9762f7..0000000 --- a/src/wrappers/get_batch_len_wrapper.py +++ /dev/null @@ -1,16 +0,0 @@ -from cotea.src.wrappers.wrapper_base import wrapper_base - - -# wraps ansible.inventory.manager.InventoryManager.restrict_to_hosts() -class get_batch_len_wrapper(wrapper_base): - def __init__(self, func, sync_obj, logger): - super().__init__(func, sync_obj, logger) - self.batch_len = None - - def __call__(self, real_obj, batch): - self.logger.debug("WRAPPER: BEFORE restrict_to_hosts") - - # call of InventoryManager.restrict_to_hosts() - self.func(real_obj, batch) - - self.batch_len = len(batch) diff --git a/src/wrappers/get_batches_wrapper.py 
b/src/wrappers/get_batches_wrapper.py deleted file mode 100644 index 4e9c6ad..0000000 --- a/src/wrappers/get_batches_wrapper.py +++ /dev/null @@ -1,30 +0,0 @@ -from cotea.src.wrappers.wrapper_base import wrapper_base - - -# wraps ansible.executor.playbook_executor.PlaybookExecutor._get_serialized_batches() -class get_batches_wrapper(wrapper_base): - def __init__(self, func, sync_obj, logger): - super().__init__(func, sync_obj, logger) - self.empty_play = False - self.batches_count = None - - def __call__(self, real_obj, play): - self.logger.debug("WRAPPER: BEFORE GET_SERIALIZED_BATCHES") - self.empty_play = False - - # call of PlaybookExecutor._get_serialized_batches() - res_batches = self.func(real_obj, play) - - if len(res_batches) == 0: - self.empty_play = True - - if not self.empty_play: - self.batches_count = len(res_batches[0]) - - self.logger.debug("empty play: %s", self.empty_play) - self.sync_obj.continue_runner_with_stop() - - self.logger.info(res_batches) - self.logger.info(play.serial) - - return res_batches diff --git a/src/wrappers/get_next_task_wrapper.py b/src/wrappers/get_next_task_wrapper.py deleted file mode 100644 index c508f3d..0000000 --- a/src/wrappers/get_next_task_wrapper.py +++ /dev/null @@ -1,67 +0,0 @@ -from cotea.src.wrappers.wrapper_base import wrapper_base - - -# wraps ansible.plugins.strategy.linear.StrategyModule._get_next_task_lockstep() -class get_next_task_wrapper(wrapper_base): - def __init__(self, func, sync_obj, logger): - super().__init__(func, sync_obj, logger) - - self.ansible_iterator = None - self.hosts_left = None - self.has_next_task = False - - def __call__(self, real_obj, hosts_left, iterator): - self.logger.debug("WRAPPER: BEFORE GET_NEXT_TASK") - real_obj._set_hosts_cache(iterator._play) - self.hosts_left = hosts_left # real_obj.get_hosts_left(iterator) - prev_host_states = iterator._host_states.copy() - real_task_ind = 1 - tasks_count = 0 - meta_tasks_count = 0 - self.has_next_task = False - - for host in self.hosts_left: - task = iterator.get_next_task_for_host(host) - if task[real_task_ind]: - if task[real_task_ind].get_name() != "meta": - tasks_count += 1 - else: - meta_tasks_count += 1 - - # moving iterator to the starting state - try: - prev_host_state = prev_host_states[host.name] - except KeyError: - prev_host_state = iterator.get_host_state(host) - iterator._host_states[host.name] = prev_host_state - - self.ansible_iterator = iterator - - if tasks_count == 0 and meta_tasks_count == 0: - self.logger.debug("there is no task at all") - # there is no tasks at all - self.sync_obj.continue_runner_with_stop() - - result = self.func(real_obj, hosts_left, iterator) - return result - else: - if tasks_count > 0: - self.logger.debug("there is NORMAL task") - # no-meta task will be the next one - self.has_next_task = True - - # flashing control to runner - self.sync_obj.continue_runner_with_stop() - - result = self.func(real_obj, hosts_left, iterator) - return result - else: - self.logger.debug("there is only META") - # meta task will be the next one - result = self.func(real_obj, hosts_left, iterator) - return result - - # все следующие - meta -> прокручиваем их без стопа и без - # возврата управления - # следующий - норм таск -> возвращаем управление, говоря true - # следующих не осталось -> возвращаем управление, говоря false diff --git a/src/wrappers/pbcli_run_wrapper.py b/src/wrappers/pbcli_run_wrapper.py deleted file mode 100644 index 5eebe70..0000000 --- a/src/wrappers/pbcli_run_wrapper.py +++ /dev/null @@ -1,19 +0,0 @@ -from 
cotea.src.wrappers.wrapper_base import wrapper_base - - -# wraps from ansible.cli.playbook.PlaybookCLI.run() -class pbcli_run_wrapper(wrapper_base): - def __init__(self, func, sync_obj, logger): - super().__init__(func, sync_obj, logger) - - self.parse_error = False - - def __call__(self, real_obj): - # call of PlaybookCLI.run() - result = self.func(real_obj) - - ''' - self.logger.debug("After pb_cli_run") - self.logger.debug("Result = %s", result) - ''' - return result diff --git a/src/wrappers/strategy_run_wrapper.py b/src/wrappers/strategy_run_wrapper.py deleted file mode 100644 index eb4a99c..0000000 --- a/src/wrappers/strategy_run_wrapper.py +++ /dev/null @@ -1,45 +0,0 @@ -from cotea.src.wrappers.wrapper_base import wrapper_base - - -# wraps ansible.plugins.strategy.linear.StrategyModule.run() -class strategy_run_wrapper(wrapper_base): - def __init__(self, func, sync_obj, logger): - super().__init__(func, sync_obj, logger) - - self.cur_play_hosts_tasks = {} - self.total_tasks_count = 0 - self.ansible_iterator = None - self.ansible_play_context = None - self.hosts_left = None - - def __call__(self, real_obj, iterator, play_context): - self.logger.debug("WRAPPER: BEFORE RUN") - real_obj._set_hosts_cache(iterator._play) - self.hosts_left = real_obj.get_hosts_left(iterator) - prev_host_states = iterator._host_states.copy() - real_task_ind = 1 - self.cur_play_hosts_tasks = {} - - for host in self.hosts_left: - self.cur_play_hosts_tasks[host] = [] - while True: - task = iterator.get_next_task_for_host(host) - if task[real_task_ind] is None: - break - self.cur_play_hosts_tasks[host].append(task[real_task_ind]) - - # moving iterator to start state - try: - prev_host_state = prev_host_states[host.name] - except KeyError: - prev_host_state = iterator.get_host_state(host) - iterator._host_states[host.name] = prev_host_state - - self.ansible_iterator = iterator - self.ansible_play_context = play_context - - # flashing control to runner - self.sync_obj.continue_runner_with_stop() - - result = self.func(real_obj, iterator, play_context) - return result diff --git a/src/wrappers/tqm_run_wrapper.py b/src/wrappers/tqm_run_wrapper.py deleted file mode 100644 index 5c92d0b..0000000 --- a/src/wrappers/tqm_run_wrapper.py +++ /dev/null @@ -1,45 +0,0 @@ -from cotea.src.wrappers.wrapper_base import wrapper_base - - -# wraps ansible.executor.task_queue_manager.TaskQueueManager.run() -class tqm_run_wrapper(wrapper_base): - def __init__(self, func, sync_obj, logger): - super().__init__(func, sync_obj, logger) - - self.failed_tasks_hosts = False - self.failed_hosts_count = None - self.was_called = False - self.play_name = "" - - def __call__(self, real_obj, play): - self.logger.debug("WRAPPER: BEFORE TQM RUN") - previously_failed = len(real_obj._failed_hosts) - previously_unreachable = len(real_obj._unreachable_hosts) - - self.was_called = True - try: - self.play_name = str(play.get_name()) - except Exception: - pass - - # call of TaskQueueManager.run() - result = self.func(real_obj, play) - self.logger.debug("AFTER TQM, PARAPAPAAAAAAAAAAAAAAAAAAAAAAAAAAM") - - if result & real_obj.RUN_FAILED_BREAK_PLAY != 0: - self.failed_tasks_hosts = True - - failed_count = len(real_obj._failed_hosts) - unreachable_count = len(real_obj._unreachable_hosts) - self.failed_hosts_count = failed_count + unreachable_count - \ - (previously_failed + previously_unreachable) - - ''' - self.logger.debug("break_play =", self.failed_tasks_hosts) - self.logger.debug("failed_hosts_count =", self.failed_hosts_count) - ''' - - 
self.sync_obj.continue_runner_with_stop() - self.was_called = False - - return result diff --git a/src/wrappers/wait_pending_wrapper.py b/src/wrappers/wait_pending_wrapper.py deleted file mode 100644 index 169ced0..0000000 --- a/src/wrappers/wait_pending_wrapper.py +++ /dev/null @@ -1,124 +0,0 @@ -from cotea.src.wrappers.wrapper_base import wrapper_base - - -# wraps ansible.plugins.strategy.StrategyBase._wait_on_pending_results() -class wait_pending_wrapper(wrapper_base): - def __init__(self, func, sync_obj, logger): - super().__init__(func, sync_obj, logger) - - self.results = [] - self.hosts_left = None - self.ansible_iterator = None - self.this_is_last_play = False - self.failed_task = False - self.unreachable_hosts = [] - self.failed_hosts = [] - self.call_count = 0 - - def __call__(self, real_obj, iterator): - self.logger.debug("WRAPPER: BEFORE COLLECTING") - self.call_count += 1 - ret_results = self.func(real_obj, iterator) - self.ansible_iterator = iterator - self.hosts_left = real_obj.get_hosts_left(iterator) - self.results.append(ret_results) - - for got_result in ret_results: - if got_result.is_failed() and not \ - got_result._task_fields.get('ignore_errors'): - self.failed_task = True - self.sync_obj.continue_runner_with_stop() - return ret_results - - ''' - if self.has_next_task() or not self.this_is_last_play: - self.sync_obj.continue_runner_with_stop() - else: - self.sync_obj.continue_runner() - ''' - - self.sync_obj.continue_runner_with_stop() - - if real_obj._tqm._unreachable_hosts: - for unrch_host in real_obj._tqm._unreachable_hosts: - if unrch_host not in self.unreachable_hosts: - self.unreachable_hosts.append(unrch_host) - - if real_obj._tqm._failed_hosts: - for failed_host in real_obj._tqm._failed_hosts: - if failed_host not in self.failed_hosts: - self.failed_hosts.append(failed_host) - - return ret_results - - def print_results(self): - for res_of_pass in self.results: - for res in res_of_pass: - self.logger.info(res._host, end=" | ") - self.logger.info(res._task) - # self.logger.debug(res.is_unreachable()) - self.logger.info() - - def has_next_task(self): - prev_host_states = self.ansible_iterator._host_states.copy() - hosts_with_next_task = 0 - real_task_ind = 1 - - for host in self.hosts_left: - while True: - task = self.ansible_iterator.get_next_task_for_host(host) - if task[real_task_ind]: - self.logger.debug("iter -- %s", - task[real_task_ind].get_name()) - self.logger.debug("name attr = %s", - task[real_task_ind].name) - ''' - hosts_with_next_task += 1 - break - ''' - if task[real_task_ind].get_name() != "meta": - hosts_with_next_task += 1 - break - else: - break - - # moving iterator to start state - try: - prev_host_state = prev_host_states[host.name] - except KeyError: - prev_host_state = self.ansible_iterator.get_host_state(host) - self.ansible_iterator._host_states[host.name] = prev_host_state - - if hosts_with_next_task > 0: - # return hosts_with_next_task - return True - - return False - - def get_next_task(self): - prev_host_states = self.ansible_iterator._host_states.copy() - real_task_ind = 1 - result = [] - - for host in self.hosts_left: - while True: - task = self.ansible_iterator.get_next_task_for_host(host) - if task[real_task_ind]: - if task[real_task_ind].name: - result.append(task[real_task_ind]) - break - else: - break - - # moving iterator to start state - try: - prev_host_state = prev_host_states[host.name] - except KeyError: - prev_host_state = self.ansible_iterator.get_host_state(host) - self.ansible_iterator._host_states[host.name] = 
prev_host_state - - return result - - def skip_next_task(self): - for host in self.hosts_left: - self.ansible_iterator.get_next_task_for_host(host)
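
For reviewers: a minimal usage sketch of the new cotea.runner API added by this diff. The playbook path and the bare argument_maker() are placeholders; ansible-playbook options (inventory, extra vars, etc.) would be supplied through argument_maker, whose helper methods are not shown in this diff.

from cotea.runner import runner
from cotea.arguments_maker import argument_maker

arg_maker = argument_maker()        # ansible-playbook CLI options would be added here
r = runner("site.yml", arg_maker)   # "site.yml" is a placeholder playbook path

while r.has_next_play():
    print("play:", r.get_cur_play_name())

    while r.has_next_task():
        print("  next task:", r.get_next_task_name())
        results = r.run_next_task()  # list of cotea.task_result.TaskResult objects

        if r.was_error():
            print("  error:", r.get_error_msg())

r.finish_ansible()                  # let the Ansible thread run to completion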