Skip to content

Commit

Permalink
Filter out non-root resources (#61)
Browse files Browse the repository at this point in the history
Resources whose YAML description is defined in a package are filtered out.
Closes #60
  • Loading branch information
kokorin authored Sep 21, 2024
1 parent 26f82c8 commit 6a0d5ec
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 23 deletions.
52 changes: 38 additions & 14 deletions dbt_pumpkin/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def load_manifest(self) -> Manifest:
self._manifest = self._do_load_manifest()
return self._manifest

def _do_select_resource_ids(self) -> dict[ResourceType, set[ResourceID]]:
def _do_list_all_resource_ids(self) -> dict[ResourceType, set[ResourceID]]:
"""
Returns a dictionary mapping resource type to a set of resource identifiers
"""
Expand All @@ -86,15 +86,11 @@ def _do_select_resource_ids(self) -> dict[ResourceType, set[ResourceID]]:
raise res.exception

result: dict[ResourceType, set[ResourceID]] = {}
# TODO: after dropping DBT 1.5 support we can get project name from Manifest
# self.load_manifest().metadata.project_name
project_name = self._parse_project_yml()["name"]

for raw_resource in res.result:
resource = json.loads(raw_resource)
resource_type_str = resource["resource_type"]
resource_package_name = resource["package_name"]
if resource_type_str in ResourceType.values() and resource_package_name == project_name:
if resource_type_str in ResourceType.values():
res_type = ResourceType(resource_type_str)
res_id = ResourceID(resource["unique_id"])

Expand All @@ -104,25 +100,47 @@ def _do_select_resource_ids(self) -> dict[ResourceType, set[ResourceID]]:

return result

def select_resource_ids(self) -> dict[ResourceType, set[ResourceID]]:
def list_all_resource_ids(self) -> dict[ResourceType, set[ResourceID]]:
"""
Returns all Resource Identifiers (grouped by Resource type) defined in DBT project (including packages)
"""
if self._resource_ids is None:
self._resource_ids = self._do_select_resource_ids()
self._resource_ids = self._do_list_all_resource_ids()

return self._resource_ids

def select_raw_resources(self) -> list[SourceDefinition | SeedNode | ModelNode | SnapshotNode]:
"""
Returns a list of raw Resources that can be processed by dbt-pumpkin.
Resources defined in a package or having YAML description defined in a package are filtered out.
"""
manifest = self.load_manifest()
results: list[SourceDefinition | SeedNode | ModelNode | SnapshotNode] = []

project_name = self.get_project_name()
project_path_path_prefix = project_name + "://"
raw_resources_by_type = {
ResourceType.SOURCE: manifest.sources,
ResourceType.MODEL: manifest.nodes,
ResourceType.SEED: manifest.nodes,
ResourceType.SNAPSHOT: manifest.nodes,
}
for res_type, res_ids in self.select_resource_ids().items():
for res_type, res_ids in self.list_all_resource_ids().items():
for res_id in res_ids:
raw_resource = raw_resources_by_type[res_type][str(res_id)]
if raw_resource.package_name != project_name:
logger.debug(
"Skipping resource %s defined in package %s", raw_resource.unique_id, raw_resource.package_name
)
continue
if raw_resource.patch_path and not raw_resource.patch_path.startswith(project_path_path_prefix):
logger.warning(
"Skipping resource %s: YAML descriptor is not in root package %s",
raw_resource.unique_id,
raw_resource.package_name,
)
continue
results.append(raw_resource)

return results
Expand All @@ -149,11 +167,7 @@ def _do_select_resources(self) -> list[Resource]:
else:
path = Path(raw_resource.original_file_path)
if raw_resource.patch_path:
# patch_path starts with "project_name://", we just remove it
# DBT 1.5 has no manifest.metadata.project_name, so we use resource FQN which starts with project name
# patch_path_prefix = self.manifest.metadata.project_name + "://"
patch_path_prefix = raw_resource.fqn[0] + "://"
fixed_patch_path = raw_resource.patch_path.removeprefix(patch_path_prefix)
fixed_patch_path = raw_resource.patch_path.split("://")[-1]
yaml_path = Path(fixed_patch_path)

pumpkin_types = raw_resource.config.get("dbt-pumpkin-types", {})
Expand Down Expand Up @@ -189,6 +203,11 @@ def _do_select_resources(self) -> list[Resource]:
return results

def select_resources(self) -> list[Resource]:
"""
Returns a list of Resources that can be processed by dbt-pumpkin.
Resources defined in a package or having YAML description defined in a package are filtered out.
"""
if self._resources is None:
self._resources = self._do_select_resources()

Expand Down Expand Up @@ -251,6 +270,11 @@ def _parse_project_yml(self) -> dict[str, any]:
raise PumpkinError(msg)
return self._yaml.load(project_yml_path)

def get_project_name(self) -> str:
    """
    Returns the value stored under the "name" key of the parsed project YAML
    """
    # TODO: after dropping DBT 1.5 support we can get project name from Manifest
    # self.load_manifest().metadata.project_name
    project_yml = self._parse_project_yml()
    return project_yml["name"]

def detect_yaml_format(self) -> YamlFormat | None:
pumpkin_var = self._parse_project_yml().get("vars", {}).get("dbt-pumpkin")
if pumpkin_var is None:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_dbt_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ def test_load_manifest(monkeypatch: MonkeyPatch, dbt_project_path):


def test_resource_ids(monkeypatch: MonkeyPatch, dbt_project_path):
not_patched = new_loader(dbt_project_path).select_resource_ids()
not_patched = new_loader(dbt_project_path).list_all_resource_ids()
assert not_patched

with monkeypatch.context() as m:
for patch in prepare_monkey_patches():
m.setattr(patch.obj, patch.name, patch.value)

patched = new_loader(dbt_project_path).select_resource_ids()
patched = new_loader(dbt_project_path).list_all_resource_ids()

assert patched
assert not_patched == patched
Expand Down
114 changes: 107 additions & 7 deletions tests/test_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,85 @@ def my_pumpkin() -> Path:
)


