From 93a4a53e3490a0bc763ab739fde4a4c92dea7987 Mon Sep 17 00:00:00 2001 From: Enos Date: Wed, 27 Nov 2024 16:35:53 -0500 Subject: [PATCH 1/6] split ownership pr --- .../src/datahub/ingestion/source/superset.py | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index 1da233bf0b22a..6ac93fe82e3ad 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -22,6 +22,7 @@ make_dataset_urn, make_dataset_urn_with_platform_instance, make_domain_urn, + make_user_urn, ) from datahub.emitter.mcp_builder import add_domain_to_entity_wu from datahub.ingestion.api.common import PipelineContext @@ -72,6 +73,9 @@ ChartTypeClass, DashboardInfoClass, DatasetPropertiesClass, + OwnershipClass, + OwnerClass, + OwnershipTypeClass, ) from datahub.utilities import config_clean from datahub.utilities.registries.domain_registry import DomainRegistry @@ -232,6 +236,7 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig): graph=self.ctx.graph, ) self.session = self.login() + self.owners_dict = self.build_preset_owner_dict() def login(self) -> requests.Session: login_response = requests.post( @@ -354,6 +359,58 @@ def get_datasource_urn_from_id( env=self.config.env, ) raise ValueError("Could not construct dataset URN") + + def parse_owner_payload(self, payload: dict, owners_dict: dict) -> None: + for owner_data in payload.get("result", []): + email = owner_data.get("extra", {}).get("email") + value = owner_data.get("value") + + if value and email and value not in owners_dict: + owners_dict[value] = email + + def build_preset_owner_dict(self) -> Dict[str, str]: + owners_dict = {} + dataset_payload = self.get_all_entity_owners("dataset") + chart_payload = self.get_all_entity_owners("chart") + dashboard_payload = self.get_all_entity_owners("dashboard") + + owners_dict.update(self.parse_owner_payload(dataset_payload)) + owners_dict.update(self.parse_owner_payload(chart_payload)) + owners_dict.update(self.parse_owner_payload(dashboard_payload)) + + return owners_dict + + def build_owners_urn_list(self, data: dict) -> List[str]: + owners_urn_list = [] + for owner in data.get("owners", []): + owner_id = owner.get("id") + owner_email = self.owners_dict.get(owner_id) + if owner_email is not None: + owners_urn = make_user_urn(owner_email) + owners_urn_list.append(owners_urn) + return owners_urn_list + + def get_all_entity_owners(self, entity: str) -> Iterable[Dict]: + current_page = 1 + total_owners = PAGE_SIZE + all_owners = [] + + while (current_page - 1) * PAGE_SIZE <= total_owners: + full_owners_response = self.session.get( + f"{self.config.connect_uri}/api/v1/{entity}/related/owners", + params=f"q=(page:{current_page},page_size:{PAGE_SIZE})", + ) + if full_owners_response.status_code != 200: + logger.warning(f"Failed to get {entity} data: {full_owners_response.text}") + full_owners_response.raise_for_status() + + payload = full_owners_response.json() + total_owners = payload.get("count", total_owners) + all_owners.extend(payload.get("result", [])) + current_page += 1 + + #return combined payload + return {"result": all_owners, "count": total_owners} def construct_dashboard_from_api_data( self, dashboard_data: dict @@ -427,6 +484,20 @@ def construct_dashboard_from_api_data( customProperties=custom_properties, ) dashboard_snapshot.aspects.append(dashboard_info) + + dashboard_owners_list = self.build_owners_urn_list(dashboard_data) + owners_info = OwnershipClass( + owners=[ + OwnerClass( + owner=urn, + #default as Technical Owners from Preset + type=OwnershipTypeClass.TECHNICAL_OWNER, + ) + for urn in (dashboard_owners_list or []) + ], + ) + dashboard_snapshot.aspects.append(owners_info) + return dashboard_snapshot def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]: @@ -526,6 +597,20 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot: customProperties=custom_properties, ) chart_snapshot.aspects.append(chart_info) + + chart_owners_list = self.build_owners_urn_list(chart_data) + owners_info = OwnershipClass( + owners=[ + OwnerClass( + owner=urn, + #default as Technical Owners from Preset + type=OwnershipTypeClass.TECHNICAL_OWNER, + ) + for urn in (chart_owners_list or []) + ], + ) + chart_snapshot.aspects.append(owners_info) + return chart_snapshot def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]: @@ -615,6 +700,19 @@ def construct_dataset_from_dataset_data( ] ) + dataset_owners_list = self.build_owners_urn_list(dataset_data) + owners_info = OwnershipClass( + owners=[ + OwnerClass( + owner=urn, + #default as Technical Owners from Preset + type=OwnershipTypeClass.TECHNICAL_OWNER, + ) + for urn in (dataset_owners_list or []) + ], + ) + aspects_items.append(owners_info) + dataset_snapshot = DatasetSnapshot( urn=datasource_urn, aspects=aspects_items, From 66b22579ee8b88e4bca667d4a0226579712c31e5 Mon Sep 17 00:00:00 2001 From: Enos Date: Fri, 13 Dec 2024 16:02:05 -0500 Subject: [PATCH 2/6] parseowner fix - not working --- .../src/datahub/ingestion/source/superset.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index 6ac93fe82e3ad..5dc6d67d539f5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -360,14 +360,16 @@ def get_datasource_urn_from_id( ) raise ValueError("Could not construct dataset URN") - def parse_owner_payload(self, payload: dict, owners_dict: dict) -> None: + def parse_owner_payload(self, payload: dict) -> dict: + owners_dict = {} for owner_data in payload.get("result", []): email = owner_data.get("extra", {}).get("email") value = owner_data.get("value") - if value and email and value not in owners_dict: + if value and email: owners_dict[value] = email - + return owners_dict + def build_preset_owner_dict(self) -> Dict[str, str]: owners_dict = {} dataset_payload = self.get_all_entity_owners("dataset") From b084b7b079c5efbd2306008482b9dc6682a0ab70 Mon Sep 17 00:00:00 2001 From: Shuixi Li Date: Mon, 16 Dec 2024 14:45:46 -0500 Subject: [PATCH 3/6] skip fetching for dataset when datasource_id is null --- .../src/datahub/ingestion/source/superset.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index 5dc6d67d539f5..7a5099125bbac 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -547,10 +547,13 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot: chart_url = f"{self.config.display_uri}{chart_data.get('url', '')}" datasource_id = chart_data.get("datasource_id") - dataset_response = self.get_dataset_info(datasource_id) - datasource_urn = self.get_datasource_urn_from_id( - dataset_response, self.platform - ) + if not datasource_id: + logger.warning(f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info') + else: + dataset_response = self.get_dataset_info(datasource_id) + datasource_urn = self.get_datasource_urn_from_id( + dataset_response, self.platform + ) params = json.loads(chart_data.get("params", "{}")) metrics = [ From aca7830023ac76bf298633546ff7edb30b52ddb7 Mon Sep 17 00:00:00 2001 From: Shuixi Li Date: Mon, 16 Dec 2024 14:54:31 -0500 Subject: [PATCH 4/6] fix for local variable 'datasource_urn' referenced before assignment --- .../src/datahub/ingestion/source/superset.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index 7a5099125bbac..14e7f7657bb67 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -548,12 +548,13 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot: datasource_id = chart_data.get("datasource_id") if not datasource_id: - logger.warning(f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info') + logger.debug(f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info') + datasource_urn = None else: dataset_response = self.get_dataset_info(datasource_id) - datasource_urn = self.get_datasource_urn_from_id( + datasource_urn = list(self.get_datasource_urn_from_id( dataset_response, self.platform - ) + )) params = json.loads(chart_data.get("params", "{}")) metrics = [ @@ -598,7 +599,7 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot: title=title, lastModified=last_modified, chartUrl=chart_url, - inputs=[datasource_urn] if datasource_urn else None, + inputs=datasource_urn, customProperties=custom_properties, ) chart_snapshot.aspects.append(chart_info) From 2bceb4666992dff01cbfa54b19f794e43dfdf76a Mon Sep 17 00:00:00 2001 From: Shuixi Li Date: Tue, 17 Dec 2024 11:33:20 -0500 Subject: [PATCH 5/6] lint --- .../src/datahub/ingestion/source/preset.py | 2 - .../src/datahub/ingestion/source/superset.py | 48 +++++++++---------- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/preset.py b/metadata-ingestion/src/datahub/ingestion/source/preset.py index 7b0bc89648c52..322a3b42339d1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/preset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/preset.py @@ -80,8 +80,6 @@ class PresetSource(SupersetSource): platform = "preset" def __init__(self, ctx: PipelineContext, config: PresetConfig): - logger.info(f"ctx is {ctx}") - super().__init__(ctx, config) self.config = config self.report = StaleEntityRemovalSourceReport() diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index 14e7f7657bb67..d3457863fc89d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -73,8 +73,8 @@ ChartTypeClass, DashboardInfoClass, DatasetPropertiesClass, - OwnershipClass, OwnerClass, + OwnershipClass, OwnershipTypeClass, ) from datahub.utilities import config_clean @@ -359,18 +359,18 @@ def get_datasource_urn_from_id( env=self.config.env, ) raise ValueError("Could not construct dataset URN") - - def parse_owner_payload(self, payload: dict) -> dict: + + def parse_owner_payload(self, payload: Dict[str, Any]) -> Dict[str, str]: owners_dict = {} - for owner_data in payload.get("result", []): - email = owner_data.get("extra", {}).get("email") - value = owner_data.get("value") + for owner_data in payload["result"]: + email = owner_data["extra"]["email"] + value = owner_data["value"] if value and email: owners_dict[value] = email return owners_dict - - def build_preset_owner_dict(self) -> Dict[str, str]: + + def build_preset_owner_dict(self) -> Dict[str, str]: owners_dict = {} dataset_payload = self.get_all_entity_owners("dataset") chart_payload = self.get_all_entity_owners("chart") @@ -379,10 +379,9 @@ def build_preset_owner_dict(self) -> Dict[str, str]: owners_dict.update(self.parse_owner_payload(dataset_payload)) owners_dict.update(self.parse_owner_payload(chart_payload)) owners_dict.update(self.parse_owner_payload(dashboard_payload)) - return owners_dict - - def build_owners_urn_list(self, data: dict) -> List[str]: + + def build_owners_urn_list(self, data: Dict[str, Any]) -> List[str]: owners_urn_list = [] for owner in data.get("owners", []): owner_id = owner.get("id") @@ -391,8 +390,8 @@ def build_owners_urn_list(self, data: dict) -> List[str]: owners_urn = make_user_urn(owner_email) owners_urn_list.append(owners_urn) return owners_urn_list - - def get_all_entity_owners(self, entity: str) -> Iterable[Dict]: + + def get_all_entity_owners(self, entity: str) -> Dict[str, Any]: current_page = 1 total_owners = PAGE_SIZE all_owners = [] @@ -403,15 +402,16 @@ def get_all_entity_owners(self, entity: str) -> Iterable[Dict]: params=f"q=(page:{current_page},page_size:{PAGE_SIZE})", ) if full_owners_response.status_code != 200: - logger.warning(f"Failed to get {entity} data: {full_owners_response.text}") + logger.warning( + f"Failed to get {entity} data: {full_owners_response.text}" + ) full_owners_response.raise_for_status() payload = full_owners_response.json() total_owners = payload.get("count", total_owners) all_owners.extend(payload.get("result", [])) current_page += 1 - - #return combined payload + # return combined payload return {"result": all_owners, "count": total_owners} def construct_dashboard_from_api_data( @@ -492,7 +492,7 @@ def construct_dashboard_from_api_data( owners=[ OwnerClass( owner=urn, - #default as Technical Owners from Preset + # default as Technical Owners from Preset type=OwnershipTypeClass.TECHNICAL_OWNER, ) for urn in (dashboard_owners_list or []) @@ -548,13 +548,14 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot: datasource_id = chart_data.get("datasource_id") if not datasource_id: - logger.debug(f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info') + logger.debug( + f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info' + ) datasource_urn = None else: dataset_response = self.get_dataset_info(datasource_id) - datasource_urn = list(self.get_datasource_urn_from_id( - dataset_response, self.platform - )) + ds_urn = self.get_datasource_urn_from_id(dataset_response, self.platform) + datasource_urn = [ds_urn] if not isinstance(ds_urn, list) else ds_urn params = json.loads(chart_data.get("params", "{}")) metrics = [ @@ -609,7 +610,7 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot: owners=[ OwnerClass( owner=urn, - #default as Technical Owners from Preset + # default as Technical Owners from Preset type=OwnershipTypeClass.TECHNICAL_OWNER, ) for urn in (chart_owners_list or []) @@ -711,14 +712,13 @@ def construct_dataset_from_dataset_data( owners=[ OwnerClass( owner=urn, - #default as Technical Owners from Preset + # default as Technical Owners from Preset type=OwnershipTypeClass.TECHNICAL_OWNER, ) for urn in (dataset_owners_list or []) ], ) aspects_items.append(owners_info) - dataset_snapshot = DatasetSnapshot( urn=datasource_urn, aspects=aspects_items, From 44199965db3014a0c7ca2872bd26e2daa8c36bb6 Mon Sep 17 00:00:00 2001 From: Enos Date: Wed, 18 Dec 2024 15:06:36 -0500 Subject: [PATCH 6/6] review comments --- .../src/datahub/ingestion/source/superset.py | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index d3457863fc89d..17b95fc11b5b2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -236,7 +236,7 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig): graph=self.ctx.graph, ) self.session = self.login() - self.owners_dict = self.build_preset_owner_dict() + self.owners_id_to_email_dict = self.build_preset_owner_dict() def login(self) -> requests.Session: login_response = requests.post( @@ -360,38 +360,38 @@ def get_datasource_urn_from_id( ) raise ValueError("Could not construct dataset URN") - def parse_owner_payload(self, payload: Dict[str, Any]) -> Dict[str, str]: - owners_dict = {} + def _parse_owner_payload(self, payload: Dict[str, Any]) -> Dict[str, str]: + owners_id_to_email_dict = {} for owner_data in payload["result"]: - email = owner_data["extra"]["email"] - value = owner_data["value"] + owner_email = owner_data.get("extra", {}).get("email", None) + owner_id = owner_data.get("value", None) - if value and email: - owners_dict[value] = email - return owners_dict + if owner_id and owner_email: + owners_id_to_email_dict[owner_id] = owner_email + return owners_id_to_email_dict def build_preset_owner_dict(self) -> Dict[str, str]: - owners_dict = {} - dataset_payload = self.get_all_entity_owners("dataset") - chart_payload = self.get_all_entity_owners("chart") - dashboard_payload = self.get_all_entity_owners("dashboard") + owners_id_to_email_dict = {} + dataset_payload = self._get_all_entity_owners("dataset") + chart_payload = self._get_all_entity_owners("chart") + dashboard_payload = self._get_all_entity_owners("dashboard") - owners_dict.update(self.parse_owner_payload(dataset_payload)) - owners_dict.update(self.parse_owner_payload(chart_payload)) - owners_dict.update(self.parse_owner_payload(dashboard_payload)) - return owners_dict + owners_id_to_email_dict.update(self._parse_owner_payload(dataset_payload)) + owners_id_to_email_dict.update(self._parse_owner_payload(chart_payload)) + owners_id_to_email_dict.update(self._parse_owner_payload(dashboard_payload)) + return owners_id_to_email_dict def build_owners_urn_list(self, data: Dict[str, Any]) -> List[str]: owners_urn_list = [] for owner in data.get("owners", []): owner_id = owner.get("id") - owner_email = self.owners_dict.get(owner_id) + owner_email = self.owners_id_to_email_dict.get(owner_id) if owner_email is not None: owners_urn = make_user_urn(owner_email) owners_urn_list.append(owners_urn) return owners_urn_list - def get_all_entity_owners(self, entity: str) -> Dict[str, Any]: + def _get_all_entity_owners(self, entity: str) -> Dict[str, Any]: current_page = 1 total_owners = PAGE_SIZE all_owners = [] @@ -405,7 +405,8 @@ def get_all_entity_owners(self, entity: str) -> Dict[str, Any]: logger.warning( f"Failed to get {entity} data: {full_owners_response.text}" ) - full_owners_response.raise_for_status() + current_page += 1 + continue payload = full_owners_response.json() total_owners = payload.get("count", total_owners)