From 419f7f6cdeb1d2873873d61d8b1bc02022423a6f Mon Sep 17 00:00:00 2001 From: Kevin Karch Date: Wed, 27 Nov 2024 16:25:17 -0500 Subject: [PATCH 1/8] feat(cli): added cli option for ingestion source --- .../src/datahub/cli/ingest_cli.py | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 51f095751f7dd9..df59c6cfd65a25 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -437,6 +437,57 @@ def mcps(path: str) -> None: sys.exit(ret) +@ingest.command() +@click.argument("page_offset", type=int, default=0) +@click.argument("page_size", type=int, default=100) +@upgrade.check_upgrade +@telemetry.with_telemetry() +def list_sources(page_offset: int, page_size: int) -> None: + """List ingestion sources with their number of executions""" + + query = """ + query listIngestionRuns($input: ListIngestionSourcesInput!) 
{ + listIngestionSources(input: $input) { + ingestionSources { + urn + name + executions { + total + } + } + } + } + """ + + variables = { + "input": { + "start": page_offset, + "count": page_size + } + } + + client = get_default_graph() + session = client._session + gms_host = client.config.server + + url = f"{gms_host}/api/graphql" + response = session.post(url, json={"query": query, "variables": variables}) + + data = response.json() + + rows = [] + if "data" in data and "listIngestionSources" in data["data"]: + sources = data["data"]["listIngestionSources"]["ingestionSources"] + for source in sources: + urn = source.get("urn", "N/A") + name = source.get("name", "N/A") + executions = source.get("executions", {}) + total = executions.get("total", 0) + rows.append([urn, name, total]) + + click.echo(tabulate(rows, headers=["URN", "Name", "Total Executions"], tablefmt="grid") if rows else "No ingestion sources found.") + + @ingest.command() @click.argument("page_offset", type=int, default=0) @click.argument("page_size", type=int, default=100) From cb5adbfb7658f15bc0256d9b40a9fa45a09c75c8 Mon Sep 17 00:00:00 2001 From: Kevin Karch Date: Mon, 2 Dec 2024 09:29:40 -0500 Subject: [PATCH 2/8] fix(cli): ran linter --- metadata-ingestion/src/datahub/cli/ingest_cli.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index df59c6cfd65a25..71ff5172e1d867 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -459,12 +459,7 @@ def list_sources(page_offset: int, page_size: int) -> None: } """ - variables = { - "input": { - "start": page_offset, - "count": page_size - } - } + variables = {"input": {"start": page_offset, "count": page_size}} client = get_default_graph() session = client._session @@ -485,7 +480,11 @@ def list_sources(page_offset: int, page_size: int) -> None: total = 
executions.get("total", 0) rows.append([urn, name, total]) - click.echo(tabulate(rows, headers=["URN", "Name", "Total Executions"], tablefmt="grid") if rows else "No ingestion sources found.") + click.echo( + tabulate(rows, headers=["URN", "Name", "Total Executions"], tablefmt="grid") + if rows + else "No ingestion sources found." + ) @ingest.command() From 31ceb9a41230b91fdeda5349fc300922fbfc6776 Mon Sep 17 00:00:00 2001 From: Kevin Karch Date: Mon, 2 Dec 2024 10:25:56 -0500 Subject: [PATCH 3/8] fix(cli) replace instead of add cli command --- .../src/datahub/cli/ingest_cli.py | 86 ++++++------------- 1 file changed, 26 insertions(+), 60 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 71ff5172e1d867..27c2df1f970452 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -27,7 +27,7 @@ logger = logging.getLogger(__name__) -RUNS_TABLE_COLUMNS = ["runId", "rows", "created at"] +RUNS_TABLE_COLUMNS = ["runId", "source", "startTime", "status"] RUN_TABLE_COLUMNS = ["urn", "aspect name", "created at"] @@ -442,17 +442,22 @@ def mcps(path: str) -> None: @click.argument("page_size", type=int, default=100) @upgrade.check_upgrade @telemetry.with_telemetry() -def list_sources(page_offset: int, page_size: int) -> None: - """List ingestion sources with their number of executions""" +def list_runs(page_offset: int, page_size: int) -> None: + """List ingestion runs with their execution details""" query = """ query listIngestionRuns($input: ListIngestionSourcesInput!) 
{ listIngestionSources(input: $input) { ingestionSources { - urn name executions { - total + executionRequests { + id + result { + startTimeMs + status + } + } } } } @@ -467,74 +472,35 @@ def list_sources(page_offset: int, page_size: int) -> None: url = f"{gms_host}/api/graphql" response = session.post(url, json={"query": query, "variables": variables}) - data = response.json() rows = [] if "data" in data and "listIngestionSources" in data["data"]: sources = data["data"]["listIngestionSources"]["ingestionSources"] for source in sources: - urn = source.get("urn", "N/A") name = source.get("name", "N/A") - executions = source.get("executions", {}) - total = executions.get("total", 0) - rows.append([urn, name, total]) + executions = source.get("executions", {}).get("executionRequests", []) + for execution in executions: + execution_id = execution.get("id", "N/A") + start_time = execution.get("result", {}).get("startTimeMs", "N/A") + start_time = datetime.fromtimestamp(start_time / 1000).strftime( + "%Y-%m-%d %H:%M:%S" + ) + status = execution.get("result", {}).get("status", "N/A") + + rows.append([execution_id, name, start_time, status]) click.echo( - tabulate(rows, headers=["URN", "Name", "Total Executions"], tablefmt="grid") + tabulate( + rows, + headers=RUNS_TABLE_COLUMNS, + tablefmt="grid", + ) if rows - else "No ingestion sources found." + else "No ingestion sources or executions found." 
) -@ingest.command() -@click.argument("page_offset", type=int, default=0) -@click.argument("page_size", type=int, default=100) -@click.option( - "--include-soft-deletes", - is_flag=True, - default=False, - help="If enabled, will list ingestion runs which have been soft deleted", -) -@upgrade.check_upgrade -@telemetry.with_telemetry() -def list_runs(page_offset: int, page_size: int, include_soft_deletes: bool) -> None: - """List recent ingestion runs to datahub""" - - client = get_default_graph() - session = client._session - gms_host = client.config.server - - url = f"{gms_host}/runs?action=list" - - payload_obj = { - "pageOffset": page_offset, - "pageSize": page_size, - "includeSoft": include_soft_deletes, - } - - payload = json.dumps(payload_obj) - - response = session.post(url, data=payload) - - rows = parse_restli_response(response) - local_timezone = datetime.now().astimezone().tzinfo - - structured_rows = [ - [ - row.get("runId"), - row.get("rows"), - datetime.fromtimestamp(row.get("timestamp") / 1000).strftime( - "%Y-%m-%d %H:%M:%S" - ) - + f" ({local_timezone})", - ] - for row in rows - ] - - click.echo(tabulate(structured_rows, RUNS_TABLE_COLUMNS, tablefmt="grid")) - - @ingest.command() @click.option("--run-id", required=True, type=str) @click.option("--start", type=int, default=0) From 66da66aefc4a2bb5b643e83437cd7a26b6b3f93e Mon Sep 17 00:00:00 2001 From: Kevin Karch Date: Mon, 2 Dec 2024 16:57:56 -0500 Subject: [PATCH 4/8] feat(cli): added urn filter --- .../src/datahub/cli/ingest_cli.py | 133 +++++++++++++++--- 1 file changed, 111 insertions(+), 22 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 27c2df1f970452..3b75c7456f180e 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -27,7 +27,8 @@ logger = logging.getLogger(__name__) -RUNS_TABLE_COLUMNS = ["runId", "source", "startTime", "status"] 
+INGEST_SRC_TABLE_COLUMNS = ["runId", "URN", "source", "startTime", "status"] +RUNS_TABLE_COLUMNS = ["runId", "rows", "created at"] RUN_TABLE_COLUMNS = ["urn", "aspect name", "created at"] @@ -440,15 +441,17 @@ def mcps(path: str) -> None: @ingest.command() @click.argument("page_offset", type=int, default=0) @click.argument("page_size", type=int, default=100) +@click.option("--urn", type=str, default=None, help="Filter by ingestion source URN.") @upgrade.check_upgrade @telemetry.with_telemetry() -def list_runs(page_offset: int, page_size: int) -> None: - """List ingestion runs with their execution details""" +def list_source_runs(page_offset: int, page_size: int, urn: str) -> None: + """List ingestion source runs with their details, optionally filtered by URN""" query = """ query listIngestionRuns($input: ListIngestionSourcesInput!) { listIngestionSources(input: $input) { ingestionSources { + urn name executions { executionRequests { @@ -464,43 +467,129 @@ def list_runs(page_offset: int, page_size: int) -> None: } """ - variables = {"input": {"start": page_offset, "count": page_size}} + # filter by urn using CONTAINS + filters = [] + if urn: + filters.append({"field": "urn", "values": [urn], "condition": "CONTAIN"}) + + variables = { + "input": { + "start": page_offset, + "count": page_size, + "filters": filters, + } + } client = get_default_graph() session = client._session gms_host = client.config.server url = f"{gms_host}/api/graphql" - response = session.post(url, json={"query": query, "variables": variables}) - data = response.json() + try: + response = session.post(url, json={"query": query, "variables": variables}) + response.raise_for_status() + except Exception as e: + click.echo(f"Error fetching data: {str(e)}") + return + + try: + data = response.json() + except ValueError: + click.echo("Failed to parse JSON response from server.") + return + + if not data: + click.echo("No response received from the server.") + return + + # when urn filter does not 
match + if ( + not isinstance(data.get("data"), dict) + or "listIngestionSources" not in data["data"] + ): + click.echo("No matching ingestion sources found. Please check your filters.") + return + + sources = data["data"]["listIngestionSources"]["ingestionSources"] + if not sources: + click.echo("No ingestion sources or executions found.") + return rows = [] - if "data" in data and "listIngestionSources" in data["data"]: - sources = data["data"]["listIngestionSources"]["ingestionSources"] - for source in sources: - name = source.get("name", "N/A") - executions = source.get("executions", {}).get("executionRequests", []) - for execution in executions: - execution_id = execution.get("id", "N/A") - start_time = execution.get("result", {}).get("startTimeMs", "N/A") - start_time = datetime.fromtimestamp(start_time / 1000).strftime( - "%Y-%m-%d %H:%M:%S" - ) - status = execution.get("result", {}).get("status", "N/A") + for source in sources: + urn = source.get("urn", "N/A") + name = source.get("name", "N/A") + + executions = source.get("executions", {}).get("executionRequests", []) + for execution in executions: + execution_id = execution.get("id", "N/A") + start_time = execution.get("result", {}).get("startTimeMs", "N/A") + start_time = ( + datetime.fromtimestamp(start_time / 1000).strftime("%Y-%m-%d %H:%M:%S") + if start_time != "N/A" + else "N/A" + ) + status = execution.get("result", {}).get("status", "N/A") - rows.append([execution_id, name, start_time, status]) + rows.append([execution_id, urn, name, start_time, status]) click.echo( tabulate( rows, - headers=RUNS_TABLE_COLUMNS, + headers=INGEST_SRC_TABLE_COLUMNS, tablefmt="grid", ) - if rows - else "No ingestion sources or executions found." 
) +@ingest.command() +@click.argument("page_offset", type=int, default=0) +@click.argument("page_size", type=int, default=100) +@click.option( + "--include-soft-deletes", + is_flag=True, + default=False, + help="If enabled, will list ingestion runs which have been soft deleted", +) +@upgrade.check_upgrade +@telemetry.with_telemetry() +def list_run_ids(page_offset: int, page_size: int, include_soft_deletes: bool) -> None: + """List recent ingestion runs to datahub""" + + client = get_default_graph() + session = client._session + gms_host = client.config.server + + url = f"{gms_host}/runs?action=list" + + payload_obj = { + "pageOffset": page_offset, + "pageSize": page_size, + "includeSoft": include_soft_deletes, + } + + payload = json.dumps(payload_obj) + + response = session.post(url, data=payload) + + rows = parse_restli_response(response) + local_timezone = datetime.now().astimezone().tzinfo + + structured_rows = [ + [ + row.get("runId"), + row.get("rows"), + datetime.fromtimestamp(row.get("timestamp") / 1000).strftime( + "%Y-%m-%d %H:%M:%S" + ) + + f" ({local_timezone})", + ] + for row in rows + ] + + click.echo(tabulate(structured_rows, RUNS_TABLE_COLUMNS, tablefmt="grid")) + + @ingest.command() @click.option("--run-id", required=True, type=str) @click.option("--start", type=int, default=0) From 60bdaf164419db63f93bfbcf34689c658a95e426 Mon Sep 17 00:00:00 2001 From: Kevin Karch Date: Tue, 3 Dec 2024 15:47:17 -0500 Subject: [PATCH 5/8] feat(cli): add source name filtering --- .../src/datahub/cli/ingest_cli.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 3b75c7456f180e..38608a46e6c641 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -27,7 +27,7 @@ logger = logging.getLogger(__name__) -INGEST_SRC_TABLE_COLUMNS = ["runId", "URN", "source", "startTime", 
"status"] +INGEST_SRC_TABLE_COLUMNS = ["runId", "source", "startTime", "status", "URN"] RUNS_TABLE_COLUMNS = ["runId", "rows", "created at"] RUN_TABLE_COLUMNS = ["urn", "aspect name", "created at"] @@ -442,10 +442,13 @@ def mcps(path: str) -> None: @click.argument("page_offset", type=int, default=0) @click.argument("page_size", type=int, default=100) @click.option("--urn", type=str, default=None, help="Filter by ingestion source URN.") +@click.option( + "--source", type=str, default=None, help="Filter by ingestion source name." +) @upgrade.check_upgrade @telemetry.with_telemetry() -def list_source_runs(page_offset: int, page_size: int, urn: str) -> None: - """List ingestion source runs with their details, optionally filtered by URN""" +def list_source_runs(page_offset: int, page_size: int, urn: str, source: str) -> None: + """List ingestion source runs with their details, optionally filtered by URN or source.""" query = """ query listIngestionRuns($input: ListIngestionSourcesInput!) { @@ -467,10 +470,12 @@ def list_source_runs(page_offset: int, page_size: int, urn: str) -> None: } """ - # filter by urn using CONTAINS + # filter by urn and/or source using CONTAINS filters = [] if urn: filters.append({"field": "urn", "values": [urn], "condition": "CONTAIN"}) + if source: + filters.append({"field": "name", "values": [source], "condition": "CONTAIN"}) variables = { "input": { @@ -502,7 +507,7 @@ def list_source_runs(page_offset: int, page_size: int, urn: str) -> None: click.echo("No response received from the server.") return - # when urn filter does not match + # when urn or source filter does not match, exit gracefully if ( not isinstance(data.get("data"), dict) or "listIngestionSources" not in data["data"] @@ -531,7 +536,7 @@ def list_source_runs(page_offset: int, page_size: int, urn: str) -> None: ) status = execution.get("result", {}).get("status", "N/A") - rows.append([execution_id, urn, name, start_time, status]) + rows.append([execution_id, name, start_time, 
status, urn]) click.echo( tabulate( From dbe878e5a04490199a7214099e83023e1483a1a6 Mon Sep 17 00:00:00 2001 From: Kevin Karch Date: Tue, 3 Dec 2024 16:42:09 -0500 Subject: [PATCH 6/8] feat(cli): changed variable name for failing pipeline --- metadata-ingestion/src/datahub/cli/ingest_cli.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 38608a46e6c641..69ebb5d1c979ba 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -515,17 +515,17 @@ def list_source_runs(page_offset: int, page_size: int, urn: str, source: str) -> click.echo("No matching ingestion sources found. Please check your filters.") return - sources = data["data"]["listIngestionSources"]["ingestionSources"] - if not sources: + ingestion_sources = data["data"]["listIngestionSources"]["ingestionSources"] + if not ingestion_sources: click.echo("No ingestion sources or executions found.") return rows = [] - for source in sources: - urn = source.get("urn", "N/A") - name = source.get("name", "N/A") + for ingestion_source in ingestion_sources: + urn = ingestion_source.get("urn", "N/A") + name = ingestion_source.get("name", "N/A") - executions = source.get("executions", {}).get("executionRequests", []) + executions = ingestion_source.get("executions", {}).get("executionRequests", []) for execution in executions: execution_id = execution.get("id", "N/A") start_time = execution.get("result", {}).get("startTimeMs", "N/A") From 1d635fec2d89c36042b2240450294dd0fa8d116e Mon Sep 17 00:00:00 2001 From: Kevin Karch Date: Wed, 4 Dec 2024 10:21:57 -0500 Subject: [PATCH 7/8] feat(cli): updated docs for cli commands --- docs/cli.md | 15 ++++++++++++++- docs/how/delete-metadata.md | 4 ++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/docs/cli.md b/docs/cli.md index c633b7f4a38ad3..4d75e106b82f03 100644 --- 
a/docs/cli.md +++ b/docs/cli.md @@ -88,7 +88,7 @@ The `docker` command allows you to start up a local DataHub instance using `data The `ingest` command allows you to ingest metadata from your sources using ingestion configuration files, which we call recipes. Source specific crawlers are provided by plugins and might sometimes need additional extras to be installed. See [installing plugins](#installing-plugins) for more information. -[Removing Metadata from DataHub](./how/delete-metadata.md) contains detailed instructions about how you can use the ingest command to perform operations like rolling-back previously ingested metadata through the `rollback` sub-command and listing all runs that happened through `list-runs` sub-command. +[Removing Metadata from DataHub](./how/delete-metadata.md) contains detailed instructions about how you can use the ingest command to perform operations like rolling-back previously ingested metadata through the `rollback` sub-command and listing all runs that happened through `list-run-ids` sub-command. ```console Usage: datahub [datahub-options] ingest [command-options] @@ -115,6 +115,19 @@ datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml --dry-run datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml -n ``` +#### ingest list-source-runs + +The `list-source-runs` sub-command of the `ingest` command lists the previous runs, displaying their run ID, source name, +start time, status, and source URN. This command allows you to filter results using the `--urn` option for URN-based +filtering or the `--source` option to filter by source name (partial or complete matches are supported). 
+ +```shell +# List all ingestion runs +datahub ingest list-source-runs +# Filter runs by a source name containing "demo" +datahub ingest list-source-runs --source "demo" +``` + #### ingest --preview The `--preview` option of the `ingest` command performs all of the ingestion steps, but limits the processing to only the first 10 workunits produced by the source. diff --git a/docs/how/delete-metadata.md b/docs/how/delete-metadata.md index f720a66ce57652..d3f533b229cc1a 100644 --- a/docs/how/delete-metadata.md +++ b/docs/how/delete-metadata.md @@ -4,7 +4,7 @@ To follow this guide, you'll need the [DataHub CLI](../cli.md). ::: -There are a two ways to delete metadata from DataHub: +There are two ways to delete metadata from DataHub: 1. Delete metadata attached to entities by providing a specific urn or filters that identify a set of urns (delete CLI). 2. Delete metadata created by a single ingestion run (rollback). @@ -230,7 +230,7 @@ The second way to delete metadata is to identify entities (and the aspects affec To view the ids of the most recent set of ingestion batches, execute ```shell -datahub ingest list-runs +datahub ingest list-run-ids ``` That will print out a table of all the runs. Once you have an idea of which run you want to roll back, run From 311a05a5c4fcbe22fe9fc83c735bc094bac4dd68 Mon Sep 17 00:00:00 2001 From: Kevin Karch Date: Mon, 16 Dec 2024 08:53:23 -0500 Subject: [PATCH 8/8] fix(cli): changed name back to list-runs and updated doc --- docs/cli.md | 2 +- docs/how/delete-metadata.md | 10 ++++++++-- metadata-ingestion/src/datahub/cli/ingest_cli.py | 2 +- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/docs/cli.md b/docs/cli.md index 4d75e106b82f03..1c38077d0d12ef 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -88,7 +88,7 @@ The `docker` command allows you to start up a local DataHub instance using `data The `ingest` command allows you to ingest metadata from your sources using ingestion configuration files, which we call recipes. 
Source specific crawlers are provided by plugins and might sometimes need additional extras to be installed. See [installing plugins](#installing-plugins) for more information. -[Removing Metadata from DataHub](./how/delete-metadata.md) contains detailed instructions about how you can use the ingest command to perform operations like rolling-back previously ingested metadata through the `rollback` sub-command and listing all runs that happened through `list-run-ids` sub-command. +[Removing Metadata from DataHub](./how/delete-metadata.md) contains detailed instructions about how you can use the ingest command to perform operations like rolling-back previously ingested metadata through the `rollback` sub-command and listing all runs that happened through `list-runs` sub-command. ```console Usage: datahub [datahub-options] ingest [command-options] diff --git a/docs/how/delete-metadata.md b/docs/how/delete-metadata.md index d3f533b229cc1a..e36940bf398356 100644 --- a/docs/how/delete-metadata.md +++ b/docs/how/delete-metadata.md @@ -230,10 +230,16 @@ The second way to delete metadata is to identify entities (and the aspects affec To view the ids of the most recent set of ingestion batches, execute ```shell -datahub ingest list-run-ids +datahub ingest list-runs ``` -That will print out a table of all the runs. Once you have an idea of which run you want to roll back, run +That will print out a table of all the runs. 
To see run statuses or to filter runs by URN or source name, run + +```shell +datahub ingest list-source-runs +``` + +Once you have an idea of which run you want to roll back, run ```shell datahub ingest show --run-id diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 69ebb5d1c979ba..fcab07a1c2aaf6 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -558,7 +558,7 @@ def list_source_runs(page_offset: int, page_size: int, urn: str, source: str) -> ) @upgrade.check_upgrade @telemetry.with_telemetry() -def list_run_ids(page_offset: int, page_size: int, include_soft_deletes: bool) -> None: +def list_runs(page_offset: int, page_size: int, include_soft_deletes: bool) -> None: """List recent ingestion runs to datahub""" client = get_default_graph()