Merge pull request #18100 from jmchilton/aws_fixes

Fix AWS Object Store for us-east-2

jmchilton authored May 7, 2024
2 parents 4f68dd2 + 18c1376 commit 0910741
Showing 3 changed files with 150 additions and 57 deletions.
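In practice, this change lets a deployer pin a bucket's AWS region in the object store configuration. A minimal sketch of such a configuration in YAML form (mirroring the aws_s3 template added to the tests below; key names follow parse_config_xml in lib/galaxy/objectstore/s3.py, values are placeholders):

type: aws_s3
store_by: uuid
auth:
  access_key: YOUR_ACCESS_KEY
  secret_key: YOUR_SECRET_KEY
bucket:
  name: my-galaxy-datasets
connection:
  region: us-east-2

With region set, the store enables Signature Version 4 signing and targets s3.us-east-2.amazonaws.com, which is what regions launched after January 2014 (such as us-east-2) require; see the s3.py changes below.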
23 changes: 6 additions & 17 deletions lib/galaxy/objectstore/cloud.py
@@ -56,13 +56,6 @@ def _config_to_dict(self):
                 "name": self.bucket_name,
                 "use_reduced_redundancy": self.use_rr,
             },
-            "connection": {
-                "host": self.host,
-                "port": self.port,
-                "multipart": self.multipart,
-                "is_secure": self.is_secure,
-                "conn_path": self.conn_path,
-            },
             "cache": {
                 "size": self.cache_size,
                 "path": self.staging_path,
@@ -86,7 +79,6 @@ def __init__(self, config, config_dict):
         self.transfer_progress = 0
 
         bucket_dict = config_dict["bucket"]
-        connection_dict = config_dict.get("connection", {})
         cache_dict = config_dict.get("cache") or {}
         self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict)
 
@@ -96,12 +88,6 @@ def __init__(self, config, config_dict):
         self.use_rr = bucket_dict.get("use_reduced_redundancy", False)
         self.max_chunk_size = bucket_dict.get("max_chunk_size", 250)
 
-        self.host = connection_dict.get("host", None)
-        self.port = connection_dict.get("port", 6000)
-        self.multipart = connection_dict.get("multipart", True)
-        self.is_secure = connection_dict.get("is_secure", True)
-        self.conn_path = connection_dict.get("conn_path", "/")
-
         self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size
         self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path
         self.cache_updated_data = cache_dict.get("cache_updated_data", True)
@@ -131,6 +117,8 @@ def _get_connection(provider, credentials):
         log.debug(f"Configuring `{provider}` Connection")
         if provider == "aws":
             config = {"aws_access_key": credentials["access_key"], "aws_secret_key": credentials["secret_key"]}
+            if "region" in credentials:
+                config["aws_region_name"] = credentials["region"]
             connection = CloudProviderFactory().create_provider(ProviderList.AWS, config)
         elif provider == "azure":
             config = {
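For reference, a minimal sketch of what this CloudBridge path amounts to when a region is supplied (the factory import path varies across cloudbridge releases, and all credential and bucket values here are placeholders):

from cloudbridge.factory import CloudProviderFactory, ProviderList

config = {
    "aws_access_key": "YOUR_ACCESS_KEY",  # credentials["access_key"]
    "aws_secret_key": "YOUR_SECRET_KEY",  # credentials["secret_key"]
    "aws_region_name": "us-east-2",       # set only when "region" is present in credentials
}
provider = CloudProviderFactory().create_provider(ProviderList.AWS, config)
bucket = provider.storage.buckets.get("my-bucket")  # illustrative storage call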
@@ -198,8 +186,9 @@ def parse_xml(clazz, config_xml):
         if provider == "aws":
             akey = auth_element.get("access_key")
             skey = auth_element.get("secret_key")
-
             config["auth"] = {"access_key": akey, "secret_key": skey}
+            if "region" in auth_element:
+                config["auth"]["region"] = auth_element["region"]
         elif provider == "azure":
             sid = auth_element.get("subscription_id")
             if sid is None:
@@ -553,7 +542,7 @@ def _create(self, obj, **kwargs):
 
     def _empty(self, obj, **kwargs):
         if self._exists(obj, **kwargs):
-            return bool(self._size(obj, **kwargs) > 0)
+            return bool(self._size(obj, **kwargs) == 0)
         else:
             raise ObjectNotFound(f"objectstore.empty, object does not exist: {obj}, kwargs: {kwargs}")
 
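Note that _empty is meant to report whether an existing object has zero size; the previous expression returned True for non-empty objects instead. The same inverted check is fixed in s3.py below.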
@@ -692,7 +681,7 @@ def _get_object_url(self, obj, **kwargs):
             log.exception("Trouble generating URL for dataset '%s'", rel_path)
         return None
 
-    def _get_store_usage_percent(self):
+    def _get_store_usage_percent(self, obj):
         return 0.0
 
     def shutdown(self):
24 changes: 22 additions & 2 deletions lib/galaxy/objectstore/s3.py
@@ -87,6 +87,7 @@ def parse_config_xml(config_xml):
         multipart = string_as_bool(cn_xml.get("multipart", "True"))
         is_secure = string_as_bool(cn_xml.get("is_secure", "True"))
         conn_path = cn_xml.get("conn_path", "/")
+        region = cn_xml.get("region", None)
 
         cache_dict = parse_caching_config_dict_from_xml(config_xml)
 
@@ -114,6 +115,7 @@ def parse_config_xml(config_xml):
                 "multipart": multipart,
                 "is_secure": is_secure,
                 "conn_path": conn_path,
+                "region": region,
             },
             "cache": cache_dict,
             "extra_dirs": extra_dirs,
@@ -142,6 +144,7 @@ def _config_to_dict(self):
                 "multipart": self.multipart,
                 "is_secure": self.is_secure,
                 "conn_path": self.conn_path,
+                "region": self.region,
             },
             "cache": {
                 "size": self.cache_size,
@@ -185,6 +188,7 @@ def __init__(self, config, config_dict):
         self.multipart = connection_dict.get("multipart", True)
         self.is_secure = connection_dict.get("is_secure", True)
         self.conn_path = connection_dict.get("conn_path", "/")
+        self.region = connection_dict.get("region", None)
 
         self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size
         self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path
@@ -228,7 +232,23 @@ def _configure_connection(self):
         log.debug("Configuring S3 Connection")
         # If access_key is empty use default credential chain
         if self.access_key:
-            self.conn = S3Connection(self.access_key, self.secret_key)
+            if self.region:
+                # If a region is specified, we can infer a host and turn on SIGV4.
+                # https://stackoverflow.com/questions/26744712/s3-using-boto-and-sigv4-missing-host-parameter
+
+                # Turning on SIGV4 is needed for AWS regions created after 2014... from
+                # https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html:
+                #
+                # "Amazon S3 supports Signature Version 4, a protocol for authenticating inbound API requests to AWS services,
+                # in all AWS Regions. At this time, AWS Regions created before January 30, 2014 will continue to support the
+                # previous protocol, Signature Version 2. Any new Regions after January 30, 2014 will support only Signature
+                # Version 4 and therefore all requests to those Regions must be made with Signature Version 4."
+                os.environ["S3_USE_SIGV4"] = "True"
+                self.conn = S3Connection(self.access_key, self.secret_key, host=f"s3.{self.region}.amazonaws.com")
+            else:
+                # See notes above, this path through the code will not work for
+                # newer regions.
+                self.conn = S3Connection(self.access_key, self.secret_key)
         else:
             self.conn = S3Connection()
 
@@ -581,7 +601,7 @@ def _create(self, obj, **kwargs):
 
     def _empty(self, obj, **kwargs):
         if self._exists(obj, **kwargs):
-            return bool(self._size(obj, **kwargs) > 0)
+            return bool(self._size(obj, **kwargs) == 0)
         else:
             raise ObjectNotFound(f"objectstore.empty, object does not exist: {obj}, kwargs: {kwargs}")
 
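Because parse_config_xml above reads region off the <connection> element, XML-style object store configurations can opt in the same way. A hedged sketch (element and attribute names follow the parser above; the surrounding attributes and values are illustrative):

<object_store type="aws_s3" store_by="uuid">
    <auth access_key="YOUR_ACCESS_KEY" secret_key="YOUR_SECRET_KEY" />
    <bucket name="my-galaxy-datasets" />
    <connection region="us-east-2" />
    <cache path="database/object_store_cache" size="1000" />
</object_store>

Leaving region unset keeps the old S3Connection(access_key, secret_key) path, which, per the comment in _configure_connection, will not work for post-2014 regions such as us-east-2.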
160 changes: 122 additions & 38 deletions test/unit/objectstore/test_objectstore.py
@@ -8,6 +8,7 @@
 from uuid import uuid4
 
 import pytest
+from requests import get
 
 from galaxy.exceptions import ObjectInvalid
 from galaxy.objectstore.azure_blob import AzureBlobObjectStore
@@ -991,26 +992,19 @@ def test_config_parse_cloud():
     assert object_store.bucket_name == "unique_bucket_name_all_lowercase"
     assert object_store.use_rr is False
 
-    assert object_store.host is None
-    assert object_store.port == 6000
-    assert object_store.multipart is True
-    assert object_store.is_secure is True
-    assert object_store.conn_path == "/"
-
     cache_target = object_store.cache_target
     assert cache_target.size == 1000.0
     assert cache_target.path == "database/object_store_cache"
     assert object_store.extra_dirs["job_work"] == "database/job_working_directory_cloud"
     assert object_store.extra_dirs["temp"] == "database/tmp_cloud"
 
     as_dict = object_store.to_dict()
-    _assert_has_keys(as_dict, ["provider", "auth", "bucket", "connection", "cache", "extra_dirs", "type"])
+    _assert_has_keys(as_dict, ["provider", "auth", "bucket", "cache", "extra_dirs", "type"])
 
     _assert_key_has_value(as_dict, "type", "cloud")
 
     auth_dict = as_dict["auth"]
     bucket_dict = as_dict["bucket"]
-    connection_dict = as_dict["connection"]
     cache_dict = as_dict["cache"]
 
     provider = as_dict["provider"]
@@ -1028,11 +1022,6 @@ def test_config_parse_cloud():
     _assert_key_has_value(bucket_dict, "name", "unique_bucket_name_all_lowercase")
     _assert_key_has_value(bucket_dict, "use_reduced_redundancy", False)
 
-    _assert_key_has_value(connection_dict, "host", None)
-    _assert_key_has_value(connection_dict, "port", 6000)
-    _assert_key_has_value(connection_dict, "multipart", True)
-    _assert_key_has_value(connection_dict, "is_secure", True)
-
     _assert_key_has_value(cache_dict, "size", 1000.0)
     _assert_key_has_value(cache_dict, "path", "database/object_store_cache")
 
@@ -1056,26 +1045,19 @@ def test_config_parse_cloud_noauth_for_aws():
     assert object_store.bucket_name == "unique_bucket_name_all_lowercase"
     assert object_store.use_rr is False
 
-    assert object_store.host is None
-    assert object_store.port == 6000
-    assert object_store.multipart is True
-    assert object_store.is_secure is True
-    assert object_store.conn_path == "/"
-
     cache_target = object_store.cache_target
     assert cache_target.size == 1000.0
     assert cache_target.path == "database/object_store_cache"
     assert object_store.extra_dirs["job_work"] == "database/job_working_directory_cloud"
     assert object_store.extra_dirs["temp"] == "database/tmp_cloud"
 
     as_dict = object_store.to_dict()
-    _assert_has_keys(as_dict, ["provider", "auth", "bucket", "connection", "cache", "extra_dirs", "type"])
+    _assert_has_keys(as_dict, ["provider", "auth", "bucket", "cache", "extra_dirs", "type"])
 
     _assert_key_has_value(as_dict, "type", "cloud")
 
     auth_dict = as_dict["auth"]
     bucket_dict = as_dict["bucket"]
-    connection_dict = as_dict["connection"]
     cache_dict = as_dict["cache"]
 
     provider = as_dict["provider"]
@@ -1087,11 +1069,6 @@ def test_config_parse_cloud_noauth_for_aws():
     _assert_key_has_value(bucket_dict, "name", "unique_bucket_name_all_lowercase")
     _assert_key_has_value(bucket_dict, "use_reduced_redundancy", False)
 
-    _assert_key_has_value(connection_dict, "host", None)
-    _assert_key_has_value(connection_dict, "port", 6000)
-    _assert_key_has_value(connection_dict, "multipart", True)
-    _assert_key_has_value(connection_dict, "is_secure", True)
-
     _assert_key_has_value(cache_dict, "size", 1000.0)
     _assert_key_has_value(cache_dict, "path", "database/object_store_cache")
 
@@ -1266,7 +1243,7 @@ def test_config_parse_azure_no_cache():
     assert object_store.staging_path == directory.global_config.object_store_cache_path
 
 
-def verify_caching_object_store_functionality(tmp_path, object_store):
+def verify_caching_object_store_functionality(tmp_path, object_store, check_get_url=True):
     # Test no dataset with id 1 exists.
     absent_dataset = MockDataset(1)
     assert not object_store.exists(absent_dataset)
@@ -1346,14 +1323,13 @@ def verify_caching_object_store_functionality(tmp_path, object_store):
 
     # Test get_object_url returns a read-only URL
     url = object_store.get_object_url(hello_world_dataset)
-    from requests import get
-
-    response = get(url)
-    response.raise_for_status()
-    assert response.text == "Hello World!"
+    if check_get_url:
+        response = get(url)
+        response.raise_for_status()
+        assert response.text == "Hello World!"
 
 
-def verify_object_store_functionality(tmp_path, object_store):
+def verify_object_store_functionality(tmp_path, object_store, check_get_url=True):
     # Test no dataset with id 1 exists.
     absent_dataset = MockDataset(1)
     assert not object_store.exists(absent_dataset)
@@ -1400,11 +1376,10 @@ def verify_object_store_functionality(tmp_path, object_store):
 
     # Test get_object_url returns a read-only URL
     url = object_store.get_object_url(hello_world_dataset)
-    from requests import get
-
-    response = get(url)
-    response.raise_for_status()
-    assert response.text == "Hello World!"
+    if check_get_url:
+        response = get(url)
+        response.raise_for_status()
+        assert response.text == "Hello World!"
 
 
 AZURE_BLOB_TEMPLATE_TEST_CONFIG_YAML = """
@@ -1533,6 +1508,115 @@ def test_real_azure_blob_store_in_hierarchical(tmp_path):
         verify_object_store_functionality(tmp_path, object_store)
 
 
+AMAZON_S3_SIMPLE_TEMPLATE_TEST_CONFIG_YAML = """
+type: aws_s3
+store_by: uuid
+auth:
+  access_key: ${access_key}
+  secret_key: ${secret_key}
+bucket:
+  name: ${bucket}
+connection:
+  region: ${region}
+extra_dirs:
+- type: job_work
+  path: database/job_working_directory_azure
+- type: temp
+  path: database/tmp_azure
+"""
+
+
+@skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY")
+@skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY")
+@skip_unless_environ("GALAXY_TEST_AWS_BUCKET")
+@skip_unless_environ("GALAXY_TEST_AWS_REGION")
+def test_real_aws_s3_store(tmp_path):
+    template_vars = {
+        "access_key": os.environ["GALAXY_TEST_AWS_ACCESS_KEY"],
+        "secret_key": os.environ["GALAXY_TEST_AWS_SECRET_KEY"],
+        "bucket": os.environ["GALAXY_TEST_AWS_BUCKET"],
+        "region": os.environ["GALAXY_TEST_AWS_REGION"],
+    }
+    with TestConfig(AMAZON_S3_SIMPLE_TEMPLATE_TEST_CONFIG_YAML, template_vars=template_vars) as (_, object_store):
+        verify_caching_object_store_functionality(tmp_path, object_store)
+
+
+AMAZON_CLOUDBRIDGE_TEMPLATE_TEST_CONFIG_YAML = """
+type: cloud
+store_by: uuid
+provider: aws
+auth:
+  access_key: ${access_key}
+  secret_key: ${secret_key}
+bucket:
+  name: ${bucket}
+extra_dirs:
+- type: job_work
+  path: database/job_working_directory_azure
+- type: temp
+  path: database/tmp_azure
+"""
+
+
+@skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY")
+@skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY")
+@skip_unless_environ("GALAXY_TEST_AWS_BUCKET")
+def test_aws_via_cloudbridge_store(tmp_path):
+    template_vars = {
+        "access_key": os.environ["GALAXY_TEST_AWS_ACCESS_KEY"],
+        "secret_key": os.environ["GALAXY_TEST_AWS_SECRET_KEY"],
+        "bucket": os.environ["GALAXY_TEST_AWS_BUCKET"],
+    }
+    with TestConfig(AMAZON_CLOUDBRIDGE_TEMPLATE_TEST_CONFIG_YAML, template_vars=template_vars) as (_, object_store):
+        # disabling get_object_url check - cloudbridge in this config assumes the region
+        # is us-east-1 and generates a URL for that region. This functionality works and can
+        # be tested if a region is specified in the configuration (see next config and test case).
+        verify_caching_object_store_functionality(tmp_path, object_store, check_get_url=False)
+
+
+AMAZON_CLOUDBRIDGE_WITH_REGION_TEMPLATE_TEST_CONFIG_YAML = """
+type: cloud
+store_by: uuid
+provider: aws
+auth:
+  access_key: ${access_key}
+  secret_key: ${secret_key}
+  region: ${region}
+bucket:
+  name: ${bucket}
+extra_dirs:
+- type: job_work
+  path: database/job_working_directory_azure
+- type: temp
+  path: database/tmp_azure
+"""
+
+
+@skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY")
+@skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY")
+@skip_unless_environ("GALAXY_TEST_AWS_BUCKET")
+@skip_unless_environ("GALAXY_TEST_AWS_REGION")
+def test_aws_via_cloudbridge_store_with_region(tmp_path):
+    template_vars = {
+        "access_key": os.environ["GALAXY_TEST_AWS_ACCESS_KEY"],
+        "secret_key": os.environ["GALAXY_TEST_AWS_SECRET_KEY"],
+        "bucket": os.environ["GALAXY_TEST_AWS_BUCKET"],
+        "region": os.environ["GALAXY_TEST_AWS_REGION"],
+    }
+    with TestConfig(AMAZON_CLOUDBRIDGE_WITH_REGION_TEMPLATE_TEST_CONFIG_YAML, template_vars=template_vars) as (
+        _,
+        object_store,
+    ):
+        verify_caching_object_store_functionality(tmp_path, object_store)
+
+
 class MockDataset:
     def __init__(self, id):
         self.id = id
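These tests only run against real AWS infrastructure when credentials are present in the environment. A sketch of invoking them (environment variable names come from the skip_unless_environ decorators above; the bucket and region values are placeholders):

GALAXY_TEST_AWS_ACCESS_KEY=... \
GALAXY_TEST_AWS_SECRET_KEY=... \
GALAXY_TEST_AWS_BUCKET=my-test-bucket \
GALAXY_TEST_AWS_REGION=us-east-2 \
pytest test/unit/objectstore/test_objectstore.py -k aws

The -k aws filter selects the tests with "aws" in their names, including the three added here; without the variables, skip_unless_environ causes them to be skipped.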