Skip to content

Commit

Permalink
feat: tls, client creds (#23)
Browse files Browse the repository at this point in the history
* TLS is enabled by default
* Ability to login using client credentials
  • Loading branch information
AndrewShakinovsky-SAS authored Sep 20, 2023
1 parent 0b530ad commit ab8277a
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 12 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@


## Getting started
Please note that this file is no substitute for reading and understanding the Airflow documentation. This file is only intended to provide a quick start for the SAS providers. Unless an issue relates specifically to the SAS providers, the Airflow documentation should be consulted.
### Install Airflow
Follow instructions at https://airflow.apache.org/docs/apache-airflow/stable/installation/index.html to install Airflow.
If you just want to evaluate the SAS providers, then the simplest path would be to install via PYPI and run Airflow on the local machine in a virtual environment.
Expand All @@ -36,6 +37,10 @@ There are a few ways to provide the package:
In order to connect to SAS Viya from the Airflow operator, you will need to create a connection. The easiest way to do this is to go into the Airflow UI under Admin/Connections and create a new connection using the blue + button. Select SAS from the list of connection types, and enter sas_default as the name. The applicable fields are host (http or https url to your SAS Viya install), login and password. It is also possible to specify an OAuth token by creating a json body in the extra field. For example `{"token": "oauth_token_here"}`. If a token is found it is used instead of the user/password.
Please be aware of security considerations when storing sensitive information in a
connection. Consult https://airflow.apache.org/docs/apache-airflow/stable/security/index.html for details.
TLS verification can be disabled (not recommended) by specifying the following in
the extra field `{"ssl_certificate_verification": false }`
In addition, a custom TLS CA certificate bundle file can be used as follows:
`{"ssl_certificate_verification": "/path/to/trustedcerts.pem"}`

### Running a DAG with a SAS provider
See example files in the src/sas_airflow_provider/example_dags directory. These dags can be modified and
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = sas-airflow-provider
version = 0.0.8
version = 0.0.9
author = SAS
author_email = [email protected]
description = Enables execution of Studio Flows and Jobs from Airflow
Expand Down
25 changes: 19 additions & 6 deletions src/sas_airflow_provider/hooks/sas.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def __init__(self, conn_id: str = None) -> None:
self.password = None
self.token = None
self.sas_conn = None
self.cert_verify = True
self.grant_type = None

def get_conn(self):
"""Returns a SAS connection."""
Expand All @@ -39,12 +41,19 @@ def get_conn(self):
extras = conn.extra_dejson
self.token = extras.get("token")
self.client_id = extras.get("client_id")
self.grant_type = extras.get("grant_type", "password")
self.client_secret = ""
if not self.client_id:
self.client_id = "sas.cli"
else:
self.client_secret = extras.get("client_secret") # type: ignore

self.cert_verify = extras.get("ssl_certificate_verification", True)
if not self.cert_verify:
self.log.info(f"TLS verification is turned off")
elif isinstance(self.cert_verify, str):
self.log.info("Using custom TLS CA certificate bundle file")

if not self.sas_conn:
self.sas_conn = self._create_session_for_connection()

Expand All @@ -55,8 +64,9 @@ def _create_session_for_connection(self):
self.conn_id,
self.host)

# disable insecure HTTP requests warnings
urllib3.disable_warnings(InsecureRequestWarning)
if not self.cert_verify:
# disable insecure HTTP requests warnings
urllib3.disable_warnings(InsecureRequestWarning)

if not self.token:
# base 64 encode the api client auth and pass in authorization header
Expand All @@ -65,11 +75,14 @@ def _create_session_for_connection(self):
auth_header = base64.b64encode(auth_bytes).decode("ascii")
my_headers = {"Authorization": f"Basic {auth_header}"}

payload = {"grant_type": "password", "username": self.login, "password": self.password}
payload = {"grant_type": self.grant_type}
if self.login:
payload["username"] = self.login
payload["password"] = self.password

self.log.info("Get oauth token (see README if this crashes)")
response = requests.post(
f"{self.host}/SASLogon/oauth/token", data=payload, verify=False, headers=my_headers
f"{self.host}/SASLogon/oauth/token", data=payload, verify=self.cert_verify, headers=my_headers
)

if response.status_code != 200:
Expand All @@ -85,8 +98,8 @@ def _create_session_for_connection(self):
session.headers.update({"Accept": "application/json"})
session.headers.update({"Content-Type": "application/json"})

# set to false if using self signed certs
session.verify = False
# set to false if using self-signed certs
session.verify = self.cert_verify

# prepend the root url for all operations on the session, so that consumers can just provide
# resource uri without the protocol and host
Expand Down
4 changes: 2 additions & 2 deletions src/sas_airflow_provider/operators/sas_jobexecution.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def execute(self, context):
url = f"/SASJobExecution/?_program={program_name}{url_string}"

headers = {"Accept": "application/vnd.sas.job.execution.job+json"}
response = session.post(url, headers=headers, verify=False)
response = session.post(url, headers=headers)

if response.status_code < 200 or response.status_code >= 300:
raise AirflowFailException(f"SAS Job Execution HTTP status code {response.status_code}")
Expand All @@ -101,7 +101,7 @@ def execute(self, context):
job_id = response.headers.get('X-Sas-Jobexec-Id')
if job_id:
job_status_url = f"/jobExecution/jobs/{job_id}"
job = session.get(job_status_url, verify=False)
job = session.get(job_status_url)
if job.status_code >= 200:
dump_logs(session, job.json())
else:
Expand Down
2 changes: 1 addition & 1 deletion tests/hooks/test_sas.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ def test_sas_hook(self, bh_mock, req_mock, sess_mock):

req_mock.assert_called_with('host/SASLogon/oauth/token',
data={'grant_type': 'password', 'username': 'user',
'password': 'pass'}, verify=False,
'password': 'pass'}, verify=True,
headers={'Authorization': 'Basic c2FzLmNsaTo='})
3 changes: 1 addition & 2 deletions tests/operators/test_sas_jobexecution.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,4 @@ def test_execute_sas_job_execution_operator(self, session_mock, dump_logs_mock):
dump_logs_mock.assert_called()
session_mock.return_value.get_conn.return_value.post.assert_called_with('/SASJobExecution/?_program=/Public/my_job&a=b',
headers={
'Accept': 'application/vnd.sas.job.execution.job+json'},
verify=False)
'Accept': 'application/vnd.sas.job.execution.job+json'})

0 comments on commit ab8277a

Please sign in to comment.