Skip to content

Commit

Permalink
Pass credentials directly to GCP connectors rather than through envir…
Browse files Browse the repository at this point in the history
…onment variable (#1040)

* Fix typo in filename utitities > utilities

* New method to pass credentials directly to GCP clients

* Pass credentials explicitly to Google clients

Avoids issue documented in #1039 where credentials for all GCP clients
are stored in the same environment variable, leading to overwrites if
multiple clients are initialized in the same environment.

* Mock credential parsing in tests

Avoids mock credentials needing to match Google Service Account
credential parsing

* Refactor Google Admin using new authed request session

* Fix tests on GoogleAdmin, expect new response structure

* Small changes to get BigQuery tests working
  • Loading branch information
austinweisgrau authored Jul 12, 2024
1 parent 118d744 commit 6dfaaec
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 91 deletions.
44 changes: 23 additions & 21 deletions parsons/google/google_admin.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from oauth2client.service_account import ServiceAccountCredentials
import uuid

from google.auth.transport.requests import AuthorizedSession

from parsons.etl.table import Table
from parsons.google.utitities import setup_google_application_credentials
import httplib2
import json
import os
from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
)


class GoogleAdmin(object):
Expand All @@ -23,17 +26,16 @@ class GoogleAdmin(object):
"""

def __init__(self, app_creds=None, sub=None):
setup_google_application_credentials(app_creds)

self.client = (
ServiceAccountCredentials.from_json_keyfile_name(
os.environ["GOOGLE_APPLICATION_CREDENTIALS"],
["https://www.googleapis.com/auth/admin.directory.group"],
)
.create_delegated(sub)
.authorize(httplib2.Http())
env_credentials_path = str(uuid.uuid4())
setup_google_application_credentials(app_creds, target_env_var_name=env_credentials_path)
credentials = load_google_application_credentials(
env_credentials_path,
scopes=["https://www.googleapis.com/auth/admin.directory.group"],
subject=sub,
)

self.client = AuthorizedSession(credentials)

def _paginate_request(self, endpoint, collection, params=None):
# Build query params
param_arr = []
Expand All @@ -48,7 +50,9 @@ def _paginate_request(self, endpoint, collection, params=None):

# Return type from Google Admin is a tuple of length 2. Extract desired result from 2nd item
# in tuple and convert to json
res = json.loads(self.client.request(req_url + param_str, "GET")[1].decode("utf-8"))
res = self.client.request("GET", req_url + param_str).json()
if "error" in res:
raise RuntimeError(res["error"].get("message"))

# Paginate
ret = []
Expand All @@ -60,12 +64,10 @@ def _paginate_request(self, endpoint, collection, params=None):
param_arr.append("pageToken=" + res["nextPageToken"])
else:
param_arr[-1] = "pageToken=" + res["nextPageToken"]
res = json.loads(
self.client.request(req_url + "?" + "&".join(param_arr), "GET")[1].decode(
"utf-8"
)
)
ret += res[collection]
response = self.client.request("GET", req_url + "?" + "&".join(param_arr)).json()
if "error" in response:
raise RuntimeError(response["error"].get("message"))
ret += response[collection]

return Table(ret)

Expand Down
16 changes: 11 additions & 5 deletions parsons/google/google_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
from parsons.databases.table import BaseTable
from parsons.etl import Table
from parsons.google.google_cloud_storage import GoogleCloudStorage
from parsons.google.utitities import setup_google_application_credentials
from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
)
from parsons.utilities import check_env
from parsons.utilities.files import create_temp_file

Expand Down Expand Up @@ -159,8 +162,11 @@ def __init__(
if isinstance(app_creds, Credentials):
self.credentials = app_creds
else:
self.credentials = None
setup_google_application_credentials(app_creds)
self.env_credential_path = str(uuid.uuid4())
setup_google_application_credentials(
app_creds, target_env_var_name=self.env_credential_path
)
self.credentials = load_google_application_credentials(self.env_credential_path)

self.project = project
self.location = location
Expand Down Expand Up @@ -694,7 +700,7 @@ def copy_s3(

# copy from S3 to GCS
tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
gcs_client = gcs_client or GoogleCloudStorage()
gcs_client = gcs_client or GoogleCloudStorage(app_creds=self.app_creds)
temp_blob_uri = gcs_client.copy_s3_to_gcs(
aws_source_bucket=bucket,
aws_access_key_id=aws_access_key_id,
Expand Down Expand Up @@ -807,7 +813,7 @@ def copy(
schema.append(schema_row)
job_config.schema = schema

gcs_client = gcs_client or GoogleCloudStorage()
gcs_client = gcs_client or GoogleCloudStorage(app_creds=self.app_creds)
temp_blob_name = f"{uuid.uuid4()}.{data_type}"
temp_blob_uri = gcs_client.upload_table(tbl, tmp_gcs_bucket, temp_blob_name)

Expand Down
52 changes: 37 additions & 15 deletions parsons/google/google_cloud_storage.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
import google
from google.cloud import storage
from google.cloud import storage_transfer
from parsons.google.utitities import setup_google_application_credentials
from parsons.utilities import files
import datetime
import gzip
import petl
Expand All @@ -12,6 +7,15 @@
import zipfile
from typing import Optional

import google
from google.cloud import storage, storage_transfer

from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
)
from parsons.utilities import files

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -42,17 +46,21 @@ class GoogleCloudStorage(object):
"""

