Skip to content

Commit

Permalink
feat: add more commands
Browse files Browse the repository at this point in the history
  • Loading branch information
Vixtir committed Sep 13, 2023
1 parent 9a3616b commit 950f5b8
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 67 deletions.
101 changes: 65 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,54 +1,83 @@
# OpenDataDiscovery dbt tests metadata collecting

[![PyPI version](https://badge.fury.io/py/odd-dbt.svg)](https://badge.fury.io/py/odd-dbt)

CLI tool helps automatically parse and ingest DBT test results to OpenDataDiscovery Platform.
It can be used as separated CLI tool or within [ODD CLI](https://github.com/opendatadiscovery/odd-cli) package which provides some useful additional features.
CLI tool helps run and ingest dbt test to platform.

It can be used as separated CLI tool or within [ODD CLI](https://github.com/opendatadiscovery/odd-cli) package which
provides some useful additional features for working with OpenDataDiscovery.

## Supported adapters

| Adapter | version |
|-----------|---------|
| Snowflake | ^1.6 |
| Postgres | ^1.6 |

Profiles inside the file looks different for each type of data source.

**Snowflake** host_settings value is created from field `account`. Field value should be `<account_identifier>`
For example the URL for an account uses the following format: `<account_identifier>`.snowflakecomputing.com
Example Snowflake account identifier `hj1234.eu-central-1`.

## Supported tests types

1. [x] Generic tests
2. [ ] Singular tests. Currently Singular tests are not supported.

## Installation
```pip install odd-dbt```

## Command options
## To see all available commands
```
╭─ Options ─────────────────────────────────────────────────────────────╮
│ --project-dir PATH [default: Path().cwd()odd-dbt] │
│ --target TEXT [default:None] │
│ --profile-name TEXT [default:None] │
│ * --host -h TEXT [env var: ODD_PLATFORM_HOST] │
│ * --token -t TEXT [env var: ODD_PLATFORM_TOKEN] │
│ --dbt-host TEXT [default: localhost] │
│ --help Show this message and exit. │
╰───────────────────────────────────────────────────────────────────────╯
odd_dbt_test --help
```

## Example
For each command that involves sending information to OpenDataDiscovery platform exists set of env variables:
1. `ODD_PLATFORM_HOST` - Where you platform is
2. `ODD_PLATFORM_TOKEN` - Token for ingesting data to platform (How to create [token](https://docs.opendatadiscovery.org/configuration-and-deployment/trylocally#create-collector-entity)?)
3. `DBT_DATA_SOURCE_ODDRN` - Unique oddrn string describes dbt source, i.e '//dbt/host/localhost'

## Command run example
How to create [collector token](https://docs.opendatadiscovery.org/configuration-and-deployment/trylocally#create-collector-entity)?
```bash
odd_dbt_test --host http://localhost:8080 --token <COLLECTOR_TOKEN>
It is recommended to add them as ENV variables or provide as flags to each command
```
export ODD_PLATFORM_HOST=http://localhost:8080
export ODD_PLATFORM_TOKEN=token***
export DBT_DATA_SOURCE_ODDRN=//dbt/host/localhost
```

## Supported data sources
| Source | |
| --------- | ------ |
| Snowflake | ^1.4.1 |
| Postgres | ^1.4.5 |
### Commands
`create-datasource` - helps to register dbt as data source at OpenDataDiscovery platform. User later for ingesting metadata.
```commandline
odd_dbt_test create-datasource --name=my_local_dbt --dbt-host=localhost
```

## Requirements
Library to inject Quality Tests entities requires presence of corresponding with them datasets entities in the platform.
For example: if you want to inject data quality test of Snowflake table, you need to have entity of that table present in the platform.
`ingest-test` - Read results_run file under the target folder to parse and ingest metadata.
```commandline
odd_dbt_test ingest-test --profile=my_profile
```

## Supported tests
Library supports for basics tests provided by dbt.
- `unique`: values in the column should be unique
- `not_null`: values in the column should not contain null values
- `accepted_values`: column should only contain values from list specified in the test config
- `relationships`: each value in the select column of the model exists as a specified field in the reference table (also known as referential integrity)
`test` - Proxy command to `dbt test`, then reads results_run file under the target folder to parse and ingest metadata.
```commandline
odd_dbt_test test --profile=my_profile
```

## ODDRN generation for datasets
`host_settings` of ODDRN generators required for source datasets are loaded from `.dbt/profiles.yml`.
### Run commands programmatically
You could run that scrip to read, parse and ingest test results to the platform.
```python
# ingest_test_result.py
from odd_dbt import config
from odd_dbt.domain.cli_args import CliArgs
from odd_dbt.service.dbt import get_context
from odd_dbt.service.odd import ingest_entities
from odd_dbt.mapper.test_results import DbtTestMapper

Profiles inside the file looks different for each type of data source.
cfg = config.Config() # All fields can be set manually or read from ENV variables
client = config.create_odd_client(host=cfg.odd_platform_host, token=cfg.odd_platform_token)
generator = config.create_dbt_generator_from_oddrn(oddrn=cfg.dbt_data_source_oddrn)

**Snowflake** host_settings value is created from field `account`. Field value should be `<account_identifier>`
For example the URL for an account uses the following format: `<account_identifier>`.snowflakecomputing.com
Example Snowflake account identifier `hj1234.eu-central-1`.
cli_args = CliArgs.default()
context = get_context(cli_args=cli_args)
data_entities = DbtTestMapper(context=context, generator=generator).map()
ingest_entities(data_entities, client)
```
79 changes: 56 additions & 23 deletions odd_dbt/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

import typer
from dbt.cli.params import default_profiles_dir, default_project_dir
from odd_models.api_client.v2.odd_api_client import Client
from oddrn_generator import DbtGenerator

from odd_dbt import config
from odd_dbt import errors
from odd_dbt import get_version
from odd_dbt.logger import logger
from odd_dbt.mapper.test_results import DbtTestMapper
from odd_dbt.service import odd
from odd_dbt.service.dbt import run_tests, CliArgs, get_context

app = typer.Typer(
Expand All @@ -30,9 +30,8 @@ def test(
..., "--token", "-t", envvar="ODD_PLATFORM_TOKEN"
),
dbt_data_source_oddrn: str = typer.Option(
default=None,
..., "--dbt-oddrn", "-oddrn", envvar="DBT_DATA_SOURCE_ODDRN"
),
dbt_host: str = typer.Option(default="localhost"),
):
logger.info(f"Used OpenDataDiscovery dbt version: {get_version()}")
cli_args = CliArgs(
Expand All @@ -43,30 +42,15 @@ def test(
threads=1,
vars={},
)

try:
logger.info(
f"Run dbt process with {project_dir=}, {profiles_dir=}, {target=}, {profile=}"
)
run_tests(cli_args=cli_args)
context = get_context(cli_args=cli_args)

client = Client(host=platform_host, token=platform_token)
generator = DbtGenerator(host_settings=dbt_host)

if not dbt_data_source_oddrn:
dbt_data_source_oddrn = generator.get_data_source_oddrn()
logger.info(f"Creating data source for dbt: ODDRN={dbt_data_source_oddrn}")
client.create_data_source(
data_source_name="dbt",
data_source_oddrn=dbt_data_source_oddrn,
)
client = config.create_odd_client(host=platform_host, token=platform_token)
generator = config.create_dbt_generator_from_oddrn(oddrn=dbt_data_source_oddrn)

data_entities = DbtTestMapper(context=context, generator=generator).map()

logger.info("Mapping finished. Start injecting...")
client.ingest_data_entity_list(data_entities=data_entities)

logger.info("Injecting finished.")
odd.ingest_entities(data_entities, client)
except errors.DbtTestCommandError as e:
logger.error(e)
typer.Exit(2)
Expand All @@ -76,5 +60,54 @@ def test(
raise typer.Exit(1)


@app.command()
def create_datasource(
data_source_name=typer.Option(..., "--name", "-n"),
dbt_host: str = typer.Option(default="localhost"),
platform_host: str = typer.Option(..., "--host", "-h", envvar="ODD_PLATFORM_HOST"),
platform_token: str = typer.Option(
..., "--token", "-t", envvar="ODD_PLATFORM_TOKEN"
),
):
client = config.create_odd_client(host=platform_host, token=platform_token)
oddrn = odd.create_datasource(data_source_name, dbt_host, client)

logger.info(f"Data source oddrn: '{oddrn}'")
logger.info(
"You can use command below to add newly created ODDRN for next commands:"
)
logger.info(f"export DBT_DATA_SOURCE_ODDRN={oddrn}")


@app.command()
def ingest_test(
project_dir: Path = typer.Option(default=default_project_dir()),
profiles_dir: Path = typer.Option(default=default_profiles_dir()),
target: Optional[str] = typer.Option(default=None),
profile: Optional[str] = typer.Option(default=None),
platform_host: str = typer.Option(..., "--host", "-h", envvar="ODD_PLATFORM_HOST"),
platform_token: str = typer.Option(
..., "--token", "-t", envvar="ODD_PLATFORM_TOKEN"
),
dbt_data_source_oddrn: str = typer.Option(
..., "--dbt-oddrn", "-oddrn", envvar="DBT_DATA_SOURCE_ODDRN"
),
):
cli_args = CliArgs(
project_dir=project_dir,
profiles_dir=profiles_dir,
profile=profile,
target=target,
threads=1,
vars={},
)
context = get_context(cli_args=cli_args)
client = config.create_odd_client(host=platform_host, token=platform_token)
generator = config.create_dbt_generator_from_oddrn(oddrn=dbt_data_source_oddrn)

data_entities = DbtTestMapper(context=context, generator=generator).map()
odd.ingest_entities(data_entities, client)


if __name__ == "__main__":
app()
25 changes: 25 additions & 0 deletions odd_dbt/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import oddrn_generator as odd
from odd_models.api_client.v2.odd_api_client import Client
from pydantic import BaseSettings


class Config(BaseSettings):
odd_platform_host: str
odd_platform_token: str
dbt_data_source_oddrn: str


def create_odd_client(host: str = None, token: str = None) -> Client:
return Client(host=host, token=token)


def create_dbt_generator_from_oddrn(oddrn: str) -> odd.DbtGenerator:
return odd.DbtGenerator(host_settings=extract_host_from_oddrn(oddrn))


def create_dbt_generator(host: str) -> odd.DbtGenerator:
return odd.DbtGenerator(host_settings=host)


def extract_host_from_oddrn(oddrn: str) -> str:
return oddrn.split("//dbt/host/")[-1]
13 changes: 13 additions & 0 deletions odd_dbt/domain/cli_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from pathlib import Path
from typing import Optional

from dbt.cli.resolvers import default_project_dir, default_profiles_dir


@dataclass
class CliArgs:
Expand All @@ -12,6 +14,17 @@ class CliArgs:
threads: Optional[int]
vars: Optional[dict[str, str]]

@classmethod
def default(cls):
return cls(
project_dir=default_project_dir(),
profiles_dir=default_profiles_dir(),
profile=None,
target=None,
threads=1,
vars=dict(),
)


@dataclass
class FlagsArgs:
Expand Down
4 changes: 2 additions & 2 deletions odd_dbt/domain/context.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dbt.config.runtime import RuntimeConfig

from odd_dbt.domain import Manifest, Credentials, RunResults
from odd_dbt.domain import Manifest, Credentials, RunResults, Result
from odd_dbt.domain.cli_args import CliArgs
from odd_dbt.errors import DbtInternalError
from odd_dbt.utils import load_json
Expand Down Expand Up @@ -38,7 +38,7 @@ def run_results(self) -> RunResults:
return RunResults(self.target_path / "run_results.json")

@property
def results(self) -> list[dict]:
def results(self) -> list[Result]:
return self.run_results.results

@property
Expand Down
6 changes: 4 additions & 2 deletions odd_dbt/domain/manifest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from pathlib import Path

from odd_dbt.utils import load_json
from funcy import walk_values
from dbt.contracts.graph.nodes import ParsedNode


class Manifest:
def __init__(self, file: Path) -> None:
self._manifest = load_json(file)

@property
def nodes(self):
return self._manifest["nodes"]
def nodes(self) -> list[ParsedNode]:
return walk_values(ParsedNode._deserialize, self._manifest["nodes"])
9 changes: 5 additions & 4 deletions odd_dbt/mapper/test_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pytz
from dbt.contracts.graph.nodes import ParsedNode, TestNode
from funcy import lkeep, walk_values
from funcy import lkeep
from odd_models.models import (
DataEntity,
DataEntityList,
Expand All @@ -16,8 +16,8 @@
)
from oddrn_generator import DbtGenerator

from odd_dbt.domain.context import DbtContext
from odd_dbt.domain import Result
from odd_dbt.domain.context import DbtContext
from odd_dbt.mapper.generator import create_generator
from odd_dbt.mapper.metadata import get_metadata
from odd_dbt.mapper.status_reason import StatusReason
Expand All @@ -31,11 +31,12 @@ def __init__(self, context: DbtContext, generator: DbtGenerator) -> None:

def map(self) -> DataEntityList:
data_entities = []
all_nodes = walk_values(ParsedNode._deserialize, self._context.manifest.nodes)

for result in self._context.results:
try:
data_entities.extend(self.map_result(result, all_nodes))
data_entities.extend(
self.map_result(result, self._context.manifest.nodes)
)
except Exception as e:
logger.warning(f"Can't map result {result.unique_id}: {str(e)}")
logger.debug(traceback.format_exc())
Expand Down
5 changes: 5 additions & 0 deletions odd_dbt/service/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
from odd_dbt.domain.cli_args import CliArgs, FlagsArgs
from odd_dbt.domain.context import DbtContext
from odd_dbt.logger import logger
from odd_dbt import domain


def collect_test_results(context: DbtContext) -> list[domain.Result]:
return context.results


def run_tests(cli_args: CliArgs) -> None:
Expand Down
22 changes: 22 additions & 0 deletions odd_dbt/service/odd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from odd_models import DataEntityList
from odd_models.api_client.v2.odd_api_client import Client

from odd_dbt import config
from odd_dbt.logger import logger


def create_datasource(name: str, dbt_host: str, client: Client) -> None:
generator = config.create_dbt_generator(host=dbt_host)
oddrn = generator.get_data_source_oddrn()
client.create_data_source(
data_source_name=name,
data_source_oddrn=oddrn,
)
return oddrn


def ingest_entities(data_entities: DataEntityList, client: Client) -> None:
client.ingest_data_entity_list(data_entities=data_entities)
logger.success(
f"Injecting test results finished. Ingested {len(data_entities.items)} entities"
)

0 comments on commit 950f5b8

Please sign in to comment.