Skip to content

Commit

Permalink
fix[intn] | description and request timeout changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ankita-gulati-gojek committed Oct 30, 2023
1 parent 9a3888b commit 9320256
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 9 deletions.
13 changes: 7 additions & 6 deletions ext/scheduler/airflow/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
TIMESTAMP_MS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

SCHEDULER_ERR_MSG = "scheduler_error"
OPTIMUS_REQUEST_TIMEOUT_IN_SECS = int(Variable.get("optimus_request_timeout_in_secs", default_var=5 * 60))

def lookup_non_standard_cron_expression(expr: str) -> str:
expr_mapping = {
Expand Down Expand Up @@ -81,7 +82,7 @@ def get_job_run(self, optimus_project: str, optimus_job: str, startDate: str, en
optimus_project=optimus_project,
optimus_job=optimus_job,
)
response = requests.get(url, params={'start_date': startDate, 'end_date': endDate})
response = requests.get(url, params={'start_date': startDate, 'end_date': endDate}, timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS)
self._raise_error_if_request_failed(response)
return response.json()

Expand All @@ -95,7 +96,7 @@ def get_task_window(self, scheduled_at: str, version: int, window_size: str, win
window_offset=window_offset,
window_truncate_upto=window_truncate_upto,
)
response = requests.get(url)
response = requests.get(url, timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS)
self._raise_error_if_request_failed(response)
return response.json()

Expand All @@ -104,7 +105,7 @@ def get_job_run_input(self, execution_date: str, project_name: str, job_name: st
response = requests.post(url="{}/api/v1beta1/project/{}/job/{}/run_input".format(self.host, project_name, job_name),
json={'scheduled_at': execution_date,
'instance_name': instance_name,
'instance_type': "TYPE_" + job_type.upper()})
'instance_type': "TYPE_" + job_type.upper()}, timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS)

self._raise_error_if_request_failed(response)
return response.json()
Expand All @@ -115,7 +116,7 @@ def get_job_metadata(self, execution_date, namespace, project, job) -> dict:
namespace_name=namespace,
project_name=project,
job_name=job)
response = requests.get(url)
response = requests.get(url, timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS)
self._raise_error_if_request_failed(response)
return response.json()

Expand All @@ -129,7 +130,7 @@ def notify_event(self, project, namespace, job, event) -> dict:
request_data = {
"event": event
}
response = requests.post(url, data=json.dumps(request_data))
response = requests.post(url, data=json.dumps(request_data), timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS)
self._raise_error_if_request_failed(response)
return response.json()

Expand Down Expand Up @@ -637,7 +638,7 @@ def __init__(

def poke(self, context: 'Context') -> bool:
self.log.info('Poking: %s', self.endpoint)
r = requests.get(url=self.endpoint, headers=self.headers, params=self.request_params)
r = requests.get(url=self.endpoint, headers=self.headers, params=self.request_params, timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS))
if (r.status_code >= 200 and r.status_code <= 300):
return True
return False
2 changes: 1 addition & 1 deletion ext/scheduler/airflow/dag/compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func setupJobDetails(tnnt tenant.Tenant) *scheduler.JobWithDetails {
jobMeta := &scheduler.JobMetadata{
Version: 1,
Owner: "[email protected]",
Description: "This job collects the billing information related to infrastructure",
Description: "This job collects the billing information related to infrastructure.\nThis job will run in a weekly basis.",
Labels: map[string]string{"orchestrator": "optimus"},
}

Expand Down
4 changes: 3 additions & 1 deletion ext/scheduler/airflow/dag/dag.py.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ default_args = {
}

{{ if ne .JobDetails.JobMetadata.Description "" -}}
# {{.JobDetails.JobMetadata.Description}}
"""
{{.JobDetails.JobMetadata.Description}}
"""
{{- end }}
dag = DAG(
dag_id={{.JobDetails.Name.String | quote}},
Expand Down
5 changes: 4 additions & 1 deletion ext/scheduler/airflow/dag/expected_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
"on_failure_callback": operator_failure_event,
}

# This job collects the billing information related to infrastructure
"""
This job collects the billing information related to infrastructure.
This job will run in a weekly basis.
"""
dag = DAG(
dag_id="infra.billing.weekly-status-reports",
default_args=default_args,
Expand Down

0 comments on commit 9320256

Please sign in to comment.