Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/qlik): Qlik cloud connector integration #9682

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2b9f8d4
Add Qlik Cloud source code
shubhamjagtap639 Jan 22, 2024
aefbd4b
Modify qlik cloud source name
shubhamjagtap639 Jan 22, 2024
a3c4696
Add qlik source integration test cases
shubhamjagtap639 Jan 22, 2024
d7c9a31
Add missing capabilities decorator
shubhamjagtap639 Jan 22, 2024
f98b534
Address review comments and add dashboard and chart metadata ingestio…
shubhamjagtap639 Jan 30, 2024
13f3eb4
Merge branch 'master' of https://github.com/shubhamjagtap639/datahub …
shubhamjagtap639 Jan 30, 2024
a8fcc00
Address review comments and add qlik app used dataset metadata ingest…
shubhamjagtap639 Feb 1, 2024
002baa5
Merge branch 'master' into Qlik-Connector-Integration
shubhamjagtap639 Feb 1, 2024
47cd2ff
Merge branch 'master' into Qlik-Connector-Integration
hsheth2 Feb 2, 2024
924ab3f
Address review comments
shubhamjagtap639 Feb 4, 2024
db3d62d
Merge branch 'Qlik-Connector-Integration' of https://github.com/shubh…
shubhamjagtap639 Feb 4, 2024
6c48007
Address review comments
shubhamjagtap639 Feb 7, 2024
191f0ff
Add fineGrained Lineage for app used tables and datasets
shubhamjagtap639 Feb 8, 2024
6f97cc5
Add app used tables as upstreams of all charts
shubhamjagtap639 Feb 13, 2024
c2dd545
Merge branch 'master' into Qlik-Connector-Integration
shubhamjagtap639 Feb 13, 2024
1dadb27
Merge branch 'master' of https://github.com/shubhamjagtap639/datahub …
shubhamjagtap639 Feb 16, 2024
f0e9e1f
Merge branch 'master' of https://github.com/shubhamjagtap639/datahub …
shubhamjagtap639 Feb 16, 2024
90b8790
Add Websocket-client package in qlik-sense plugin
shubhamjagtap639 Feb 20, 2024
2c4931a
Bug fixed to fetch all items present
shubhamjagtap639 Feb 21, 2024
ae7bd0b
Fix bug for lineage metadata
shubhamjagtap639 Feb 22, 2024
224b656
Add remaining constants
shubhamjagtap639 Feb 22, 2024
e25fe4b
Merge branch 'master' into Qlik-Connector-Integration
shubhamjagtap639 Feb 22, 2024
ac249d6
Update setup.py
shubhamjagtap639 Feb 23, 2024
dc0bd90
Merge branch 'master' into Qlik-Connector-Integration
hsheth2 Feb 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ source:

ingest_owner: true

# Optional -- This mapping is optional and only required to configure platform-instance for Qlik app dataset upstream source table
# A mapping of the Qlik app dataset upstream source to platform instance. Use <database>.<schema>.<table> as key.
# app_dataset_source_to_platform_instance:
# <database>.<schema>.<table>:
# Optional -- This mapping is optional and only required to configure platform-instance for Qlik app dataset upstream source tables
# A mapping of the Qlik app dataset upstream tables from data connection to platform instance. Use 'data_connection_name' as key.
# data_connection_to_platform_instance:
# data_connection_name:
# platform_instance: cloud_instance
# env: DEV

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,6 @@
QLIK_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


class WebSocketRequest:
    """
    Canned JSON-RPC request payloads sent over the Qlik Engine websocket.
    """

    # JSON-RPC 2.0 message asking the Qlik Engine to open an app document.
    # NOTE(review): qDocName is a hard-coded app GUID — presumably leftover
    # test data; the document id should come from the app being ingested.
    # TODO confirm before relying on this constant.
    OPEN_DOC = {
        "jsonrpc": "2.0",
        "id": 1,
        "handle": -1,
        "method": "OpenDoc",
        "params": {"qDocName": "f0714ca7-7093-49e4-8b58-47bb38563647"},
    }


