Skip to content

Commit

Permalink
feat(ingestion/dbt): multiple node owner separated by comma (#9740)
Browse files Browse the repository at this point in the history
Co-authored-by: Aseem Bansal <[email protected]>
  • Loading branch information
sid-acryl and anshbansal authored Jan 31, 2024
1 parent 54f8550 commit ad2df22
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 17 deletions.
75 changes: 59 additions & 16 deletions metadata-ingestion/src/datahub/utilities/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@ def _get_best_match(the_match: Match, group_name: str) -> str:
return the_match.group(0)


def _make_owner_category_list(
    owner_type: OwnerType,
    owner_category: Any,
    owner_category_urn: Optional[str],
    owner_ids: List[str],
) -> List[Dict]:
    """Build one owner entry per id in ``owner_ids``.

    Every entry is a dict with three keys:

    * ``"urn"`` - result of ``mce_builder.make_owner_urn(owner_id, owner_type)``
    * ``"category"`` - ``owner_category``, passed through unchanged
    * ``"categoryUrn"`` - ``owner_category_urn``, passed through unchanged

    The category values are shared by all entries; only the urn differs.
    Entries are returned in the same order as ``owner_ids``, and an empty
    ``owner_ids`` yields an empty list.
    """
    entries: List[Dict] = []
    for single_owner_id in owner_ids:
        owner_urn = mce_builder.make_owner_urn(single_owner_id, owner_type)
        entries.append(
            {
                "urn": owner_urn,
                "category": owner_category,
                "categoryUrn": owner_category_urn,
            }
        )
    return entries


_match_regexp = re.compile(r"{{\s*\$match\s*}}", flags=re.MULTILINE)


Expand Down Expand Up @@ -149,13 +166,26 @@ def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]:
operation = self.get_operation_value(
operation_key, operation_type, operation_config, maybe_match
)

if operation_type == Constants.ADD_TERMS_OPERATION:
# add_terms operation is a special case where the operation value is a list of terms.
# We want to aggregate these values with the add_term operation.
operation_type = Constants.ADD_TERM_OPERATION

if operation:
if isinstance(operation, (str, list)):
if (
isinstance(operation, list)
and operation_type == Constants.ADD_OWNER_OPERATION
):
operation_value_list = operations_map.get(
operation_type, list()
)
cast(List, operation_value_list).extend(
operation
) # cast to silence the linter
operations_map[operation_type] = operation_value_list

elif isinstance(operation, (str, list)):
operations_value_set = operations_map.get(
operation_type, set()
)
Expand Down Expand Up @@ -184,8 +214,11 @@ def convert_to_aspects(
tag_aspect = mce_builder.make_global_tag_aspect_with_tag_list(
sorted(operation_map[Constants.ADD_TAG_OPERATION])
)

aspect_map[Constants.ADD_TAG_OPERATION] = tag_aspect

if Constants.ADD_OWNER_OPERATION in operation_map:

owner_aspect = OwnershipClass(
owners=[
OwnerClass(
Expand All @@ -202,6 +235,7 @@ def convert_to_aspects(
)
]
)

aspect_map[Constants.ADD_OWNER_OPERATION] = owner_aspect

if Constants.ADD_TERM_OPERATION in operation_map:
Expand Down Expand Up @@ -262,7 +296,7 @@ def get_operation_value(
operation_type: str,
operation_config: Dict,
match: Match,
) -> Optional[Union[str, Dict, List[str]]]:
) -> Optional[Union[str, Dict, List[str], List[Dict]]]:
if (
operation_type == Constants.ADD_TAG_OPERATION
and operation_config[Constants.TAG]
Expand All @@ -278,30 +312,39 @@ def get_operation_value(
and operation_config[Constants.OWNER_TYPE]
):
owner_id = _get_best_match(match, "owner")

owner_ids: List[str] = [_id.strip() for _id in owner_id.split(",")]

owner_category = (
operation_config.get(Constants.OWNER_CATEGORY)
or OwnershipTypeClass.DATAOWNER
)
owner_category_urn = None
owner_category_urn: Optional[str] = None
if owner_category.startswith("urn:li:"):
owner_category_urn = owner_category
owner_category = OwnershipTypeClass.DATAOWNER
else:
owner_category = owner_category.upper()

if self.strip_owner_email_id:
owner_id = self.sanitize_owner_ids(owner_id)
if operation_config[Constants.OWNER_TYPE] == Constants.USER_OWNER:
return {
"urn": mce_builder.make_owner_urn(owner_id, OwnerType.USER),
"category": owner_category,
"categoryUrn": owner_category_urn,
}
elif operation_config[Constants.OWNER_TYPE] == Constants.GROUP_OWNER:
return {
"urn": mce_builder.make_owner_urn(owner_id, OwnerType.GROUP),
"category": owner_category,
"categoryUrn": owner_category_urn,
}
owner_ids = [
self.sanitize_owner_ids(owner_id) for owner_id in owner_ids
]

owner_type_mapping: Dict[str, OwnerType] = {
Constants.USER_OWNER: OwnerType.USER,
Constants.GROUP_OWNER: OwnerType.GROUP,
}
if operation_config[Constants.OWNER_TYPE] in owner_type_mapping:
return _make_owner_category_list(
owner_ids=owner_ids,
owner_category=owner_category,
owner_category_urn=owner_category_urn,
owner_type=owner_type_mapping[
operation_config[Constants.OWNER_TYPE]
],
)

elif (
operation_type == Constants.ADD_TERM_OPERATION
and operation_config[Constants.TERM]
Expand Down
11 changes: 10 additions & 1 deletion metadata-ingestion/tests/unit/test_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ def get_operation_defs() -> Dict[str, Any]:
"operation": "add_owner",
"config": {"owner_type": "user"},
},
"multi_user": {
"match": ".*",
"operation": "add_owner",
"config": {"owner_type": "user"},
},
"group.owner": {
"match": ".*",
"operation": "add_owner",
Expand Down Expand Up @@ -78,6 +83,7 @@ def test_operation_processor_not_matching():
def test_operation_processor_matching():
raw_props = {
"user_owner": "[email protected]",
"multi_user": "[email protected], [email protected]",
"user_owner_2": "test_user_2",
"group.owner": "[email protected]",
"governance.team_owner": "Finance",
Expand All @@ -86,6 +92,7 @@ def test_operation_processor_matching():
"double_property": 2.5,
"tag": "Finance",
}

processor = OperationProcessor(
operation_defs=get_operation_defs(),
owner_source_type="SOURCE_CONTROL",
Expand Down Expand Up @@ -116,11 +123,13 @@ def test_operation_processor_matching():
)

ownership_aspect: OwnershipClass = aspect_map["add_owner"]
assert len(ownership_aspect.owners) == 3
assert len(ownership_aspect.owners) == 5
owner_set = {
"urn:li:corpuser:test_user",
"urn:li:corpuser:test_user_2",
"urn:li:corpGroup:test.group",
"urn:li:corpuser:sales_member1",
"urn:li:corpuser:sales_member2",
}
for single_owner in ownership_aspect.owners:
assert single_owner.owner in owner_set
Expand Down

0 comments on commit ad2df22

Please sign in to comment.