def __init__(self, app_creds=None, project=None):
setup_google_application_credentials(app_creds)
env_credentials_path = str(uuid.uuid4())
setup_google_application_credentials(
app_creds, target_env_var_name=env_credentials_path
)
credentials = load_google_application_credentials(env_credentials_path)
self.project = project

# Throws an error if you pass project=None, so adding if/else statement.
if not self.project:
self.client = storage.Client()
self.client = storage.Client(credentials=credentials)
"""
Access all methods of `google.cloud` package
"""
else:
self.client = storage.Client(project=self.project)
self.client = storage.Client(credentials=credentials, project=self.project)

def list_buckets(self):
"""
Expand Down Expand Up @@ -289,7 +297,9 @@ def delete_blob(self, bucket_name, blob_name):
blob.delete()
logger.info(f"{blob_name} blob in {bucket_name} bucket deleted.")

def upload_table(self, table, bucket_name, blob_name, data_type="csv", default_acl=None):
def upload_table(
self, table, bucket_name, blob_name, data_type="csv", default_acl=None
):
"""
Load the data from a Parsons table into a blob.
Expand Down Expand Up @@ -325,7 +335,9 @@ def upload_table(self, table, bucket_name, blob_name, data_type="csv", default_a
local_file = table.to_json()
content_type = "application/json"
else:
raise ValueError(f"Unknown data_type value ({data_type}): must be one of: csv or json")
raise ValueError(
f"Unknown data_type value ({data_type}): must be one of: csv or json"
)

try:
blob.upload_from_filename(
Expand Down Expand Up @@ -395,7 +407,9 @@ def copy_bucket_to_gcs(
Secret key to authenticate storage transfer
"""
if source not in ["gcs", "s3"]:
raise ValueError(f"Blob transfer only supports gcs and s3 sources [source={source}]")
raise ValueError(
f"Blob transfer only supports gcs and s3 sources [source={source}]"
)
if source_path and source_path[-1] != "/":
raise ValueError("Source path much end in a '/'")

Expand Down Expand Up @@ -582,9 +596,13 @@ def unzip_blob(
}

file_extension = compression_params[compression_type]["file_extension"]
compression_function = compression_params[compression_type]["compression_function"]
compression_function = compression_params[compression_type][
"compression_function"
]

compressed_filepath = self.download_blob(bucket_name=bucket_name, blob_name=blob_name)
compressed_filepath = self.download_blob(
bucket_name=bucket_name, blob_name=blob_name
)

decompressed_filepath = compressed_filepath.replace(file_extension, "")
decompressed_blob_name = (
Expand Down Expand Up @@ -616,7 +634,9 @@ def __gzip_decompress_and_write_to_gcs(self, **kwargs):
bucket_name = kwargs.pop("bucket_name")

with gzip.open(compressed_filepath, "rb") as f_in:
logger.debug(f"Uploading uncompressed file to GCS: {decompressed_blob_name}")
logger.debug(
f"Uploading uncompressed file to GCS: {decompressed_blob_name}"
)
bucket = self.get_bucket(bucket_name=bucket_name)
blob = storage.Blob(name=decompressed_blob_name, bucket=bucket)
blob.upload_from_file(file_obj=f_in, rewind=True, timeout=3600)
Expand All @@ -636,7 +656,9 @@ def __zip_decompress_and_write_to_gcs(self, **kwargs):
with zipfile.ZipFile(compressed_filepath) as path_:
# Open the underlying file
with path_.open(decompressed_blob_in_archive) as f_in:
logger.debug(f"Uploading uncompressed file to GCS: {decompressed_blob_name}")
logger.debug(
f"Uploading uncompressed file to GCS: {decompressed_blob_name}"
)
bucket = self.get_bucket(bucket_name=bucket_name)
blob = storage.Blob(name=decompressed_blob_name, bucket=bucket)
blob.upload_from_file(file_obj=f_in, rewind=True, timeout=3600)
39 changes: 25 additions & 14 deletions parsons/google/google_sheets.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import os
import json
import logging

from parsons.etl.table import Table
from parsons.google.utitities import setup_google_application_credentials, hexavigesimal
import uuid

import gspread
from google.oauth2.service_account import Credentials

from parsons.etl.table import Table
from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
hexavigesimal,
)

logger = logging.getLogger(__name__)

Expand All @@ -32,12 +34,15 @@ def __init__(self, google_keyfile_dict=None, subject=None):
"https://www.googleapis.com/auth/drive",
]

setup_google_application_credentials(google_keyfile_dict, "GOOGLE_DRIVE_CREDENTIALS")
google_credential_file = open(os.environ["GOOGLE_DRIVE_CREDENTIALS"])
credentials_dict = json.load(google_credential_file)
env_credentials_path = str(uuid.uuid4())
setup_google_application_credentials(
google_keyfile_dict,
"GOOGLE_DRIVE_CREDENTIAL",

This comment has been minimized.

Copy link
@cmdelrio

cmdelrio Jul 15, 2024

Contributor

I think this should be GOOGLE_DRIVE_CREDENTIALS. Was this intended @austinweisgrau ?

This comment has been minimized.

Copy link
@austinweisgrau

austinweisgrau Jul 15, 2024

Author Collaborator

woops you're right! let me put up a hotfix PR and ill assign u

This comment has been minimized.

Copy link
@austinweisgrau

austinweisgrau Jul 15, 2024

Author Collaborator
target_env_var_name=env_credentials_path,
)

credentials = Credentials.from_service_account_info(
credentials_dict, scopes=scope, subject=subject
credentials = load_google_application_credentials(
env_credentials_path, scopes=scope, subject=subject
)

self.gspread_client = gspread.authorize(credentials)
Expand All @@ -47,12 +52,16 @@ def _get_worksheet(self, spreadsheet_id, worksheet=0):

# Check if the worksheet is an integer, if so find the sheet by index
if isinstance(worksheet, int):
return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(worksheet)
return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(
worksheet
)

elif isinstance(worksheet, str):
idx = self.list_worksheets(spreadsheet_id).index(worksheet)
try:
return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(idx)
return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(
idx
)
except: # noqa: E722
raise ValueError(f"Couldn't find worksheet {worksheet}")

Expand Down Expand Up @@ -280,7 +289,9 @@ def append_to_sheet(

# If the existing sheet is blank, then just overwrite the table.
if existing_table.num_rows == 0:
return self.overwrite_sheet(spreadsheet_id, table, worksheet, user_entered_value)
return self.overwrite_sheet(
spreadsheet_id, table, worksheet, user_entered_value
)

cells = []
for row_num, row in enumerate(table.data):
Expand Down
34 changes: 30 additions & 4 deletions parsons/google/utitities.py → parsons/google/utilities.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import typing as t
from parsons.utilities import files
from parsons.utilities import check_env
import json
import os
import typing as t

import google
from google.oauth2 import service_account

from parsons.utilities import check_env, files


def setup_google_application_credentials(
app_creds: t.Union[t.Dict, str, None],
env_var_name: str = "GOOGLE_APPLICATION_CREDENTIALS",
target_env_var_name: t.Optional[str] = None,
) -> None:
# Detect if app_creds is a dict, path string or json string, and if it is a
# json string, then convert it to a temporary file. Then set the
Expand All @@ -21,7 +25,10 @@ def setup_google_application_credentials(
except ValueError:
creds_path = credentials

os.environ[env_var_name] = creds_path
if not target_env_var_name:
target_env_var_name = env_var_name

os.environ[target_env_var_name] = creds_path


def hexavigesimal(n: int) -> str:
Expand All @@ -48,3 +55,22 @@ def hexavigesimal(n: int) -> str:
chars = chr((n - 1) % 26 + 65) + chars # 65 makes us start at A
n = (n - 1) // 26
return chars


def load_google_application_credentials(
env_var_name: str = "GOOGLE_APPLICATION_CREDENTIALS",
scopes: t.Optional[t.List[str]] = None,
subject: t.Optional[str] = None,
) -> google.auth.credentials.Credentials:

service_account_filepath = os.environ[env_var_name]

credentials = service_account.Credentials.from_service_account_file(service_account_filepath)

if scopes:
credentials = credentials.with_scopes(scopes)

if subject:
credentials = credentials.with_subject(subject)

return credentials
Loading

0 comments on commit 6dfaaec

Please sign in to comment.