diff --git a/docs/concepts/tasks/README.md b/docs/concepts/tasks/README.md index ab9259d5..8ca7580c 100644 --- a/docs/concepts/tasks/README.md +++ b/docs/concepts/tasks/README.md @@ -12,7 +12,7 @@ There are many task types in Zrb. Every task has its own specific use cases: - [ResourceMaker](resource-maker.md): Generate artifacts/resources based on templates - [FlowTask](flow-task.md): Put `CmdTask` and `python task` into single flow. - [RemoteCmdTask](remote-cmd-task.md) -- [RsyncTask](remote-cmd-task.md) +- [RsyncTask](rsync-task.md) - [Checkers (HttpChecker, PortChecker, and PathChecker)](checkers.md): Check parent task's readiness. As every task are extended from `BaseTask`, you will see that most of them share some common parameters. @@ -400,13 +400,45 @@ Every task share some common methods like `run`, `check`, and `to_function`. Deep copy current task -## `add_envs` +## `inject_env` -## `add_env_files` +To be overridden -## `add_inputs` +## `insert_env` -## `add_upstreams` +## `add_env` + +## `inject_env_file` + +To be overridden + +## `insert_env_file` + +## `add_env_file` + +## `inject_input` + +To be overridden + +## `insert_input` + +## `add_input` + +## `inject_upstream` + +To be overridden + +## `insert_upstream` + +## `add_upstream` + +## `inject_checker` + +To be overridden + +## `insert_checker` + +## `add_checker` ## `set_name` diff --git a/docs/concepts/tasks/remote-cmd-task.md b/docs/concepts/tasks/remote-cmd-task.md index 7ec8093b..5b1dcacb 100644 --- a/docs/concepts/tasks/remote-cmd-task.md +++ b/docs/concepts/tasks/remote-cmd-task.md @@ -27,4 +27,13 @@ install_curl = RemoteCmdTask( runner.register(install_curl) ``` +RemoteCmdTask exposes several environment variables that you can use in your `cmd` and `cmd_path`: + +- `_CONFIG_HOST` +- `_CONFIG_PORT` +- `_CONFIG_SSH_KEY` +- `_CONFIG_USER` +- `_CONFIG_PASSWORD` +- `_CONFIG_MAP_<KEY>` (one variable for each `config_map` key, in upper snake case) + 🔖 [Table of Contents](../../README.md) / [Concepts](../README.md) / [Tasks](README.md) diff --git a/docs/concepts/tasks/rsync-task.md b/docs/concepts/tasks/rsync-task.md index 499fe04a..5eef1342 100644 --- a/docs/concepts/tasks/rsync-task.md +++ b/docs/concepts/tasks/rsync-task.md @@ -1,13 +1,13 @@ 🔖 [Table of Contents](../../README.md) / [Concepts](../README.md) / [Tasks](README.md) -# RSyncTask +# RsyncTask ```python from zrb import ( - runner, CmdTask, RSyncTask, RemoteConfig, PasswordInput, StrInput + runner, CmdTask, RsyncTask, RemoteConfig, PasswordInput, StrInput ) -upload = RSyncTask( +upload = RsyncTask( name='upload', inputs=[ PasswordInput(name='passsword'), @@ -18,17 +18,20 @@ upload = RsyncTask( RemoteConfig( host='192.168.1.10, user='ubuntu, - password='{{input.password}}' + password='{{input.password}}', + config_map={ + 'dir': '192-168-1-10' + } ) ], is_remote_src=False, - is_remote_dst=True - src='{{input.src}}', + src='$_CONFIG_MAP_DIR/{{input.src}}', + is_remote_dst=True, dst='{{input.dst}}', ) runner.register(upload) -download = RSyncTask( +download = RsyncTask( name='download', inputs=[ PasswordInput(name='passsword'), @@ -43,11 +46,20 @@ download = RSyncTask( ) ], is_remote_src=True, - is_remote_dst=False src='{{input.src}}', - dst='{{input.dst}}', + is_remote_dst=False, + dst='$_CONFIG_MAP_DIR/{{input.dst}}', ) runner.register(download) ``` +RsyncTask exposes several environment variables that you can use in your `src` and `dst`: + +- `_CONFIG_HOST` +- `_CONFIG_PORT` +- `_CONFIG_SSH_KEY` +- `_CONFIG_USER` +- `_CONFIG_PASSWORD` +- `_CONFIG_MAP_<KEY>` (one variable for each `config_map` key, in upper snake case) + 🔖 [Table of Contents](../../README.md) / [Concepts](../README.md) / [Tasks](README.md)
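To make the `_CONFIG_*` variables documented above concrete, here is a minimal sketch of a `RemoteCmdTask` that reads them from its `cmd`. It relies only on constructs shown in the docs in this patch, except that the `remote_configs` parameter name and the host, user, key, and `config_map` values are illustrative assumptions.

```python
from zrb import runner, RemoteCmdTask, RemoteConfig

check_disk = RemoteCmdTask(
    name='check-disk',
    remote_configs=[  # assumed parameter name for the list of RemoteConfig objects
        RemoteConfig(
            host='192.168.1.10',      # assumed host
            user='ubuntu',            # assumed user
            ssh_key='~/.ssh/id_rsa',  # assumed key path
            config_map={'backup_dir': '/var/backups'}  # assumed mapping
        )
    ],
    cmd=[
        # _CONFIG_HOST and _CONFIG_USER are injected from the RemoteConfig above
        'echo "Checking $_CONFIG_USER@$_CONFIG_HOST"',
        # the config_map key 'backup_dir' is exposed as _CONFIG_MAP_BACKUP_DIR
        'du -sh "$_CONFIG_MAP_BACKUP_DIR"',
    ]
)
runner.register(check_disk)
```

Each `RemoteConfig` contributes its own set of `_CONFIG_*` values, so the same `cmd` can be reused across several hosts.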
diff --git a/docs/oops-i-did-it-again/using-the-same-variable-to-define-different-task.md b/docs/oops-i-did-it-again/using-the-same-variable-to-define-different-task.md index 3064e8aa..6762c52e 100644 --- a/docs/oops-i-did-it-again/using-the-same-variable-to-define-different-task.md +++ b/docs/oops-i-did-it-again/using-the-same-variable-to-define-different-task.md @@ -5,7 +5,35 @@ ```python from zrb import runner, CmdTask +prepare = CmdTask( + name='prepare-python-project', + cmd='pip install -r requirements.txt' ) +runner.register(prepare) +prepare = CmdTask( + name='prepare-node-project', + cmd='npm install' ) +runner.register(prepare) + +run_fastapi = CmdTask( + name='run-fastapi', + cmd='uvicorn main:app', + upstreams=[ + prepare, # <-- Here is the problem, `npm install` or `pip install`? + ] ) +runner.register(run_fastapi) ``` +You can see that `prepare-python-project` and `prepare-node-project` are assigned to the same variable. + +Using that variable as an upstream or checker will lead to a tricky situation. In our case, we want to perform `pip install` before starting FastAPI. But since we re-assign the variable to `prepare-node-project`, we will get `npm install` instead. + +# Avoiding the Problem + +Be careful with your variable names. A simple convention is to give each variable the same name as its task. + + 🔖 [Table of Contents](../README.md) / [Oops, I Did It Again](README.md)
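The fix that the page above recommends is easiest to see side by side. Here is a minimal sketch of the same tasks with one variable per task; it uses only the `CmdTask` options that already appear in the snippet above.

```python
from zrb import runner, CmdTask

prepare_python_project = CmdTask(
    name='prepare-python-project',
    cmd='pip install -r requirements.txt'
)
runner.register(prepare_python_project)

prepare_node_project = CmdTask(
    name='prepare-node-project',
    cmd='npm install'
)
runner.register(prepare_node_project)

run_fastapi = CmdTask(
    name='run-fastapi',
    cmd='uvicorn main:app',
    upstreams=[
        prepare_python_project,  # unambiguous: this is the `pip install` task
    ]
)
runner.register(run_fastapi)
```

Because each task keeps its own variable, the `upstreams` list states exactly which preparation step runs before `run-fastapi`.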
diff --git a/docs/quirks.md b/docs/quirks.md index c0a8d075..c8d50375 100644 --- a/docs/quirks.md +++ b/docs/quirks.md @@ -18,6 +18,5 @@ - `env` will override each other, the last one takes greater priority - If you define a `DockerComposeTask`, it will automatically fill your environment with the ones you use in your docker-compose file. The environment defined that way will have a very low priority. They will be overridden by both `env_files` and `env`. - You cannot have an input named: `_task`, `_args` or `_execution_id` -- You cannot have an environment named `_execution_id` 🔖 [Table of Contents](README.md) diff --git a/docs/tutorials/copy-and-reuse-task.md b/docs/tutorials/copy-and-reuse-task.md index f50c1032..8c7a64f3 100644 --- a/docs/tutorials/copy-and-reuse-task.md +++ b/docs/tutorials/copy-and-reuse-task.md @@ -20,10 +20,10 @@ local_hello = hello.copy() # Update name, input, and env local_hello.set_name('hello-local') -local_hello.add_inputs( +local_hello.add_input( StrInput(name='name', description='Name', default='dunia') ) -local_hello.add_envs( +local_hello.add_env( Env(name='GREETINGS', os_name='', default='Halo') ) diff --git a/docs/tutorials/extending-cmd-task.md b/docs/tutorials/extending-cmd-task.md index d73333dd..1c78bbd0 100644 --- a/docs/tutorials/extending-cmd-task.md +++ b/docs/tutorials/extending-cmd-task.md @@ -56,18 +56,23 @@ class SlackPrintTask(CmdTask): self._slack_app_token = slack_app_token self._message = message - def run(self, *args: Any, **kwargs: Any): - # Inject environment variables - self.inject_env_map( - env_map={ - 'CHANNEL_ID': self.render_str(self._slack_channel_id), - 'TOKEN': self.render_str(self._slack_app_token), - 'MESSAGE': self.render_str(self._message) - } + def inject_envs(self): + self.add_env( + Env( + name='CHANNEL_ID', os_name='', + default=self.render_str(self._slack_channel_id) + ), + Env( + name='TOKEN', os_name='', + default=self.render_str(self._slack_app_token) + ), + Env( + name='MESSAGE', os_name='', + default=self.render_str(self._message) + ) ) - return super().run(*args, **kwargs) - def _get_cmd_str(self, *args: Any, **kwargs: Any): + def get_cmd_script(self, *args: Any, **kwargs: Any): # contruct json payload and replace all `"` with `\\"` json_payload = jsons.dumps({ 'channel': '$CHANNEL_ID', diff --git a/pyproject.toml b/pyproject.toml index 954fe75f..24451fe5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "flit_core.buildapi" [project] name = "zrb" -version = "0.0.109" +version = "0.0.110" authors = [ { name="Go Frendi Gunawan", email="gofrendiasgard@gmail.com" }, ] diff --git a/src/zrb/action/runner.py b/src/zrb/action/runner.py index 4767ed4c..10663178 100644 --- a/src/zrb/action/runner.py +++ b/src/zrb/action/runner.py @@ -28,7 +28,7 @@ def __init__(self, env_prefix: str = ''): def register(self, task: AnyTask): task.set_has_cli_interface() - cmd_name = task.get_complete_cmd_name() + cmd_name = task.get_full_cmd_name() logger.debug(colored(f'Register task: {cmd_name}', attrs=['dark'])) self._tasks.append(task) logger.debug(colored(f'Task registered: {cmd_name}', attrs=['dark'])) @@ -77,7 +77,7 @@ def _get_cli_group(self, task_group: TaskGroup) -> click.Group: return group def _create_cli_command(self, task: AnyTask) -> click.Command: - task_inputs = task.get_all_inputs() + task_inputs = task._get_combined_inputs() task_cmd_name = task.get_cmd_name() task_description = task.get_description() task_function = task.to_function( diff --git a/src/zrb/builtin/git.py b/src/zrb/builtin/git.py index ae11eed1..0b83395d 100644 --- a/src/zrb/builtin/git.py +++ b/src/zrb/builtin/git.py @@ -1,7 +1,6 @@ from zrb.helper.typing import Any from zrb.builtin.group import git_group from zrb.task.decorator import python_task -from zrb.task.task import Task from zrb.task_input.str_input import StrInput from zrb.task_input.bool_input import BoolInput from zrb.runner import runner @@ -26,7 +25,7 @@ prompt='Commit hash/Tag', default='HEAD' ), - BoolInput(
+ BoolInput( name='include-new', description='include new files', prompt='Include new files', @@ -52,7 +51,6 @@ async def get_file_changes(*args: Any, **kwargs: Any): include_new = kwargs.get('include_new', True) include_removed = kwargs.get('include_removed', True) include_updated = kwargs.get('include_updated', True) - task: Task = kwargs['_task'] modified_file_states = get_modified_file_states(commit) modified_file_keys = [] output = [] diff --git a/src/zrb/helper/accessories/name.py b/src/zrb/helper/accessories/name.py index 07b5100b..96b8b86b 100644 --- a/src/zrb/helper/accessories/name.py +++ b/src/zrb/helper/accessories/name.py @@ -10,19 +10,29 @@ def get_random_name( digit_count: int = 4 ) -> str: prefixes = [ - "aurum", "argentum", "platinum", "mercurius", "sulfur", "sal", - "luna", "sol", "ferrum", "cuprum", "argent", "aurora", "citrin", - "coral", "diamond", "dragon", "emerald", "garnet", "jade", "onyx", - "opal", "pearl", "ruby", "sapphire", "topaz", "turquoise", "verde", - "zircon" + 'albedo', 'argent', 'argentum', 'aurora', 'aurum', 'azure', + 'basilisk', 'cerulean', 'chimeric', 'citrin', 'coral', 'crimson', + 'diamond', 'draco', 'dragon', 'emerald', 'ethereal', 'ferrum', + 'flammeus', 'garnet', 'glacial', 'glimmering', 'glistening', 'golden', + 'helios', 'igneous', 'imperial', 'jade', 'luminous', 'luna', 'lunar', + 'mystic', 'nephrite', 'nocturnal', 'obsidian', 'opal', 'pearl', + 'platinum', 'prismatic', 'ruby', 'sapphire', 'serpentine', 'silver', + 'sol', 'solar', 'spiritual', 'stellar', 'tempest', 'topaz', + 'turquoise', 'verde', 'vermillion', 'vitreous', 'zephyr', 'zircon' ] suffixes = [ - "philosophorum", "spiritus", "tinctura", "essentia", "elixir", - "praeparatum", "aether", "vitae", "lapis", "metallum", "aureum", - "caelestis", "chrysopoeia", "cosmicum", "deum", "draconis", - "elementorum", "hermetica", "illuminationis", "magnum", "mysticum", - "occultum", "omnipotentis", "philosophia", "praestantissimum", - "quintessentia", "regeneratio", "universalis" + 'aether', 'albedo', 'alchemy', 'arcana', 'aureum', 'aetheris', + 'anima', 'astralis', 'caelestis', 'chrysopoeia', 'cosmicum', + 'crystallum', 'deum', 'divinitas', 'draconis', 'elementorum', 'elixir', + 'essentia', 'eternis', 'ethereus', 'fatum', 'flamma', 'fulgur', + 'hermetica', 'ignis', 'illuminationis', 'imperium', 'incantatum', + 'infinitum', 'lapis', 'lux', 'magicae', 'magnum', 'materia', + 'metallum', 'mysticum', 'natura', 'occultum', 'omnipotentis', + 'opulentia', 'philosophia', 'philosophorum', 'praeparatum', + 'praestantissimum', 'prima', 'primordium', 'quintessentia', + 'regeneratio', 'ritualis', 'sanctum', 'spiritus', 'tenebris', + 'terra', 'tinctura', 'transmutationis', 'universalis', 'vapores', + 'venenum', 'veritas', 'vitae', 'volatus' ] prefix = random.choice(prefixes) suffix = random.choice(suffixes) diff --git a/src/zrb/helper/env_map/fetch.py b/src/zrb/helper/env_map/fetch.py index 3633d2bf..a3fd1c21 100644 --- a/src/zrb/helper/env_map/fetch.py +++ b/src/zrb/helper/env_map/fetch.py @@ -16,7 +16,7 @@ def fetch_env_map_from_group( sub_env_map: Mapping[str, str] = fetch_env_map_from_group( env_map, sub_group ) - env_map = cascade_env_map(env_map, sub_env_map) + env_map = _cascade_env_map(env_map, sub_env_map) return env_map @@ -25,33 +25,33 @@ def fetch_env_map_from_task( env_map: Mapping[str, str], task: AnyTask ): task_env_map: Mapping[str, str] = {} - for env_file in task.get_env_files(): + for env_file in task._get_env_files(): envs = env_file.get_envs() - task_env_map = 
add_envs_to_env_map(task_env_map, envs) - task_env_map = add_envs_to_env_map(task_env_map, task._envs) - env_map = cascade_env_map(env_map, task_env_map) - for upstream in task.get_upstreams(): + task_env_map = _add_envs_to_env_map(task_env_map, envs) + task_env_map = _add_envs_to_env_map(task_env_map, task._envs) + env_map = _cascade_env_map(env_map, task_env_map) + for upstream in task._get_upstreams(): task_env_map = fetch_env_map_from_task(env_map, upstream) - for checker in task.get_checkers(): + for checker in task._get_checkers(): task_env_map = fetch_env_map_from_task(env_map, checker) return env_map @typechecked -def add_envs_to_env_map( +def _add_envs_to_env_map( env_map: Mapping[str, str], envs: List[Env] ) -> Mapping[str, str]: for env in envs: if env.os_name == '': continue - env_name = get_env_name(env) - env_default = get_env_default(env) + env_name = _get_env_name(env) + env_default = _get_env_default(env) env_map[env_name] = env_default return env_map @typechecked -def cascade_env_map( +def _cascade_env_map( env_map: Mapping[str, str], other_env_map: Mapping[str, str] ) -> Mapping[str, str]: @@ -63,14 +63,14 @@ def cascade_env_map( @typechecked -def get_env_name(env: Env) -> str: +def _get_env_name(env: Env) -> str: if env.os_name is None: return env.name return env.os_name @typechecked -def get_env_default(env: Env) -> str: +def _get_env_default(env: Env) -> str: if is_probably_jinja(env.default): return '' return env.default diff --git a/src/zrb/shell-scripts/rsync-util.sh b/src/zrb/shell-scripts/rsync-util.sh index f050ba10..9f847fc3 100644 --- a/src/zrb/shell-scripts/rsync-util.sh +++ b/src/zrb/shell-scripts/rsync-util.sh @@ -2,11 +2,11 @@ set -e auth_rsync(){ if [ "$_CONFIG_SSH_KEY" != "" ] then - rsync -avz -e "ssh -i $_CONFIG_SSH_KEY -p $_CONFIG_PORT" $@ + rsync --mkpath -avz -e "ssh -i $_CONFIG_SSH_KEY -p $_CONFIG_PORT" $@ elif [ "$_CONFIG_PASSWORD" != "" ] then - sshpass -p "$_CONFIG_PASSWORD" rsync -avz -e "ssh -p $_CONFIG_PORT" $@ + sshpass -p "$_CONFIG_PASSWORD" rsync --mkpath -avz -e "ssh -p $_CONFIG_PORT" $@ else - rsync -avz -e "ssh -p $_CONFIG_PORT" $@ + rsync --mkpath -avz -e "ssh -p $_CONFIG_PORT" $@ fi } \ No newline at end of file diff --git a/src/zrb/task/any_task.py b/src/zrb/task/any_task.py index 78bb19a6..14c482ae 100644 --- a/src/zrb/task/any_task.py +++ b/src/zrb/task/any_task.py @@ -65,23 +65,35 @@ def to_function( pass @abstractmethod - def add_upstreams(self, *upstreams: TAnyTask): + def add_upstream(self, *upstreams: TAnyTask): pass @abstractmethod - def add_inputs(self, *inputs: AnyInput): + def insert_input(self, *inputs: AnyInput): pass @abstractmethod - def add_envs(self, *envs: Env): + def add_input(self, *inputs: AnyInput): pass @abstractmethod - def add_env_files(self, *env_files: EnvFile): + def insert_env(self, *envs: Env): pass @abstractmethod - def set_execution_id(self, execution_id: str): + def add_env(self, *envs: Env): + pass + + @abstractmethod + def insert_env_file(self, *env_files: EnvFile): + pass + + @abstractmethod + def add_env_file(self, *env_files: EnvFile): + pass + + @abstractmethod + def _set_execution_id(self, execution_id: str): pass @abstractmethod @@ -139,31 +151,51 @@ def get_cmd_name(self) -> str: pass @abstractmethod - def get_complete_cmd_name(self) -> str: + def get_full_cmd_name(self) -> str: + pass + + @abstractmethod + def inject_env_files(self): + pass + + @abstractmethod + def _get_env_files(self) -> List[EnvFile]: + pass + + @abstractmethod + def inject_envs(self): + pass + + @abstractmethod + def 
_get_envs(self) -> List[Env]: pass @abstractmethod - def get_env_files(self) -> List[EnvFile]: + def inject_inputs(self): pass @abstractmethod - def get_envs(self) -> List[Env]: + def _get_inputs(self) -> List[AnyInput]: pass @abstractmethod - def get_inputs(self) -> List[AnyInput]: + def inject_checkers(self): pass @abstractmethod - def get_checkers(self) -> Iterable[TAnyTask]: + def _get_checkers(self) -> Iterable[TAnyTask]: pass @abstractmethod - def get_upstreams(self) -> Iterable[TAnyTask]: + def inject_upstreams(self): pass @abstractmethod - def get_all_inputs(self) -> Iterable[AnyInput]: + def _get_upstreams(self) -> Iterable[TAnyTask]: + pass + + @abstractmethod + def _get_combined_inputs(self) -> Iterable[AnyInput]: pass @abstractmethod @@ -206,12 +238,6 @@ def get_input_map(self) -> Mapping[str, Any]: def get_env_map(self) -> Mapping[str, Any]: pass - @abstractmethod - def inject_env_map( - self, env_map: Mapping[str, str], override: bool = False - ): - pass - @abstractmethod def render_any( self, val: Any, data: Optional[Mapping[str, Any]] = None diff --git a/src/zrb/task/base_remote_cmd_task.py b/src/zrb/task/base_remote_cmd_task.py index f1e864ed..af5c0977 100644 --- a/src/zrb/task/base_remote_cmd_task.py +++ b/src/zrb/task/base_remote_cmd_task.py @@ -2,6 +2,7 @@ Any, Callable, Iterable, Mapping, Optional, Union, TypeVar ) from zrb.helper.typecheck import typechecked +from zrb.helper.util import to_snake_case from zrb.task.any_task import AnyTask from zrb.task.any_task_event_handler import ( OnTriggered, OnWaiting, OnSkipped, OnStarted, OnReady, OnRetry, OnFailed @@ -34,13 +35,15 @@ def __init__( user: str = '', password: str = '', ssh_key: str = '', - port: int = 22 + port: int = 22, + config_map: Optional[Mapping[str, str]] = None ): self.host = host self.user = user self.password = password self.ssh_key = ssh_key self.port = port + self.config_map = {} if config_map is None else config_map @typechecked @@ -123,28 +126,50 @@ def __init__( def copy(self) -> TSingleBaseRemoteCmdTask: return copy.deepcopy(self) - def _get_shell_env_map(self) -> Mapping[str, Any]: - env_map = super()._get_shell_env_map() - env_map['_CONFIG_HOST'] = self.render_str(self._remote_config.host) - env_map['_CONFIG_PORT'] = str(self.render_int( - self._remote_config.port) - ) - env_map['_CONFIG_SSH_KEY'] = self.render_str( - self._remote_config.ssh_key - ) - env_map['_CONFIG_USER'] = self.render_str(self._remote_config.user) - env_map['_CONFIG_PASSWORD'] = self.render_str( - self._remote_config.password + def inject_envs(self): + super().inject_envs() + # add remote config properties as env + self.add_env( + Env( + name='_CONFIG_HOST', os_name='', + default=self.render_str(self._remote_config.host) + ), + Env( + name='_CONFIG_PORT', os_name='', + default=str(self.render_int(self._remote_config.port)) + ), + Env( + name='_CONFIG_SSH_KEY', os_name='', + default=self.render_str(self._remote_config.ssh_key) + ), + Env( + name='_CONFIG_USER', os_name='', + default=self.render_str(self._remote_config.user) + ), + Env( + name='_CONFIG_PASSWORD', os_name='', + default=self.render_str(self._remote_config.password) + ), ) - return env_map + for key, val in self._remote_config.config_map.items(): + upper_snake_key = to_snake_case(key).upper() + rendered_val = self.render_str(val) + # add remote config map as env + self.add_env( + Env( + name='_CONFIG_MAP_' + upper_snake_key, + os_name='', + default=rendered_val + ) + ) - def _get_cmd_str(self, *args: Any, **kwargs: Any) -> str: + def get_cmd_script(self, *args: 
Any, **kwargs: Any) -> str: cmd_str = '\n'.join([ - self._create_cmd_str( + self._create_cmd_script( self._pre_cmd_path, self._pre_cmd, *args, **kwargs ), - super()._get_cmd_str(*args, **kwargs), - self._create_cmd_str( + super().get_cmd_script(*args, **kwargs), + self._create_cmd_script( self._post_cmd_path, self._post_cmd, *args, **kwargs ), ]) diff --git a/src/zrb/task/base_task.py b/src/zrb/task/base_task.py index 72fc9598..8730bfa6 100644 --- a/src/zrb/task/base_task.py +++ b/src/zrb/task/base_task.py @@ -110,30 +110,28 @@ def __init__( self._is_execution_started: bool = False self._args: List[Any] = [] self._kwargs: Mapping[str, Any] = {} - self._allow_add_upstreams: bool = True def copy(self) -> AnyTask: return copy.deepcopy(self) - def get_all_inputs(self) -> Iterable[AnyInput]: + def _get_combined_inputs(self) -> Iterable[AnyInput]: '''' Getting all inputs of this task and all its upstream, non-duplicated. ''' if self._all_inputs is not None: return self._all_inputs - self._allow_add_upstreams = False - self._allow_add_inputs = False self._all_inputs: List[AnyInput] = [] existing_input_names: Mapping[str, bool] = {} # Add task inputs - for input_index, first_occurence_task_input in enumerate(self._inputs): + inputs = self._get_inputs() + for input_index, first_occurence_task_input in enumerate(inputs): input_name = first_occurence_task_input.get_name() if input_name in existing_input_names: continue # Look for all input with the same name in the current task task_inputs = [ candidate - for candidate in self._inputs[input_index:] + for candidate in inputs[input_index:] if candidate.get_name() == input_name ] # Get the last input, and add it to _all_inputs @@ -141,13 +139,15 @@ def get_all_inputs(self) -> Iterable[AnyInput]: self._all_inputs.append(task_input) existing_input_names[input_name] = True # Add upstream inputs - for upstream in self._upstreams: - upstream_inputs = upstream.get_all_inputs() + for upstream in self._get_upstreams(): + upstream_inputs = upstream._get_combined_inputs() for upstream_input in upstream_inputs: if upstream_input.get_name() in existing_input_names: continue self._all_inputs.append(upstream_input) existing_input_names[upstream_input.get_name()] = True + self._allow_add_upstreams = False + self._allow_add_inputs = False return self._all_inputs def to_function( @@ -167,21 +167,6 @@ def function(*args: Any, **kwargs: Any) -> Any: )) return function - def add_upstreams(self, *upstreams: AnyTask): - if not self._allow_add_upstreams: - raise Exception(f'Cannot add upstreams on `{self._name}`') - self._upstreams += upstreams - - def inject_env_map( - self, env_map: Mapping[str, str], override: bool = False - ): - ''' - Set new values for current task's env map - ''' - for key, val in env_map.items(): - if override or key not in self.get_env_map(): - self._set_env_map(key, val) - async def run(self, *args: Any, **kwargs: Any) -> Any: ''' Do task execution @@ -259,28 +244,28 @@ async def _set_local_keyval( return True self._is_keyval_set = True self.log_info('Set input map') - for task_input in self.get_all_inputs(): + for task_input in self._get_combined_inputs(): input_name = self._get_normalized_input_key(task_input.get_name()) input_value = self.render_any( kwargs.get(input_name, task_input.get_default()) ) self._set_input_map(input_name, input_value) + self._set_input_map('_execution_id', self._execution_id) self.log_debug( 'Input map:\n' + map_to_str(self.get_input_map(), item_prefix=' ') ) self.log_info('Merging task envs, task env files, and native 
envs') - for env_name, env in self._get_all_envs().items(): + for env_name, env in self._get_combined_env().items(): env_value = env.get(env_prefix) if env.renderable: env_value = self.render_any(env_value) self._set_env_map(env_name, env_value) + self._set_env_map('_ZRB_EXECUTION_ID', self._execution_id) self.log_debug( 'Env map:\n' + map_to_str(self.get_env_map(), item_prefix=' ') ) - def _get_all_envs(self) -> Mapping[str, Env]: - self._allow_add_envs = False - self._allow_add_env_files = False + def _get_combined_env(self) -> Mapping[str, Env]: all_envs: Mapping[str, Env] = {} for env_name in os.environ: if env_name in RESERVED_ENV_NAMES: @@ -288,11 +273,13 @@ def _get_all_envs(self) -> Mapping[str, Env]: all_envs[env_name] = Env( name=env_name, os_name=env_name, renderable=False ) - for env_file in self._env_files: + for env_file in self._get_env_files(): for env in env_file.get_envs(): all_envs[env.name] = env - for env in self._envs: + for env in self._get_envs(): all_envs[env.name] = env + self._allow_add_envs = False + self._allow_add_env_files = False return all_envs def _get_normalized_input_key(self, key: str) -> str: @@ -310,7 +297,9 @@ async def _run_and_check_all( try: self._start_timer() if self.get_execution_id() == '': - self.set_execution_id(get_random_name()) + self._set_execution_id( + get_random_name(add_random_digit=True, digit_count=5) + ) self.log_info('Set input and env map') await self._set_keyval(kwargs=kwargs, env_prefix=env_prefix) self.log_info('Set run kwargs') @@ -348,9 +337,10 @@ def _print_result(self, result: Any): return if self._return_upstream_result: # if _return_upstream_result, result is list (see: self._run_all) + upstreams = self._get_upstreams() upstream_results = list(result) for upstream_index, upstream_result in enumerate(upstream_results): - self._upstreams[upstream_index]._print_result(upstream_result) + upstreams[upstream_index]._print_result(upstream_result) return self.print_result(result) @@ -363,11 +353,6 @@ def print_result(self, result: Any): if you want to show the result differently. 
''' print(result) - - def set_execution_id(self, execution_id: str): - super().set_execution_id(execution_id) - self._set_env_map('_ZRB_EXECUTION_ID', execution_id) - self._set_input_map('_execution_id', execution_id) async def _loop_check(self, show_done: bool = False) -> bool: self.log_info('Start readiness checking') @@ -393,14 +378,14 @@ def _show_env_prefix(self): def _show_run_command(self): params: List[str] = [double_quote(arg) for arg in self._args] - for task_input in self.get_all_inputs(): + for task_input in self._get_combined_inputs(): if task_input.is_hidden(): continue key = task_input.get_name() kwarg_key = self._get_normalized_input_key(key) quoted_value = double_quote(str(self._kwargs[kwarg_key])) params.append(f'--{key} {quoted_value}') - run_cmd = self.get_complete_cmd_name() + run_cmd = self.get_full_cmd_name() run_cmd_with_param = run_cmd if len(params) > 0: param_str = ' '.join(params) @@ -437,7 +422,7 @@ async def _check(self) -> bool: await asyncio.sleep(0.1) check_coroutines: Iterable[asyncio.Task] = [] for checker_task in self._checkers: - checker_task.set_execution_id(self.get_execution_id()) + checker_task._set_execution_id(self.get_execution_id()) check_coroutines.append( asyncio.create_task(checker_task._run_all()) ) @@ -449,8 +434,8 @@ async def _run_all(self, *args: Any, **kwargs: Any) -> Any: coroutines: Iterable[asyncio.Task] = [] # Add upstream tasks to processes self._allow_add_upstreams = False - for upstream_task in self._upstreams: - upstream_task.set_execution_id(self.get_execution_id()) + for upstream_task in self._get_upstreams(): + upstream_task._set_execution_id(self.get_execution_id()) coroutines.append(asyncio.create_task( upstream_task._run_all(**kwargs) )) @@ -472,7 +457,7 @@ async def _cached_run(self, *args: Any, **kwargs: Any) -> Any: # get upstream checker upstream_check_processes: Iterable[asyncio.Task] = [] self._allow_add_upstreams = False - for upstream_task in self._upstreams: + for upstream_task in self._get_upstreams(): upstream_check_processes.append(asyncio.create_task( upstream_task._loop_check() )) @@ -516,7 +501,7 @@ async def _check_should_execute(self, *args: Any, **kwargs: Any) -> bool: async def _set_keyval(self, kwargs: Mapping[str, Any], env_prefix: str): # if input is not in input_map, add default values - for task_input in self.get_all_inputs(): + for task_input in self._get_combined_inputs(): key = self._get_normalized_input_key(task_input.get_name()) if key in kwargs: continue @@ -527,20 +512,20 @@ async def _set_keyval(self, kwargs: Mapping[str, Any], env_prefix: str): new_kwargs = copy.deepcopy(kwargs) new_kwargs.update(self.get_input_map()) upstream_coroutines = [] - # set uplstreams keyval + # set upstreams keyval self._allow_add_upstreams = False - for upstream_task in self._upstreams: + for upstream_task in self._get_upstreams(): upstream_coroutines.append(asyncio.create_task( upstream_task._set_keyval( kwargs=new_kwargs, env_prefix=env_prefix ) )) # set checker keyval - local_env_map = self.get_env_map() + # local_env_map = self.get_env_map() checker_coroutines = [] for checker_task in self._checkers: - checker_task._inputs += self._inputs - checker_task.inject_env_map(local_env_map, override=True) + checker_task.add_input(*self._get_inputs()) + checker_task.add_env(*self._get_envs()) checker_coroutines.append(asyncio.create_task( checker_task._set_keyval( kwargs=new_kwargs, env_prefix=env_prefix diff --git a/src/zrb/task/base_task_composite.py b/src/zrb/task/base_task_composite.py index a5510dd2..80a069cf 
100644 --- a/src/zrb/task/base_task_composite.py +++ b/src/zrb/task/base_task_composite.py @@ -66,10 +66,17 @@ def __init__( self._allow_add_envs = True self._allow_add_env_files = True self._allow_add_inputs = True + self._allow_add_upstreams: bool = True + self._has_already_inject_env_files: bool = False + self._has_already_inject_envs: bool = False + self._has_already_inject_inputs: bool = False + self._has_already_inject_checkers: bool = False + self._has_already_inject_upstreams: bool = False self._execution_id = '' - def set_execution_id(self, execution_id: str): - self._execution_id = execution_id + def _set_execution_id(self, execution_id: str): + if self._execution_id == '': + self._execution_id = execution_id def set_name(self, new_name: str): if self._description == self._name: @@ -99,20 +106,45 @@ def set_retry_interval(self, new_retry_interval: Union[float, int]): def set_checking_interval(self, new_checking_interval: Union[float, int]): self._checking_interval = new_checking_interval - def add_inputs(self, *inputs: AnyInput): + def insert_input(self, *inputs: AnyInput): if not self._allow_add_inputs: - raise Exception(f'Cannot add inputs on `{self._name}`') - self._inputs += inputs + raise Exception(f'Cannot insert inputs for `{self._name}`') + self._inputs = list(inputs) + list(self._inputs) - def add_envs(self, *envs: Env): + def add_input(self, *inputs: AnyInput): + if not self._allow_add_inputs: + raise Exception(f'Cannot add inputs for `{self._name}`') + self._inputs = list(self._inputs) + list(inputs) + + def insert_env(self, *envs: Env): + if not self._allow_add_envs: + raise Exception(f'Cannot insert envs to `{self._name}`') + self._envs = list(envs) + list(self._envs) + + def add_env(self, *envs: Env): if not self._allow_add_envs: - raise Exception(f'Cannot add envs on `{self._name}`') - self._envs += envs + raise Exception(f'Cannot add envs to `{self._name}`') + self._envs = list(self._envs) + list(envs) - def add_env_files(self, *env_files: EnvFile): + def insert_env_file(self, *env_files: EnvFile): if not self._allow_add_env_files: - raise Exception(f'Cannot add env_files on `{self._name}`') - self._env_files += env_files + raise Exception(f'Cannot insert env_files to `{self._name}`') + self._env_files = list(env_files) + list(self._env_files) + + def add_env_file(self, *env_files: EnvFile): + if not self._allow_add_env_files: + raise Exception(f'Cannot add env_files to `{self._name}`') + self._env_files = list(self._env_files) + list(env_files) + + def insert_upstream(self, *upstreams: AnyTask): + if not self._allow_add_upstreams: + raise Exception(f'Cannot insert upstreams to `{self._name}`') + self._upstreams = list(upstreams) + list(self._upstreams) + + def add_upstream(self, *upstreams: AnyTask): + if not self._allow_add_upstreams: + raise Exception(f'Cannot add upstreams to `{self._name}`') + self._upstreams = list(self._upstreams) + list(upstreams) def get_execution_id(self) -> str: return self._execution_id @@ -123,20 +155,50 @@ def get_icon(self) -> str: def get_color(self) -> str: return self._color - def get_env_files(self) -> List[EnvFile]: + def inject_env_files(self): + pass + + def _get_env_files(self) -> List[EnvFile]: + if not self._has_already_inject_env_files: + self.inject_env_files() + self._has_already_inject_env_files = True return self._env_files - def get_envs(self) -> List[Env]: - return self._envs + def inject_envs(self): + pass + + def _get_envs(self) -> List[Env]: + if not self._has_already_inject_envs: + self.inject_envs() +
self._has_already_inject_envs = True + return list(self._envs) + + def inject_inputs(self): + pass + + def _get_inputs(self) -> List[AnyInput]: + if not self._has_already_inject_inputs: + self.inject_inputs() + self._has_already_inject_inputs = True + return list(self._inputs) + + def inject_checkers(self): + pass - def get_inputs(self) -> List[AnyInput]: - return self._inputs + def _get_checkers(self) -> List[AnyTask]: + if not self._has_already_inject_checkers: + self.inject_checkers() + self._has_already_inject_checkers = True + return list(self._checkers) - def get_checkers(self) -> Iterable[AnyTask]: - return self._checkers + def inject_upstreams(self): + pass - def get_upstreams(self) -> Iterable[AnyTask]: - return self._upstreams + def _get_upstreams(self) -> List[AnyTask]: + if not self._has_already_inject_upstreams: + self.inject_upstreams() + self._has_already_inject_upstreams = True + return list(self._upstreams) def get_description(self) -> str: return self._description @@ -361,7 +423,7 @@ def __init__( run: Optional[Callable[..., Any]] = None, should_execute: Union[bool, str, Callable[..., bool]] = True ): - self._filled_complete_name: Optional[str] = None + self._rjust_full_cmd_name: Optional[str] = None self._has_cli_interface = False self._complete_name: Optional[str] = None CommonTaskModel.__init__( @@ -448,13 +510,13 @@ def _get_colored(self, text: str) -> str: def _get_print_prefix(self) -> str: common_prefix = self._get_common_prefix(show_time=show_time) icon = self.get_icon() - truncated_name = self._get_filled_complete_name() + truncated_name = self._get_rjust_full_cmd_name() return f'{common_prefix} {icon} {truncated_name}' def _get_log_prefix(self) -> str: common_prefix = self._get_common_prefix(show_time=False) icon = self.get_icon() - filled_name = self._get_filled_complete_name() + filled_name = self._get_rjust_full_cmd_name() return f'{common_prefix} {icon} {filled_name}' def _get_common_prefix(self, show_time: bool) -> str: @@ -466,14 +528,14 @@ def _get_common_prefix(self, show_time: bool) -> str: return f'◷ {now} ❁ {pid} → {attempt}/{max_attempt}' return f'❁ {pid} → {attempt}/{max_attempt}' - def _get_filled_complete_name(self) -> str: - if self._filled_complete_name is not None: - return self._filled_complete_name - complete_name = self.get_complete_cmd_name() - self._filled_complete_name = complete_name.rjust(LOG_NAME_LENGTH, ' ') - return self._filled_complete_name + def _get_rjust_full_cmd_name(self) -> str: + if self._rjust_full_cmd_name is not None: + return self._rjust_full_cmd_name + complete_name = self.get_full_cmd_name() + self._rjust_full_cmd_name = complete_name.rjust(LOG_NAME_LENGTH, ' ') + return self._rjust_full_cmd_name - def get_complete_cmd_name(self) -> str: + def get_full_cmd_name(self) -> str: if self._complete_name is not None: return self._complete_name executable_prefix = '' diff --git a/src/zrb/task/cmd_task.py b/src/zrb/task/cmd_task.py index 756e6dd2..417988af 100644 --- a/src/zrb/task/cmd_task.py +++ b/src/zrb/task/cmd_task.py @@ -1,5 +1,5 @@ from zrb.helper.typing import ( - Any, Callable, Iterable, List, Mapping, Optional, Union, TypeVar + Any, Callable, Iterable, List, Optional, Union, TypeVar ) from zrb.helper.typecheck import typechecked from zrb.task.any_task import AnyTask @@ -182,18 +182,17 @@ def print_result(self, result: CmdResult): return print(result.output) - def _get_shell_env_map(self) -> Mapping[str, Any]: - env_map = self.get_env_map() + def inject_envs(self): + super().inject_envs() input_map = 
self.get_input_map() for input_name, input_value in input_map.items(): - upper_input_name = '_INPUT_' + input_name.upper() - if upper_input_name not in env_map: - env_map[upper_input_name] = f'{input_value}' - return env_map + env_name = '_INPUT_' + input_name.upper() + self.add_env( + Env(name=env_name, os_name='', default=str(input_value)) + ) async def run(self, *args: Any, **kwargs: Any) -> CmdResult: - cmd = self._get_cmd_str(*args, **kwargs) - env_map = self._get_shell_env_map() + cmd = self.get_cmd_script(*args, **kwargs) self.print_out_dark('Run script: ' + self._get_multiline_repr(cmd)) self.print_out_dark('Working directory: ' + self._cwd) self._output_buffer = [] @@ -203,7 +202,7 @@ async def run(self, *args: Any, **kwargs: Any) -> CmdResult: cwd=self._cwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - env=env_map, + env=self.get_env_map(), shell=True, executable=self._executable, close_fds=True, @@ -314,21 +313,25 @@ async def _wait_process(self, process: asyncio.subprocess.Process): await stdout_log_process await stderr_log_process - def _get_cmd_str(self, *args: Any, **kwargs: Any) -> str: - return self._create_cmd_str(self._cmd_path, self._cmd, *args, **kwargs) + def get_cmd_script(self, *args: Any, **kwargs: Any) -> str: + return self._create_cmd_script( + self._cmd_path, self._cmd, *args, **kwargs + ) - def _create_cmd_str( + def _create_cmd_script( self, cmd_path: CmdVal, cmd: CmdVal, *args: Any, **kwargs: Any ) -> str: if not isinstance(cmd_path, str) or cmd_path != '': if callable(cmd_path): - return self._render_cmd_path_str(cmd_path(*args, **kwargs)) - return self._render_cmd_path_str(cmd_path) + return self._get_rendered_cmd_path(cmd_path(*args, **kwargs)) + return self._get_rendered_cmd_path(cmd_path) if callable(cmd): - return self._render_cmd_str(cmd(*args, **kwargs)) - return self._render_cmd_str(cmd) + return self._get_rendered_cmd(cmd(*args, **kwargs)) + return self._get_rendered_cmd(cmd) - def _render_cmd_path_str(self, cmd_path: Union[str, Iterable[str]]) -> str: + def _get_rendered_cmd_path( + self, cmd_path: Union[str, Iterable[str]] + ) -> str: if isinstance(cmd_path, str): return self.render_file(cmd_path) return '\n'.join([ @@ -336,7 +339,7 @@ def _render_cmd_path_str(self, cmd_path: Union[str, Iterable[str]]) -> str: for cmd_path_str in cmd_path ]) - def _render_cmd_str(self, cmd: Union[str, Iterable[str]]) -> str: + def _get_rendered_cmd(self, cmd: Union[str, Iterable[str]]) -> str: if isinstance(cmd, str): return self.render_str(cmd) return self.render_str('\n'.join(list(cmd))) diff --git a/src/zrb/task/docker_compose_task.py b/src/zrb/task/docker_compose_task.py index 117683df..f70d3060 100644 --- a/src/zrb/task/docker_compose_task.py +++ b/src/zrb/task/docker_compose_task.py @@ -153,7 +153,8 @@ def __init__( ) # Flag to make mark whether service config and compose environments # has been added to this task's envs and env_files - self._is_additional_env_added = False + self._is_compose_additional_env_added = False + self._is_compose_additional_env_file_added = False def copy(self) -> TDockerComposeTask: return super().copy() @@ -166,45 +167,30 @@ async def run(self, *args, **kwargs: Any) -> CmdResult: os.remove(self._compose_runtime_file) return result - def _get_all_envs(self) -> Mapping[str, Env]: - ''' - This method override BaseTask's _get_all_envs. 
- Whenever _get_all_envs is called, we want to make sure that: - - Service config's envs and env_files are included - - Any environment defined in docker compose file is also included - ''' - if self._is_additional_env_added: - return super()._get_all_envs() - self._is_additional_env_added = True - # define additional envs and additonal env_files - additional_envs: List[Env] = [] - additional_env_files: List[EnvFile] = [] - # populate additional envs and additional env_files - # with service configs + def inject_envs(self): + super().inject_envs() + # inject envs from service_configs for _, service_config in self._compose_service_configs.items(): - additional_env_files += service_config.get_env_files() - additional_envs += service_config.get_envs() - # populate additional envs and additional env_files with - # compose envs - data = read_compose_file(self._compose_template_file) - env_map = fetch_compose_file_env_map(data) + self.insert_env(*service_config.get_envs()) + # inject envs from docker compose file + compose_data = read_compose_file(self._compose_template_file) + env_map = fetch_compose_file_env_map(compose_data) added_env_map: Mapping[str, bool] = {} for key, value in env_map.items(): # Need to get this everytime because we only want # the first compose file env value for a certain key if key in RESERVED_ENV_NAMES or key in added_env_map: continue + added_env_map[key] = True os_name = key if self._compose_env_prefix != '': os_name = f'{self._compose_env_prefix}_{os_name}' - compose_env = Env(name=key, os_name=os_name, default=value) - additional_envs.append(compose_env) - added_env_map[key] = True - # Add additional envs and addition env files to this task - self._envs = additional_envs + list(self._envs) - self._env_files = additional_env_files + list(self._env_files) - # get all envs - return super()._get_all_envs() + self.insert_env(Env(name=key, os_name=os_name, default=value)) + + def inject_env_files(self): + # inject env_files from service_configs + for _, service_config in self._compose_service_configs.items(): + self.insert_env_file(*service_config.get_env_files()) def _generate_compose_runtime_file(self): compose_data = read_compose_file(self._compose_template_file) @@ -311,8 +297,8 @@ def _get_compose_template_file(self, compose_file: Optional[str]) -> str: return os.path.join(self._cwd, compose_file) raise Exception(f'Invalid compose file: {compose_file}') - def _get_cmd_str(self, *args: Any, **kwargs: Any) -> str: - setup_cmd_str = self._create_cmd_str( + def get_cmd_script(self, *args: Any, **kwargs: Any) -> str: + setup_cmd_str = self._create_cmd_script( self._setup_cmd_path, self._setup_cmd, *args, **kwargs ) command_options = dict(self._compose_options) diff --git a/src/zrb/task/flow_task.py b/src/zrb/task/flow_task.py index 5dd138c5..601f2e2b 100644 --- a/src/zrb/task/flow_task.py +++ b/src/zrb/task/flow_task.py @@ -107,10 +107,10 @@ def _get_embeded_tasks( embeded_tasks: List[AnyTask] = [] for task in tasks: embeded_task = task.copy() - embeded_task.add_upstreams(*upstreams) - embeded_task.add_envs(*envs) - embeded_task.add_env_files(*env_files) - embeded_task.add_inputs(*inputs) + embeded_task.add_upstream(*upstreams) + embeded_task.add_env(*envs) + embeded_task.add_env_file(*env_files) + embeded_task.add_input(*inputs) embeded_tasks.append(embeded_task) return embeded_tasks diff --git a/zrb_init.py b/zrb_init.py index 0c7d5b28..7b04e760 100644 --- a/zrb_init.py +++ b/zrb_init.py @@ -99,7 +99,7 @@ ], ) skippable_build: CmdTask = build.copy() 
-skippable_build.add_inputs(build_zrb_input) +skippable_build.add_input(build_zrb_input) skippable_build.set_should_execute('{{ input.build_zrb}}') runner.register(build) @@ -273,7 +273,7 @@ ] ) skippable_install_symlink: CmdTask = install_symlink.copy() -skippable_install_symlink.add_inputs( +skippable_install_symlink.add_input( build_zrb_input, install_symlink_input ) @@ -342,7 +342,7 @@ preexec_fn=None ) skippable_create_playground: CmdTask = create_playground.copy() -skippable_create_playground.add_inputs( +skippable_create_playground.add_input( build_zrb_input, install_symlink_input, create_playground_input