Skip to content

Commit

Permalink
feat: lf_inherited_tags config to exclude certain tags from CRUD (#453)
Browse files Browse the repository at this point in the history
Co-authored-by: Serhii Dimchenko <[email protected]>
  • Loading branch information
dacreify and svdimchenko authored Oct 17, 2023
1 parent 53f067f commit 502939b
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 32 deletions.
38 changes: 30 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,17 +259,39 @@ athena:
}
```

> Notes:
* `lf_inherited_tags` (`default=none`)
* List of Lake Formation tag keys that are intended to be inherited from the database level and thus shouldn't be
removed during association of those defined in `lf_tags_config`
* i.e. The default behavior of `lf_tags_config` is to be exhaustive and first remove any pre-existing tags from
tables and columns before associating the ones currently defined for a given model
* This breaks tag inheritance as inherited tags appear on tables and columns like those associated directly
* This list sits outside of `lf_tags_config` so that it can be set at the project level -- for example:

```yaml
models:
my_project:
example:
+lf_inherited_tags: [inherited-tag-1, inherited-tag-2]
```

> Notes:
>
> * `lf_tags` and `lf_tags_columns` configs support only attaching lf tags to corresponding resources.
> We recommend managing LF Tags permissions somewhere outside dbt. For example, you may use
> [terraform](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions)
or
> [aws cdk](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lakeformation-readme.html) for such purpose.
> We recommend managing LF Tags permissions somewhere outside dbt. For example, you may use
> [terraform](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) or
> [aws cdk](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lakeformation-readme.html) for such purpose.
> * `data_cell_filters` management can't be automated outside dbt because the filter can't be attached to the table
> which doesn't exist. Once you `enable` this config, dbt will set all filters and their permissions during every
> dbt run. Such approach keeps the actual state of row level security configuration actual after every dbt run and
> apply changes if they occur: drop, create, update filters and their permissions.
> which doesn't exist. Once you `enable` this config, dbt will set all filters and their permissions during every
> dbt run. Such approach keeps the actual state of row level security configuration actual after every dbt run and
> apply changes if they occur: drop, create, update filters and their permissions.
> * Any tags listed in `lf_inherited_tags` should be strictly inherited from the database level and never overridden at
the table and column level
> * Currently `dbt-athena` does not differentiate between an inherited tag association and an override of same it made
> previously
> * e.g. If an inherited tag is overridden by an `lf_tags_config` value in one DBT run, and that override is removed
prior to a subsequent run, the prior override will linger and no longer be encoded anywhere (in e.g. Terraform
where the inherited value is configured nor in the DBT project where the override previously existed but now is
gone)

[create-table-as]: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties

Expand Down
6 changes: 4 additions & 2 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ def add_lf_tags_to_database(self, relation: AthenaRelation) -> None:
LOGGER.debug(f"Lakeformation is disabled for {relation}")

@available
def add_lf_tags(self, relation: AthenaRelation, lf_tags_config: Dict[str, Any]) -> None:
config = LfTagsConfig(**lf_tags_config)
def add_lf_tags(
self, relation: AthenaRelation, lf_tags_config: Dict[str, Any], lf_inherited_tags: Optional[List[str]]
) -> None:
config = LfTagsConfig(**(lf_tags_config | dict(inherited_tags=lf_inherited_tags)))
if config.enabled:
conn = self.connections.get_thread_connection()
client = conn.handle
Expand Down
56 changes: 39 additions & 17 deletions dbt/adapters/athena/lakeformation.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
"""AWS Lakeformation permissions management helper utilities."""

from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional, Sequence, Set, Union

from mypy_boto3_lakeformation import LakeFormationClient
from mypy_boto3_lakeformation.type_defs import (
AddLFTagsToResourceResponseTypeDef,
BatchPermissionsRequestEntryTypeDef,
ColumnLFTagTypeDef,
DataCellsFilterTypeDef,
GetResourceLFTagsResponseTypeDef,
LFTagPairTypeDef,
RemoveLFTagsFromResourceResponseTypeDef,
ResourceTypeDef,
)
Expand All @@ -24,6 +26,7 @@ class LfTagsConfig(BaseModel):
enabled: bool = False
tags: Optional[Dict[str, str]] = None
tags_columns: Optional[Dict[str, Dict[str, List[str]]]] = None
inherited_tags: List[str] = []


class LfTagsManager:
Expand All @@ -33,6 +36,7 @@ def __init__(self, lf_client: LakeFormationClient, relation: AthenaRelation, lf_
self.table = relation.identifier
self.lf_tags = lf_tags_config.tags
self.lf_tags_columns = lf_tags_config.tags_columns
self.lf_inherited_tags = set(lf_tags_config.inherited_tags)

def process_lf_tags_database(self) -> None:
if self.lf_tags:
Expand All @@ -49,21 +53,31 @@ def process_lf_tags(self) -> None:
self._apply_lf_tags_table(table_resource, existing_lf_tags)
self._apply_lf_tags_columns()

@staticmethod
def _column_tags_to_remove(
lf_tags_columns: List[ColumnLFTagTypeDef], lf_inherited_tags: Set[str]
) -> Dict[str, Dict[str, List[str]]]:
to_remove = {}

for column in lf_tags_columns:
non_inherited_tags = [tag for tag in column["LFTags"] if not tag["TagKey"] in lf_inherited_tags]
for tag in non_inherited_tags:
tag_key = tag["TagKey"]
tag_value = tag["TagValues"][0]
if tag_key not in to_remove:
to_remove[tag_key] = {tag_value: [column["Name"]]}
elif tag_value not in to_remove[tag_key]:
to_remove[tag_key][tag_value] = [column["Name"]]
else:
to_remove[tag_key][tag_value].append(column["Name"])

return to_remove

def _remove_lf_tags_columns(self, existing_lf_tags: GetResourceLFTagsResponseTypeDef) -> None:
lf_tags_columns = existing_lf_tags.get("LFTagsOnColumns", [])
logger.debug(f"COLUMNS: {lf_tags_columns}")
if lf_tags_columns:
to_remove = {}
for column in lf_tags_columns:
for tag in column["LFTags"]:
tag_key = tag["TagKey"]
tag_value = tag["TagValues"][0]
if tag_key not in to_remove:
to_remove[tag_key] = {tag_value: [column["Name"]]}
elif tag_value not in to_remove[tag_key]:
to_remove[tag_key][tag_value] = [column["Name"]]
else:
to_remove[tag_key][tag_value].append(column["Name"])
to_remove = LfTagsManager._column_tags_to_remove(lf_tags_columns, self.lf_inherited_tags)
logger.debug(f"TO REMOVE: {to_remove}")
for tag_key, tag_config in to_remove.items():
for tag_value, columns in tag_config.items():
Expand All @@ -75,18 +89,26 @@ def _remove_lf_tags_columns(self, existing_lf_tags: GetResourceLFTagsResponseTyp
)
self._parse_and_log_lf_response(response, columns, {tag_key: tag_value}, "remove")

@staticmethod
def _table_tags_to_remove(
lf_tags_table: List[LFTagPairTypeDef], lf_tags: Optional[Dict[str, str]], lf_inherited_tags: Set[str]
) -> Dict[str, Sequence[str]]:
return {
tag["TagKey"]: tag["TagValues"]
for tag in lf_tags_table
if tag["TagKey"] not in (lf_tags or {})
if tag["TagKey"] not in lf_inherited_tags
}

def _apply_lf_tags_table(
self, table_resource: ResourceTypeDef, existing_lf_tags: GetResourceLFTagsResponseTypeDef
) -> None:
lf_tags_table = existing_lf_tags.get("LFTagsOnTable", [])
logger.debug(f"EXISTING TABLE TAGS: {lf_tags_table}")
logger.debug(f"CONFIG TAGS: {self.lf_tags}")

to_remove = {
tag["TagKey"]: tag["TagValues"]
for tag in lf_tags_table
if tag["TagKey"] not in self.lf_tags # type: ignore
}
to_remove = LfTagsManager._table_tags_to_remove(lf_tags_table, self.lf_tags, self.lf_inherited_tags)

logger.debug(f"TAGS TO REMOVE: {to_remove}")
if to_remove:
response = self.lf_client.remove_lf_tags_from_resource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

{% set lf_tags_config = config.get('lf_tags_config') %}
{% set lf_inherited_tags = config.get('lf_inherited_tags') %}
{% set lf_grants = config.get('lf_grants') %}
{% set partitioned_by = config.get('partitioned_by') %}
{% set target_relation = this.incorporate(type='table') %}
Expand Down Expand Up @@ -106,7 +107,7 @@
{{ run_hooks(post_hooks, inside_transaction=False) }}

{% if lf_tags_config is not none %}
{{ adapter.add_lf_tags(target_relation, lf_tags_config) }}
{{ adapter.add_lf_tags(target_relation, lf_tags_config, lf_inherited_tags) }}
{% endif %}

{% if lf_grants is not none %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{%- set identifier = model['alias'] -%}

{%- set lf_tags_config = config.get('lf_tags_config') -%}
{%- set lf_inherited_tags = config.get('lf_inherited_tags') -%}
{%- set lf_grants = config.get('lf_grants') -%}

{%- set table_type = config.get('table_type', default='hive') | lower -%}
Expand Down Expand Up @@ -111,7 +112,7 @@
{{ run_hooks(post_hooks) }}

{% if lf_tags_config is not none %}
{{ adapter.add_lf_tags(target_relation, lf_tags_config) }}
{{ adapter.add_lf_tags(target_relation, lf_tags_config, lf_inherited_tags) }}
{% endif %}

{% if lf_grants is not none %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{%- set identifier = model['alias'] -%}

{%- set lf_tags_config = config.get('lf_tags_config') -%}
{%- set lf_inherited_tags = config.get('lf_inherited_tags') -%}
{%- set lf_grants = config.get('lf_grants') -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
Expand Down Expand Up @@ -32,7 +33,7 @@
{%- endcall %}

{% if lf_tags_config is not none %}
{{ adapter.add_lf_tags(target_relation, lf_tags_config) }}
{{ adapter.add_lf_tags(target_relation, lf_tags_config, lf_inherited_tags) }}
{% endif %}

{% if lf_grants is not none %}
Expand Down
3 changes: 2 additions & 1 deletion dbt/include/athena/macros/materializations/seeds/helpers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
{%- set identifier = model['alias'] -%}

{%- set lf_tags_config = config.get('lf_tags_config') -%}
{%- set lf_inherited_tags = config.get('lf_inherited_tags') -%}
{%- set lf_grants = config.get('lf_grants') -%}

{%- set column_override = config.get('column_types', {}) -%}
Expand Down Expand Up @@ -179,7 +180,7 @@
{% do adapter.delete_from_s3(tmp_s3_location) %}

{% if lf_tags_config is not none %}
{{ adapter.add_lf_tags(relation, lf_tags_config) }}
{{ adapter.add_lf_tags(relation, lf_tags_config, lf_inherited_tags) }}
{% endif %}

{% if lf_grants is not none %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
{%- set table_type = config.get('table_type', 'hive') -%}

{%- set lf_tags_config = config.get('lf_tags_config') -%}
{%- set lf_inherited_tags = config.get('lf_inherited_tags') -%}
{%- set lf_grants = config.get('lf_grants') -%}

{{ log('Checking if target table exists') }}
Expand Down Expand Up @@ -230,7 +231,7 @@
{{ run_hooks(post_hooks, inside_transaction=False) }}

{% if lf_tags_config is not none %}
{{ adapter.add_lf_tags(target_relation, lf_tags_config) }}
{{ adapter.add_lf_tags(target_relation, lf_tags_config, lf_inherited_tags) }}
{% endif %}

{% if lf_grants is not none %}
Expand Down
68 changes: 68 additions & 0 deletions tests/unit/test_lakeformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pytest
from tests.unit.constants import AWS_REGION, DATA_CATALOG_NAME, DATABASE_NAME

import dbt.adapters.athena.lakeformation as lakeformation
from dbt.adapters.athena.lakeformation import LfTagsConfig, LfTagsManager
from dbt.adapters.athena.relation import AthenaRelation

Expand Down Expand Up @@ -74,3 +75,70 @@ def test__parse_lf_response(self, dbt_debug_caplog, response, identifier, column
manager = LfTagsManager(lf_client, relation, LfTagsConfig())
manager._parse_and_log_lf_response(response, columns, lf_tags, verb)
assert expected in dbt_debug_caplog.getvalue()

@pytest.mark.parametrize(
"lf_tags_columns,lf_inherited_tags,expected",
[
pytest.param(
[{"Name": "my_column", "LFTags": [{"TagKey": "inherited", "TagValues": ["oh-yes-i-am"]}]}],
{"inherited"},
{},
id="retains-inherited-tag",
),
pytest.param(
[{"Name": "my_column", "LFTags": [{"TagKey": "not-inherited", "TagValues": ["oh-no-im-not"]}]}],
{},
{"not-inherited": {"oh-no-im-not": ["my_column"]}},
id="removes-non-inherited-tag",
),
pytest.param(
[
{
"Name": "my_column",
"LFTags": [
{"TagKey": "not-inherited", "TagValues": ["oh-no-im-not"]},
{"TagKey": "inherited", "TagValues": ["oh-yes-i-am"]},
],
}
],
{"inherited"},
{"not-inherited": {"oh-no-im-not": ["my_column"]}},
id="removes-non-inherited-tag-among-inherited",
),
pytest.param([], {}, {}, id="handles-empty"),
],
)
def test__column_tags_to_remove(self, lf_tags_columns, lf_inherited_tags, expected):
assert lakeformation.LfTagsManager._column_tags_to_remove(lf_tags_columns, lf_inherited_tags) == expected

@pytest.mark.parametrize(
"lf_tags_table,lf_tags,lf_inherited_tags,expected",
[
pytest.param(
[
{"TagKey": "not-inherited", "TagValues": ["oh-no-im-not"]},
{"TagKey": "inherited", "TagValues": ["oh-yes-i-am"]},
],
{"not-inherited": "some-preexisting-value"},
{"inherited"},
{},
id="retains-being-set-and-inherited",
),
pytest.param(
[
{"TagKey": "not-inherited", "TagValues": ["oh-no-im-not"]},
{"TagKey": "inherited", "TagValues": ["oh-yes-i-am"]},
],
{},
{"inherited"},
{"not-inherited": ["oh-no-im-not"]},
id="removes-preexisting-not-being-set",
),
pytest.param(
[{"TagKey": "inherited", "TagValues": ["oh-yes-i-am"]}], {}, {"inherited"}, {}, id="retains-inherited"
),
pytest.param([], None, {}, {}, id="handles-empty"),
],
)
def test__table_tags_to_remove(self, lf_tags_table, lf_tags, lf_inherited_tags, expected):
assert lakeformation.LfTagsManager._table_tags_to_remove(lf_tags_table, lf_tags, lf_inherited_tags) == expected

0 comments on commit 502939b

Please sign in to comment.