class Constant:
"""
keys used in qlik plugin
Expand Down Expand Up @@ -122,8 +108,9 @@ class QlikSourceConfig(
default=True,
description="Ingest Owner from source. This will override Owner info entered from UI",
)
# Qlik app dataset upstream source to platform instance mapping
app_dataset_source_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
# Qlik app dataset upstream tables from data connection to platform instance mapping
data_connection_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
default={},
description="A mapping of the Qlik app dataset upstream source to platform instance. Use <database>.<schema>.<table> as key.",
description="A mapping of the Qlik app dataset upstream tables from data connection to platform instance."
"Use 'data_connection_name' as key.",
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
class QlikAPI:
def __init__(self, config: QlikSourceConfig) -> None:
self.spaces: Dict = {}
self.users: Dict = {}
self.config = config
self.session = requests.Session()
self.session.headers.update(
Expand Down Expand Up @@ -74,6 +75,21 @@ def _get_dataset(self, dataset_id: str) -> Optional[QlikDataset]:
)
return None

def get_user_name(self, user_id: str) -> Optional[str]:
    """Return the display name for a Qlik user id, or None if the lookup fails.

    Results are memoized in ``self.users`` so repeated owners do not trigger
    repeated REST calls. (lru_cache was considered in review but reportedly
    interfered with the API requests, hence the explicit dict cache.)
    """
    # Cache hit: serve without touching the network. Kept OUTSIDE the
    # try-block so a cache lookup can never be mistaken for an HTTP failure.
    if user_id in self.users:
        return self.users[user_id]
    try:
        response = self.session.get(f"{self.rest_api_url}/users/{user_id}")
        response.raise_for_status()
        user_name = response.json()[Constant.NAME]
        self.users[user_id] = user_name
        return user_name
    except Exception:
        # Best-effort: log and fall back to None so ingestion continues
        # without owner info rather than aborting the run.
        self._log_http_error(message=f"Unable to fetch user with id {user_id}")
        return None

def _get_sheet(
self,
websocket_connection: WebsocketConnection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,17 @@ def _gen_space_workunit(self, space: Space) -> Iterable[MetadataWorkUnit]:
"""
Map Qlik space to Datahub container
"""
owner_username: Optional[str] = None
if space.ownerId:
owner_username = self.qlik_api.get_user_name(space.ownerId)
yield from gen_containers(
container_key=self._gen_space_key(space.id),
name=space.name,
description=space.description,
sub_types=[BIContainerSubTypes.QLIK_SPACE],
extra_properties={Constant.TYPE: str(space.type)},
owner_urn=builder.make_user_urn(space.ownerId)
if self.config.ingest_owner and space.ownerId
owner_urn=builder.make_user_urn(owner_username)
if self.config.ingest_owner and owner_username
else None,
created=int(space.createdAt.timestamp() * 1000),
last_modified=int(space.updatedAt.timestamp() * 1000),
Expand All @@ -190,12 +193,12 @@ def _gen_entity_status_aspect(self, entity_urn: str) -> MetadataWorkUnit:
).as_workunit()

def _gen_entity_owner_aspect(
self, entity_urn: str, owner_id: str
self, entity_urn: str, user_name: str
) -> MetadataWorkUnit:
aspect = OwnershipClass(
owners=[
OwnerClass(
owner=builder.make_user_urn(owner_id),
owner=builder.make_user_urn(user_name),
type=OwnershipTypeClass.DATAOWNER,
)
]
Expand Down Expand Up @@ -281,8 +284,9 @@ def _gen_sheets_workunit(
if dpi_aspect:
yield dpi_aspect

if self.config.ingest_owner:
yield self._gen_entity_owner_aspect(dashboard_urn, sheet.ownerId)
owner_username = self.qlik_api.get_user_name(sheet.ownerId)
if self.config.ingest_owner and owner_username:
yield self._gen_entity_owner_aspect(dashboard_urn, owner_username)

yield from self._gen_charts_workunit(sheet.charts)

Expand All @@ -292,8 +296,8 @@ def _gen_app_dataset_upstream_lineage(
dataset_identifier = self._get_app_dataset_identifier(dataset)
dataset_urn = self._gen_qlik_dataset_urn(dataset_identifier)
upstream_dataset_platform_detail = (
self.config.app_dataset_source_to_platform_instance.get(
dataset_identifier,
self.config.data_connection_to_platform_instance.get(
dataset.dataconnectorName,
PlatformDetail(
env=self.config.env, platform_instance=self.config.platform_instance
),
Expand Down Expand Up @@ -339,7 +343,9 @@ def _gen_app_dataset_properties(self, dataset: QlikAppDataset) -> MetadataWorkUn
).as_workunit()

def _get_app_dataset_identifier(self, dataset: QlikAppDataset) -> str:
return f"{dataset.databaseName}.{dataset.schemaName}.{dataset.tableName}"
return (
f"{dataset.databaseName}.{dataset.schemaName}.{dataset.tableName}".lower()
)

def _gen_app_dataset_workunit(
self, datasets: List[QlikAppDataset], app_id: str
Expand Down Expand Up @@ -375,15 +381,16 @@ def _gen_app_workunit(self, app: App) -> Iterable[MetadataWorkUnit]:
"""
Map Qlik App to Datahub container
"""
owner_username = self.qlik_api.get_user_name(app.ownerId)
yield from gen_containers(
container_key=self._gen_app_key(app.id),
name=app.qTitle,
description=app.description,
sub_types=[BIContainerSubTypes.QLIK_APP],
parent_container_key=self._gen_space_key(app.spaceId),
extra_properties={Constant.QRI: app.qri, Constant.USAGE: app.qUsage},
owner_urn=builder.make_user_urn(app.ownerId)
if self.config.ingest_owner
owner_urn=builder.make_user_urn(owner_username)
if self.config.ingest_owner and owner_username
else None,
created=int(app.createdAt.timestamp() * 1000),
last_modified=int(app.updatedAt.timestamp() * 1000),
Expand Down Expand Up @@ -427,7 +434,6 @@ def _gen_schema_fields(
if field.dataType
else NullType()
),
# NOTE: nativeDataType will not be in sync with older connector
nativeDataType=field.dataType if field.dataType else "",
nullable=field.nullable,
isPartOfKey=field.primaryKey,
Expand Down Expand Up @@ -496,8 +502,9 @@ def _gen_dataset_workunit(self, dataset: QlikDataset) -> Iterable[MetadataWorkUn
if dpi_aspect:
yield dpi_aspect

if self.config.ingest_owner:
yield self._gen_entity_owner_aspect(dataset_urn, dataset.ownerId)
owner_username = self.qlik_api.get_user_name(dataset.ownerId)
if self.config.ingest_owner and owner_username:
yield self._gen_entity_owner_aspect(dataset_urn, owner_username)

yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
Expand Down
Loading