Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Ownership from Preset #12154

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/preset.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ class PresetSource(SupersetSource):
platform = "preset"

def __init__(self, ctx: PipelineContext, config: PresetConfig):
logger.info(f"ctx is {ctx}")

super().__init__(ctx, config)
self.config = config
self.report = StaleEntityRemovalSourceReport()
Expand Down
115 changes: 110 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
enosodigie marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
make_dataset_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn,
make_user_urn,
)
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
from datahub.ingestion.api.common import PipelineContext
Expand Down Expand Up @@ -72,6 +73,9 @@
ChartTypeClass,
DashboardInfoClass,
DatasetPropertiesClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
)
from datahub.utilities import config_clean
from datahub.utilities.registries.domain_registry import DomainRegistry
Expand Down Expand Up @@ -232,6 +236,7 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
graph=self.ctx.graph,
)
self.session = self.login()
self.owners_id_to_email_dict = self.build_preset_owner_dict()

def login(self) -> requests.Session:
login_response = requests.post(
Expand Down Expand Up @@ -355,6 +360,61 @@ def get_datasource_urn_from_id(
)
raise ValueError("Could not construct dataset URN")

def _parse_owner_payload(self, payload: Dict[str, Any]) -> Dict[str, str]:
    """Build an owner-id -> e-mail mapping from a ``related/owners`` payload.

    Args:
        payload: Decoded response body. Its ``"result"`` entry is read as a
            list of owner records; for each record ``"value"`` is used as
            the owner id and ``record["extra"]["email"]`` as the e-mail.

    Returns:
        A dict keyed by owner id whose values are e-mail addresses. Records
        lacking a truthy id or a truthy e-mail are left out.
    """
    owners_id_to_email_dict = {}
    # A missing or null "result" means "no owners" rather than an error.
    for owner_data in payload.get("result") or []:
        # "extra" can be present but null; ``.get("extra", {})`` would then
        # hand back None and the chained ``.get`` would raise.
        extra = owner_data.get("extra") or {}
        owner_email = extra.get("email")
        owner_id = owner_data.get("value")

        if owner_id and owner_email:
            owners_id_to_email_dict[owner_id] = owner_email
    return owners_id_to_email_dict

def build_preset_owner_dict(self) -> Dict[str, str]:
    """Collect the owner-id -> e-mail mapping across all entity kinds.

    Owner payloads are requested for datasets, charts and dashboards (in
    that order) via ``_get_all_entity_owners``; each is then converted with
    ``_parse_owner_payload`` and merged into one dict. When the same id
    shows up more than once, the entry parsed last wins.

    Returns:
        The merged owner-id -> e-mail dict.
    """
    # All three payloads are fetched before any of them is parsed.
    raw_payloads = [
        self._get_all_entity_owners(entity_kind)
        for entity_kind in ("dataset", "chart", "dashboard")
    ]

    merged = {}
    for raw in raw_payloads:
        merged.update(self._parse_owner_payload(raw))
    return merged

def build_owners_urn_list(self, data: Dict[str, Any]) -> List[str]:
    """Turn the ``"owners"`` entries of an entity record into user URNs.

    Each owner's ``"id"`` is looked up in ``self.owners_id_to_email_dict``;
    owners with no known e-mail are skipped, the rest are passed through
    ``make_user_urn``.

    Args:
        data: Entity record; its ``"owners"`` entry (default: empty list)
            is iterated as a sequence of dicts.

    Returns:
        User URNs in the same order as the owners that resolved.
    """
    # Lazy lookup so each owner is resolved and converted in turn.
    resolved_emails = (
        self.owners_id_to_email_dict.get(entry.get("id"))
        for entry in data.get("owners", [])
    )
    return [make_user_urn(email) for email in resolved_emails if email is not None]

def _get_all_entity_owners(self, entity: str) -> Dict[str, Any]:
    """Fetch every page of ``/api/v1/{entity}/related/owners`` and merge them.

    Args:
        entity: Path segment naming the entity kind; it is interpolated
            into the request URL as-is.

    Returns:
        ``{"result": [...], "count": n}`` where ``result`` is the
        concatenation of each successful page's ``"result"`` list and
        ``count`` is the last ``"count"`` the server reported (or
        ``PAGE_SIZE`` if no page ever succeeded).

    A page that does not return HTTP 200 is logged and skipped; paging then
    moves on to the next page.
    """
    # The Superset (Flask-AppBuilder) list API numbers pages from 0, so
    # starting at 1 would silently drop the first PAGE_SIZE owners.
    current_page = 0
    # Seeded with PAGE_SIZE so the first request is always made; replaced by
    # the server-reported count after the first successful response.
    total_owners = PAGE_SIZE
    all_owners = []

    # Strict "<": once current_page * PAGE_SIZE reaches the total there is
    # nothing left to fetch, so no trailing empty request is issued.
    while current_page * PAGE_SIZE < total_owners:
        full_owners_response = self.session.get(
            f"{self.config.connect_uri}/api/v1/{entity}/related/owners",
            params=f"q=(page:{current_page},page_size:{PAGE_SIZE})",
        )
        # Advance up front so both the failure and success paths move on.
        current_page += 1

        if full_owners_response.status_code != 200:
            logger.warning(
                f"Failed to get {entity} data: {full_owners_response.text}"
            )
            continue

        payload = full_owners_response.json()
        total_owners = payload.get("count", total_owners)
        # Tolerate a null "result" instead of crashing in extend().
        all_owners.extend(payload.get("result") or [])

    # Same shape as a single-page response, so callers can parse it the same way.
    return {"result": all_owners, "count": total_owners}

def construct_dashboard_from_api_data(
self, dashboard_data: dict
) -> DashboardSnapshot:
Expand Down Expand Up @@ -427,6 +487,20 @@ def construct_dashboard_from_api_data(
customProperties=custom_properties,
)
dashboard_snapshot.aspects.append(dashboard_info)

dashboard_owners_list = self.build_owners_urn_list(dashboard_data)
owners_info = OwnershipClass(
owners=[
OwnerClass(
owner=urn,
# default as Technical Owners from Preset
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
for urn in (dashboard_owners_list or [])
],
)
dashboard_snapshot.aspects.append(owners_info)

return dashboard_snapshot

def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
Expand Down Expand Up @@ -474,10 +548,15 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
chart_url = f"{self.config.display_uri}{chart_data.get('url', '')}"

datasource_id = chart_data.get("datasource_id")
dataset_response = self.get_dataset_info(datasource_id)
datasource_urn = self.get_datasource_urn_from_id(
dataset_response, self.platform
)
if not datasource_id:
logger.debug(
f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info'
)
datasource_urn = None
else:
dataset_response = self.get_dataset_info(datasource_id)
ds_urn = self.get_datasource_urn_from_id(dataset_response, self.platform)
datasource_urn = [ds_urn] if not isinstance(ds_urn, list) else ds_urn

params = json.loads(chart_data.get("params", "{}"))
metrics = [
Expand Down Expand Up @@ -522,10 +601,24 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
title=title,
lastModified=last_modified,
chartUrl=chart_url,
inputs=[datasource_urn] if datasource_urn else None,
inputs=datasource_urn,
customProperties=custom_properties,
)
chart_snapshot.aspects.append(chart_info)

chart_owners_list = self.build_owners_urn_list(chart_data)
owners_info = OwnershipClass(
owners=[
OwnerClass(
owner=urn,
# default as Technical Owners from Preset
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
for urn in (chart_owners_list or [])
],
)
chart_snapshot.aspects.append(owners_info)

return chart_snapshot

def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]:
Expand Down Expand Up @@ -615,6 +708,18 @@ def construct_dataset_from_dataset_data(
]
)

dataset_owners_list = self.build_owners_urn_list(dataset_data)
owners_info = OwnershipClass(
owners=[
OwnerClass(
owner=urn,
# default as Technical Owners from Preset
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
for urn in (dataset_owners_list or [])
],
)
aspects_items.append(owners_info)
dataset_snapshot = DatasetSnapshot(
urn=datasource_urn,
aspects=aspects_items,
Expand Down
Loading