diff --git a/README.md b/README.md index 5f352de..dd9bf1c 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ ## Getting started +Please note that this file is no substitute for reading and understanding the Airflow documentation. This file is only intended to provide a quick start for the SAS providers. Unless an issue relates specifically to the SAS providers, the Airflow documentation should be consulted. ### Install Airflow Follow instructions at https://airflow.apache.org/docs/apache-airflow/stable/installation/index.html to install Airflow. If you just want to evaluate the SAS providers, then the simplest path would be to install via PYPI and run Airflow on the local machine in a virtual environment. @@ -36,6 +37,10 @@ There are a few ways to provide the package: In order to connect to SAS Viya from the Airflow operator, you will need to create a connection. The easiest way to do this is to go into the Airflow UI under Admin/Connections and create a new connection using the blue + button. Select SAS from the list of connection types, and enter sas_default as the name. The applicable fields are host (http or https url to your SAS Viya install), login and password. It is also possible to specify an OAuth token by creating a json body in the extra field. For example `{"token": "oauth_token_here"}`. If a token is found it is used instead of the user/password. Please be aware of security considerations when storing sensitive information in a connection. Consult https://airflow.apache.org/docs/apache-airflow/stable/security/index.html for details. +TLS verification can be disabled (not recommended) by specifying the following in +the extra field `{"ssl_certificate_verification": false }`. +In addition, a custom TLS CA certificate bundle file can be used as follows: +`{"ssl_certificate_verification": "/path/to/trustedcerts.pem"}` ### Running a DAG with a SAS provider See example files in the src/sas_airflow_provider/example_dags directory. 
These dags can be modified and diff --git a/setup.cfg b/setup.cfg index a708ea4..4aa1181 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = sas-airflow-provider -version = 0.0.8 +version = 0.0.9 author = SAS author_email = andrew.shakinovsky@sas.com description = Enables execution of Studio Flows and Jobs from Airflow diff --git a/src/sas_airflow_provider/hooks/sas.py b/src/sas_airflow_provider/hooks/sas.py index fce53a2..3adaa47 100644 --- a/src/sas_airflow_provider/hooks/sas.py +++ b/src/sas_airflow_provider/hooks/sas.py @@ -26,6 +26,8 @@ def __init__(self, conn_id: str = None) -> None: self.password = None self.token = None self.sas_conn = None + self.cert_verify = True + self.grant_type = None def get_conn(self): """Returns a SAS connection.""" @@ -39,12 +41,19 @@ def get_conn(self): extras = conn.extra_dejson self.token = extras.get("token") self.client_id = extras.get("client_id") + self.grant_type = extras.get("grant_type", "password") self.client_secret = "" if not self.client_id: self.client_id = "sas.cli" else: self.client_secret = extras.get("client_secret") # type: ignore + self.cert_verify = extras.get("ssl_certificate_verification", True) + if not self.cert_verify: + self.log.info(f"TLS verification is turned off") + elif isinstance(self.cert_verify, str): + self.log.info("Using custom TLS CA certificate bundle file") + if not self.sas_conn: self.sas_conn = self._create_session_for_connection() @@ -55,8 +64,9 @@ def _create_session_for_connection(self): self.conn_id, self.host) - # disable insecure HTTP requests warnings - urllib3.disable_warnings(InsecureRequestWarning) + if not self.cert_verify: + # disable insecure HTTP requests warnings + urllib3.disable_warnings(InsecureRequestWarning) if not self.token: # base 64 encode the api client auth and pass in authorization header @@ -65,11 +75,14 @@ def _create_session_for_connection(self): auth_header = base64.b64encode(auth_bytes).decode("ascii") my_headers = {"Authorization": 
f"Basic {auth_header}"} - payload = {"grant_type": "password", "username": self.login, "password": self.password} + payload = {"grant_type": self.grant_type} + if self.login: + payload["username"] = self.login + payload["password"] = self.password self.log.info("Get oauth token (see README if this crashes)") response = requests.post( - f"{self.host}/SASLogon/oauth/token", data=payload, verify=False, headers=my_headers + f"{self.host}/SASLogon/oauth/token", data=payload, verify=self.cert_verify, headers=my_headers ) if response.status_code != 200: @@ -85,8 +98,8 @@ def _create_session_for_connection(self): session.headers.update({"Accept": "application/json"}) session.headers.update({"Content-Type": "application/json"}) - # set to false if using self signed certs - session.verify = False + # set to false if using self-signed certs + session.verify = self.cert_verify # prepend the root url for all operations on the session, so that consumers can just provide # resource uri without the protocol and host diff --git a/src/sas_airflow_provider/operators/sas_jobexecution.py b/src/sas_airflow_provider/operators/sas_jobexecution.py index 2c1914f..3162efc 100644 --- a/src/sas_airflow_provider/operators/sas_jobexecution.py +++ b/src/sas_airflow_provider/operators/sas_jobexecution.py @@ -87,7 +87,7 @@ def execute(self, context): url = f"/SASJobExecution/?_program={program_name}{url_string}" headers = {"Accept": "application/vnd.sas.job.execution.job+json"} - response = session.post(url, headers=headers, verify=False) + response = session.post(url, headers=headers) if response.status_code < 200 or response.status_code >= 300: raise AirflowFailException(f"SAS Job Execution HTTP status code {response.status_code}") @@ -101,7 +101,7 @@ def execute(self, context): job_id = response.headers.get('X-Sas-Jobexec-Id') if job_id: job_status_url = f"/jobExecution/jobs/{job_id}" - job = session.get(job_status_url, verify=False) + job = session.get(job_status_url) if job.status_code >= 200: 
dump_logs(session, job.json()) else: diff --git a/tests/hooks/test_sas.py b/tests/hooks/test_sas.py index db48f5f..06a0f99 100644 --- a/tests/hooks/test_sas.py +++ b/tests/hooks/test_sas.py @@ -42,5 +42,5 @@ def test_sas_hook(self, bh_mock, req_mock, sess_mock): req_mock.assert_called_with('host/SASLogon/oauth/token', data={'grant_type': 'password', 'username': 'user', - 'password': 'pass'}, verify=False, + 'password': 'pass'}, verify=True, headers={'Authorization': 'Basic c2FzLmNsaTo='}) diff --git a/tests/operators/test_sas_jobexecution.py b/tests/operators/test_sas_jobexecution.py index b8e934d..61fdaeb 100644 --- a/tests/operators/test_sas_jobexecution.py +++ b/tests/operators/test_sas_jobexecution.py @@ -51,5 +51,4 @@ def test_execute_sas_job_execution_operator(self, session_mock, dump_logs_mock): dump_logs_mock.assert_called() session_mock.return_value.get_conn.return_value.post.assert_called_with('/SASJobExecution/?_program=/Public/my_job&a=b', headers={ - 'Accept': 'application/vnd.sas.job.execution.job+json'}, - verify=False) + 'Accept': 'application/vnd.sas.job.execution.job+json'})