@pytest.fixture(scope="module")
def overridden_resources() -> Path:
    """
    Returns a mock project `my_pumpkin` plus a local package `my_package`.

    Both define a model `stg_customers`, a seed `seed_customers` and a snapshot
    `customers_snapshot`. Only the package ships YAML descriptions for them, and the
    root dbt_project.yml sets `+enabled: false` for the package's seeds, snapshots and models.
    """
    # File contents are kept flush-left and closed at column 0 so every literal ends with
    # exactly one newline and carries no incidental leading whitespace.
    # NOTE(review): nesting inside the root dbt_project.yml was reconstructed (resource
    # type -> project/package name -> config key) - confirm against the repository.
    return mock_project(
        files={
            "dbt_project.yml": """\
name: my_pumpkin
version: 1.0.0
profile: test_pumpkin
seeds:
  my_pumpkin:
    +dbt-pumpkin-path: _schema.yml
  my_package:
    +enabled: false
snapshots:
  my_pumpkin:
    +dbt-pumpkin-path: _schema.yml
  my_package:
    +enabled: false
models:
  my_pumpkin:
    +dbt-pumpkin-path: _schema.yml
  my_package:
    +enabled: false
""",
            "models/stg_customers.sql": """\
select 42 as id, 'Jon Snow' as name
""",
            "seeds/seed_customers.csv": """\
id,name
1,Tyrion Lannister
""",
            "snapshots/customers_snapshot.sql": """\
{% snapshot customers_snapshot %}
{{ config(unique_key='id', target_schema='snapshots', strategy='check', check_cols='all',) }}
select 41 as id, 'Eddard Stark' as name
{% endsnapshot %}
""",
        },
        local_packages={
            "my_package": {
                "dbt_project.yml": """\
name: my_package
version: 1.0.0
profile: test_pumpkin
""",
                "models/stg_customers.yml": """\
version: 2
models:
- name: stg_customers
""",
                "models/stg_customers.sql": """\
select 0/0 as id
""",
                "seeds/seed_customers.yml": """\
version: 2
seeds:
- name: seed_customers
""",
                "seeds/seed_customers.csv": """\
id,name
0,noname00
""",
                "snapshots/customers_snapshot.yml": """\
version: 2
snapshots:
- name: customers_snapshot
""",
                "snapshots/customers_snapshot.sql": """\
{% snapshot customers_snapshot %}
{{ config(unique_key='id', target_schema='snapshots', strategy='check', check_cols='all',) }}
select 0/0 as id
{% endsnapshot %}
""",
            }
        },
        build=False,
    )


