Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): added cli option for ingestion source #11980

Merged
13 changes: 13 additions & 0 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,19 @@ datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml --dry-run
datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml -n
```

#### ingest --list-source-runs

The `--list-source-runs` option of the `ingest` command lists the previous runs, displaying their run ID, source name,
start time, status, and source URN. You can filter the results using the `--urn` option (filter by ingestion source URN)
or the `--source` option (filter by source name; partial or complete matches are supported).

```shell
# List all ingestion runs
datahub ingest --list-source-runs
# Filter runs by a source name containing "demo"
datahub ingest --list-source-runs --source "demo"
```

#### ingest --preview

The `--preview` option of the `ingest` command performs all of the ingestion steps, but limits the processing to only the first 10 workunits produced by the source.
Expand Down
10 changes: 8 additions & 2 deletions docs/how/delete-metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
To follow this guide, you'll need the [DataHub CLI](../cli.md).
:::

There are a two ways to delete metadata from DataHub:
There are two ways to delete metadata from DataHub:

1. Delete metadata attached to entities by providing a specific urn or filters that identify a set of urns (delete CLI).
2. Delete metadata created by a single ingestion run (rollback).
Expand Down Expand Up @@ -233,7 +233,13 @@ To view the ids of the most recent set of ingestion batches, execute
datahub ingest list-runs
```

That will print out a table of all the runs. Once you have an idea of which run you want to roll back, run
That will print out a table of all the runs. To see run statuses or to filter runs by URN/source, run

```shell
datahub ingest list-source-runs
```

Once you have an idea of which run you want to roll back, run

```shell
datahub ingest show --run-id <run-id>
Expand Down
110 changes: 110 additions & 0 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

logger = logging.getLogger(__name__)

INGEST_SRC_TABLE_COLUMNS = ["runId", "source", "startTime", "status", "URN"]
RUNS_TABLE_COLUMNS = ["runId", "rows", "created at"]
RUN_TABLE_COLUMNS = ["urn", "aspect name", "created at"]

Expand Down Expand Up @@ -437,6 +438,115 @@ def mcps(path: str) -> None:
sys.exit(ret)


@ingest.command()
@click.argument("page_offset", type=int, default=0)
@click.argument("page_size", type=int, default=100)
@click.option("--urn", type=str, default=None, help="Filter by ingestion source URN.")
@click.option(
    "--source", type=str, default=None, help="Filter by ingestion source name."
)
@upgrade.check_upgrade
@telemetry.with_telemetry()
def list_source_runs(page_offset: int, page_size: int, urn: str, source: str) -> None:
    """List ingestion source runs with their details, optionally filtered by URN or source.

    Queries the GMS GraphQL endpoint for ingestion sources (paginated by
    PAGE_OFFSET / PAGE_SIZE) and prints one table row per execution request,
    showing run ID, source name, start time, status, and source URN.
    Both --urn and --source use CONTAINS matching, so partial values work.
    """

    query = """
    query listIngestionRuns($input: ListIngestionSourcesInput!) {
        listIngestionSources(input: $input) {
            ingestionSources {
                urn
                name
                executions {
                    executionRequests {
                        id
                        result {
                            startTimeMs
                            status
                        }
                    }
                }
            }
        }
    }
    """

    # Build server-side filters; CONTAIN enables partial matches on both fields.
    filters = []
    if urn:
        filters.append({"field": "urn", "values": [urn], "condition": "CONTAIN"})
    if source:
        filters.append({"field": "name", "values": [source], "condition": "CONTAIN"})

    variables = {
        "input": {
            "start": page_offset,
            "count": page_size,
            "filters": filters,
        }
    }

    client = get_default_graph()
    session = client._session
    gms_host = client.config.server

    url = f"{gms_host}/api/graphql"
    try:
        response = session.post(url, json={"query": query, "variables": variables})
        response.raise_for_status()
    except Exception as e:
        click.echo(f"Error fetching data: {str(e)}")
        return

    try:
        data = response.json()
    except ValueError:
        click.echo("Failed to parse JSON response from server.")
        return

    if not data:
        click.echo("No response received from the server.")
        return

    # When the urn/source filter matches nothing, GraphQL may omit the
    # payload entirely — exit gracefully instead of raising KeyError.
    if (
        not isinstance(data.get("data"), dict)
        or "listIngestionSources" not in data["data"]
    ):
        click.echo("No matching ingestion sources found. Please check your filters.")
        return

    ingestion_sources = data["data"]["listIngestionSources"]["ingestionSources"]
    if not ingestion_sources:
        click.echo("No ingestion sources or executions found.")
        return

    rows = []
    for ingestion_source in ingestion_sources:
        # Distinct name so we don't shadow the `urn` CLI parameter above.
        source_urn = ingestion_source.get("urn", "N/A")
        name = ingestion_source.get("name", "N/A")

        # `executions` may be returned as an explicit JSON null, in which case
        # .get()'s default would NOT kick in — normalize with `or {}`.
        executions = (ingestion_source.get("executions") or {}).get(
            "executionRequests", []
        )
        for execution in executions:
            execution_id = execution.get("id", "N/A")
            # `result` is null for runs that have not produced a result yet.
            result = execution.get("result") or {}
            start_time_ms = result.get("startTimeMs")
            # Guard on the actual type: a JSON null would previously slip past
            # the `!= "N/A"` check and crash on `None / 1000`.
            start_time = (
                datetime.fromtimestamp(start_time_ms / 1000).strftime(
                    "%Y-%m-%d %H:%M:%S"
                )
                if isinstance(start_time_ms, (int, float))
                else "N/A"
            )
            status = result.get("status") or "N/A"

            rows.append([execution_id, name, start_time, status, source_urn])

    # Sources can exist with zero executions; say so instead of printing an
    # empty table frame.
    if not rows:
        click.echo("No ingestion runs found for the matching sources.")
        return

    click.echo(
        tabulate(
            rows,
            headers=INGEST_SRC_TABLE_COLUMNS,
            tablefmt="grid",
        )
    )


@ingest.command()
@click.argument("page_offset", type=int, default=0)
@click.argument("page_size", type=int, default=100)
Expand Down
Loading