Skip to content

Commit

Permalink
Logging __lib.py (#270)
Browse files Browse the repository at this point in the history
* fix: add logging in __lib.py
  • Loading branch information
Mryashbhardwaj authored Sep 24, 2024
1 parent 643a392 commit b4c6c1a
Showing 1 changed file with 17 additions and 11 deletions.
28 changes: 17 additions & 11 deletions ext/scheduler/airflow/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def _add_connection_adapter_if_absent(self, host):

def get_job_replay_config(self, project_name, job_name, schedule_time) -> dict:
scheduled_at_str = schedule_time.strftime(TIMESTAMP_FORMAT)
log.info("Getting replay config for job:{}, project:{}, schedule_time:{}".format(job_name, project_name, scheduled_at_str))
url = '{optimus_host}/api/v1beta1/project/{optimus_project}/replay-details'.format(
optimus_host=self.host,
optimus_project=project_name,
Expand All @@ -97,18 +98,23 @@ def get_job_replay_config(self, project_name, job_name, schedule_time) -> dict:
'job_names': job_name,
'status': "in progress",
}, timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS)
log.info("get replay details request URL {}".format(response.url))
if response.status_code != 200:
log.error("Received non 200 status code:{}, response:{}".format(response.status_code, response))
return {}
replay_config = response.json()

log.info("Received replay config :{}".format(replay_config))
if 'replays' in replay_config.keys():
if len(replay_config['replays']) == 0 :
log.warning("unable to find a 'in_progress' replay for the given scheduled date:{}"
.format(scheduled_at_str))
return {}
for config in replay_config['replays']:
if 'replayConfig' in config.keys():
if 'startTime' in config['replayConfig'].keys() & 'endTime' in config['replayConfig'].keys():
start_time = datetime.strptime(config['replayConfig']['startTime'], TIMESTAMP_FORMAT)
end_time = datetime.strptime(config['replayConfig']['endTime'], TIMESTAMP_FORMAT)
if start_time <= schedule_time & schedule_time >= end_time:
return config['replayConfig']['jobConfig']
if 'jobConfig' in config['replayConfig'].keys():
log.info("Found a replay with specified 'jobConfig' for the given scheduled_date:{}, config:{}"
.format(scheduled_at_str, config))
return config['replayConfig']['jobConfig']
return {}

def get_job_run(self, project_name: str, job_name: str, start_date: str, end_date: str, downstream_project_name: str, downstream_job_name: str) -> dict:
Expand Down Expand Up @@ -240,11 +246,11 @@ def __init__(

def poke(self, context):
schedule_time = get_scheduled_at(context)
job_config = self._optimus_client.get_job_replay_config(self.project_name, self.name, schedule_time)
if 'replays' in job_config.keys():
if 'IGNORE_UPSTREAM' in job_config['jobConfig'].keys():
if job_config['jobConfig']['IGNORE_UPSTREAM'] == "True":
return True
job_config = self._optimus_client.get_job_replay_config(self.project_name, self.job_name, schedule_time)
if 'IGNORE_UPSTREAM' in job_config.keys():
if job_config['IGNORE_UPSTREAM'] == "True":
log.info("Bypassing upstream check as job_config contains IGNORE_UPSTREAM=True")
return True
try:
upstream_schedule = self.get_schedule_interval(schedule_time)
except Exception as e:
Expand Down

0 comments on commit b4c6c1a

Please sign in to comment.