@pytest.fixture
def loader_all(my_pumpkin) -> ResourceLoader:
return ResourceLoader(
Expand All @@ -83,6 +162,14 @@ def loader_all(my_pumpkin) -> ResourceLoader:
)


@pytest.fixture
def loader_overridden_resources(overridden_resources) -> ResourceLoader:
    """
    Returns a ResourceLoader for the `overridden_resources` project.

    The project path (as a string) is passed as both positional arguments of
    ProjectParams; ResourceParams is constructed with no arguments.
    """
    project_dir = str(overridden_resources)
    project_params = ProjectParams(project_dir, project_dir)
    return ResourceLoader(project_params=project_params, resource_params=ResourceParams())


@pytest.fixture
def loader_only_sources(my_pumpkin) -> ResourceLoader:
return ResourceLoader(
Expand Down Expand Up @@ -316,7 +403,7 @@ def test_manifest(loader_all):


def test_selected_resource_ids(loader_all):
assert loader_all.select_resource_ids() == {
assert loader_all.list_all_resource_ids() == {
ResourceType.SEED: {
ResourceID("seed.my_pumpkin.seed_customers"),
},
Expand All @@ -333,44 +420,47 @@ def test_selected_resource_ids(loader_all):


def test_selected_resource_ids_only_sources(loader_only_sources: ResourceLoader):
assert loader_only_sources.select_resource_ids() == {
assert loader_only_sources.list_all_resource_ids() == {
ResourceType.SOURCE: {
ResourceID("source.my_pumpkin.pumpkin.customers"),
}
}


def test_selected_resource_ids_only_seeds(loader_only_seeds: ResourceLoader):
assert loader_only_seeds.select_resource_ids() == {
assert loader_only_seeds.list_all_resource_ids() == {
ResourceType.SEED: {
ResourceID("seed.my_pumpkin.seed_customers"),
},
}


def test_selected_resource_ids_only_snapshots(loader_only_snapshots: ResourceLoader):
assert loader_only_snapshots.select_resource_ids() == {
assert loader_only_snapshots.list_all_resource_ids() == {
ResourceType.SNAPSHOT: {
ResourceID("snapshot.my_pumpkin.customers_snapshot"),
},
}


def test_selected_resource_ids_only_models(loader_only_models: ResourceLoader):
assert loader_only_models.select_resource_ids() == {
assert loader_only_models.list_all_resource_ids() == {
ResourceType.MODEL: {
ResourceID("model.my_pumpkin.stg_customers"),
},
}


def test_selected_resources_non_project_resources_excluded(loader_with_deps):
assert loader_with_deps.select_resource_ids() == {
assert loader_with_deps.list_all_resource_ids() == {
ResourceType.MODEL: {
ResourceID("model.test_pumpkin.customers"),
ResourceID("model.extra.extra_customers"),
},
}

assert {r.unique_id for r in loader_with_deps.select_raw_resources()} == {"model.test_pumpkin.customers"}


def test_selected_resources(loader_all):
def sort_order(res: Resource):
Expand Down Expand Up @@ -530,8 +620,18 @@ def test_selected_resource_config(loader_configured_paths):
}


def test_overridden_resources(loader_overridden_resources):
    """
    Root-project resources are still listed by ID, but none of them is selected
    as a raw resource or as a Resource.
    """
    listed_ids = loader_overridden_resources.list_all_resource_ids()
    assert listed_ids == {
        ResourceType.SEED: {ResourceID("seed.my_pumpkin.seed_customers")},
        ResourceType.MODEL: {ResourceID("model.my_pumpkin.stg_customers")},
        ResourceType.SNAPSHOT: {ResourceID("snapshot.my_pumpkin.customers_snapshot")},
    }

    raw_resources = loader_overridden_resources.select_raw_resources()
    assert [] == raw_resources

    resources = loader_overridden_resources.select_resources()
    assert [] == resources


def test_selected_resources_total_count(loader_all):
assert sum(len(ids) for ids in loader_all.select_resource_ids().values()) == len(loader_all.select_resources())
assert sum(len(ids) for ids in loader_all.list_all_resource_ids().values()) == len(loader_all.select_resources())


def test_selected_resource_tables(loader_all):
Expand Down

0 comments on commit 6a0d5ec

Please sign in to comment.