diff --git a/ext/scheduler/airflow/__lib.py b/ext/scheduler/airflow/__lib.py index 2b5462626a..d0050a5cce 100644 --- a/ext/scheduler/airflow/__lib.py +++ b/ext/scheduler/airflow/__lib.py @@ -88,6 +88,7 @@ def _add_connection_adapter_if_absent(self, host): def get_job_replay_config(self, project_name, job_name, schedule_time) -> dict: scheduled_at_str = schedule_time.strftime(TIMESTAMP_FORMAT) + log.info("Getting replay config for job:{}, project:{}, schedule_time:{}".format(job_name, project_name, scheduled_at_str)) url = '{optimus_host}/api/v1beta1/project/{optimus_project}/replay-details'.format( optimus_host=self.host, optimus_project=project_name, @@ -97,18 +98,23 @@ def get_job_replay_config(self, project_name, job_name, schedule_time) -> dict: 'job_names': job_name, 'status': "in progress", }, timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS) + log.info("get replay details request URL {}".format(response.url)) if response.status_code != 200: + log.error("Received non 200 status code:{}, response:{}".format(response.status_code, response)) return {} replay_config = response.json() - + log.info("Received replay config :{}".format(replay_config)) if 'replays' in replay_config.keys(): + if len(replay_config['replays']) == 0 : + log.warning("unable to find a 'in_progress' replay for the given scheduled date:{}" + .format(scheduled_at_str)) + return {} for config in replay_config['replays']: if 'replayConfig' in config.keys(): - if 'startTime' in config['replayConfig'].keys() & 'endTime' in config['replayConfig'].keys(): - start_time = datetime.strptime(config['replayConfig']['startTime'], TIMESTAMP_FORMAT) - end_time = datetime.strptime(config['replayConfig']['endTime'], TIMESTAMP_FORMAT) - if start_time <= schedule_time & schedule_time >= end_time: - return config['replayConfig']['jobConfig'] + if 'jobConfig' in config['replayConfig'].keys(): + log.info("Found a replay with specified 'jobConfig' for the given scheduled_date:{}, config:{}" + .format(scheduled_at_str, config)) + return config['replayConfig']['jobConfig'] return {} def get_job_run(self, project_name: str, job_name: str, start_date: str, end_date: str, downstream_project_name: str, downstream_job_name: str) -> dict: @@ -240,11 +246,11 @@ def __init__( def poke(self, context): schedule_time = get_scheduled_at(context) - job_config = self._optimus_client.get_job_replay_config(self.project_name, self.name, schedule_time) - if 'replays' in job_config.keys(): - if 'IGNORE_UPSTREAM' in job_config['jobConfig'].keys(): - if job_config['jobConfig']['IGNORE_UPSTREAM'] == "True": - return True + job_config = self._optimus_client.get_job_replay_config(self.project_name, self.job_name, schedule_time) + if 'IGNORE_UPSTREAM' in job_config.keys(): + if job_config['IGNORE_UPSTREAM'] == "True": + log.info("Bypassing upstream check as job_config contains IGNORE_UPSTREAM=True") + return True try: upstream_schedule = self.get_schedule_interval(schedule_time) except Exception as e: