diff --git a/metaflow/plugins/aip/aip_metaflow_step.py b/metaflow/plugins/aip/aip_metaflow_step.py index de1ea1b04e4..77b38df7ec4 100644 --- a/metaflow/plugins/aip/aip_metaflow_step.py +++ b/metaflow/plugins/aip/aip_metaflow_step.py @@ -25,6 +25,7 @@ AIP_JOIN_METAFLOW_S3OP_NUM_WORKERS, ) from metaflow.plugins.cards.card_client import get_cards, Card +from metaflow.plugins.cards.exception import CardNotPresentException from ... import R, metaflow_version @@ -45,11 +46,22 @@ def _write_card_artifacts( task_id_template: str = f"{task_id}.{passed_in_split_indexes}".strip(".") pathspec = f"{flow_name}/{run_id}/{step_name}/{task_id_template}" - try: - cards: List[Card] = list(get_cards(pathspec)) - except Exception as e: - # Workflow should still succeed even if cards fail to render - logging.exception(f"Failed to get card artifacts for {pathspec}: {e}") + retry_limit = 3 + cards: List[Card] = [] + for attempt in range(retry_limit): + try: + cards: List[Card] = list(get_cards(pathspec)) + except: + if attempt == retry_limit - 1: # Last attempt failed + logging.exception( + f"Failed to get cards from Metaflow backend for pathspec {pathspec}" + "Please view cards through Metaflow UI, or refer to https://docs.metaflow.org/metaflow/visualizing-results/effortless-task-inspection-with-default-cards#accessing-cards-via-an-api" + ) + continue + break + + if not cards: + return # sort such that the default card is first sorted_cards = sorted( @@ -63,7 +75,10 @@ def _write_card_artifacts( with open(file_name, "w") as card_file: card_file.write(card.get()) except Exception: - logging.exception(f"Failed to write card {index} of type {card.type}") + logging.exception( + f"Failed to write card {index} of type {card.type} to Argo artifact output." + "Please view cards through Metaflow UI, or refer to https://docs.metaflow.org/metaflow/visualizing-results/effortless-task-inspection-with-default-cards#accessing-cards-via-an-api" + ) raise @@ -143,12 +158,12 @@ def _step_cli( "--max-value-size=0", input_paths, ] - param_cmd: str = "if ! %s >/dev/null 2>/dev/null; then %s && %s; fi" % ( + cmd: str = "if ! %s >/dev/null 2>/dev/null; then %s && %s; fi" % ( " ".join(exists), export_params, " ".join(params), ) - cmds.append(param_cmd) + cmds.append(cmd) top_level: List[str] = [ "--quiet", @@ -371,7 +386,7 @@ def aip_metaflow_step( flow_name, is_interruptible, ) - step_cmd: str = cmd_template.format( + cmd: str = cmd_template.format( run_id=metaflow_run_id, passed_in_split_indexes=passed_in_split_indexes, ) @@ -406,13 +421,13 @@ def aip_metaflow_step( # TODO: Map username to KFP specific user/profile/namespace # Running Metaflow runtime (runs user code, handles state) with Popen( - step_cmd, shell=True, universal_newlines=True, executable="/bin/bash", env=env + cmd, shell=True, universal_newlines=True, executable="/bin/bash", env=env ) as process: pass if process.returncode != 0: logging.info(f"---- Following command returned: {process.returncode}") - logging.info(step_cmd.replace(" && ", "\n")) + logging.info(cmd.replace(" && ", "\n")) logging.info("----") raise Exception("Returned: %s" % process.returncode)