fix(ingestion/dremio): Ignore filtered containers in schema allowdeny pattern #11959

Merged
72 commits
377b60b
improved schema filtering
acrylJonny Nov 25, 2024
caf4686
Update dremio_api.py
acrylJonny Nov 25, 2024
0028004
Merge branch 'master' into dremio-space-source-filters
acrylJonny Nov 25, 2024
6c7f7b0
linting
acrylJonny Nov 25, 2024
df308ab
Update dremio_aspects.py
acrylJonny Nov 26, 2024
8c45bce
Merge branch 'master' into dremio-space-source-filters
acrylJonny Nov 26, 2024
725feb6
Merge branch 'master' into dremio-space-source-filters
acrylJonny Nov 28, 2024
290e25a
Update dremio_api.py
acrylJonny Nov 29, 2024
2914f66
addressing comment
acrylJonny Nov 29, 2024
3847574
Merge branch 'master' into dremio-space-source-filters
acrylJonny Nov 29, 2024
0e0a006
linting
acrylJonny Nov 29, 2024
e780c7b
Merge branch 'dremio-space-source-filters' of https://github.com/acry…
acrylJonny Nov 29, 2024
b015626
more linting
acrylJonny Nov 29, 2024
1ef76bb
Update test_dremio.py
acrylJonny Nov 29, 2024
3d9e386
golden mce update
acrylJonny Dec 1, 2024
f2debf2
Update dremio_api.py
acrylJonny Dec 1, 2024
c94e1b5
Update dremio_api.py
acrylJonny Dec 1, 2024
e444c50
Update dremio_api.py
acrylJonny Dec 1, 2024
37ede9c
Update dremio_api.py
acrylJonny Dec 1, 2024
c72ec95
Merge branch 'master' into dremio-space-source-filters
acrylJonny Dec 1, 2024
4422ae3
Update dremio_mces_golden.json
acrylJonny Dec 1, 2024
5440ccc
test updates
acrylJonny Dec 3, 2024
3ae2dfb
Update dremio_mces_golden.json
acrylJonny Dec 3, 2024
595d0f4
test updates
acrylJonny Dec 3, 2024
1de3217
test updates
acrylJonny Dec 3, 2024
276aa7e
Revert "test updates"
acrylJonny Dec 3, 2024
c1b94aa
Update dremio_mces_golden.json
acrylJonny Dec 3, 2024
cdf69f9
Merge branch 'master' into dremio-space-source-filters
acrylJonny Dec 3, 2024
70b8e13
test updates
acrylJonny Dec 3, 2024
39a71c1
Update test_dremio.py
acrylJonny Dec 3, 2024
548f772
Update test_dremio.py
acrylJonny Dec 3, 2024
cfa809c
update golden mces
acrylJonny Dec 3, 2024
b29c217
Merge branch 'master' into dremio-space-source-filters
acrylJonny Dec 3, 2024
9f5ebc7
Update dremio_api.py
acrylJonny Dec 5, 2024
17674fb
Update dremio_api.py
acrylJonny Dec 5, 2024
5593d2f
Update dremio_api.py
acrylJonny Dec 5, 2024
b384a3e
Update dremio_api.py
acrylJonny Dec 5, 2024
70e37b6
Merge branch 'master' into dremio-space-source-filters
acrylJonny Dec 5, 2024
6a144f0
update to include multi-part paths
acrylJonny Dec 5, 2024
eb6bc03
Update dremio_api.py
acrylJonny Dec 5, 2024
f7287c3
Update dremio_api.py
acrylJonny Dec 5, 2024
39abde4
Update dremio_api.py
acrylJonny Dec 5, 2024
a224dc3
Update dremio_api.py
acrylJonny Dec 5, 2024
d1c1eb2
Update dremio_api.py
acrylJonny Dec 5, 2024
266d51b
Merge branch 'master' into dremio-space-source-filters
acrylJonny Dec 5, 2024
872bf9d
Update dremio_api.py
acrylJonny Dec 5, 2024
cc57083
reverting platform instance change to add platform instance to contai…
acrylJonny Dec 5, 2024
619bb48
Update dremio_schema_filter_to_file.yml
acrylJonny Dec 5, 2024
d5cf511
Update dremio_api.py
acrylJonny Dec 5, 2024
2ba1c59
reimplementing platform_instance
acrylJonny Dec 6, 2024
bf61172
Merge branch 'master' into dremio-space-source-filters
acrylJonny Dec 6, 2024
8ed0543
Update dremio_api.py
acrylJonny Dec 6, 2024
4378e8a
updating schema_filter with tests
acrylJonny Dec 6, 2024
3024299
Merge branch 'master' into dremio-space-source-filters
acrylJonny Dec 6, 2024
d193651
linting
acrylJonny Dec 6, 2024
0e3ee8e
Merge branch 'master' into dremio-space-source-filters
acrylJonny Dec 6, 2024
9574089
Update test_dremio_schema_filter.py
acrylJonny Dec 6, 2024
44d5811
Merge branch 'master' into dremio-space-source-filters
acrylJonny Dec 6, 2024
7a3523b
Update test_dremio_schema_filter.py
acrylJonny Dec 6, 2024
38d28b3
Merge branch 'dremio-space-source-filters' of https://github.com/acry…
acrylJonny Dec 6, 2024
f461e0a
Merge branch 'master' into dremio-space-source-filters
acrylJonny Dec 7, 2024
88ded76
Update metadata-ingestion/tests/unit/dremio/test_dremio_schema_filter.py
acrylJonny Dec 11, 2024
693f291
Update metadata-ingestion/tests/unit/dremio/test_dremio_schema_filter.py
acrylJonny Dec 11, 2024
7c21014
Update metadata-ingestion/tests/unit/dremio/test_dremio_schema_filter.py
acrylJonny Dec 11, 2024
5214565
Update metadata-ingestion/tests/unit/dremio/test_dremio_schema_filter.py
acrylJonny Dec 11, 2024
17c20eb
Merge branch 'master' into dremio-space-source-filters
acrylJonny Dec 11, 2024
3efaf0b
update to pass all tests
acrylJonny Dec 11, 2024
b07923f
quick lint
acrylJonny Dec 11, 2024
9d4be05
Merge branch 'master' into dremio-space-source-filters
mayurinehate Dec 12, 2024
9beace1
Merge branch 'master' into dremio-space-source-filters
acrylJonny Dec 12, 2024
66c21a5
updating tests
acrylJonny Dec 12, 2024
0f4ba84
Merge branch 'master' into dremio-space-source-filters
acrylJonny Dec 12, 2024
279 changes: 193 additions & 86 deletions metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_api.py
@@ -1,6 +1,7 @@
import concurrent.futures
import json
import logging
import re
import warnings
from collections import defaultdict
from enum import Enum
@@ -609,32 +610,6 @@ def extract_all_queries(self) -> List[Dict[str, Any]]:

return self.execute_query(query=jobs_query)

def get_source_by_id(self, source_id: str) -> Optional[Dict]:
"""
Fetch source details by ID.
"""
response = self.get(
url=f"/source/{source_id}",
)
return response if response else None

def get_source_for_dataset(self, schema: str, dataset: str) -> Optional[Dict]:
"""
Get source information for a dataset given its schema and name.
"""
dataset_id = self.get_dataset_id(schema, dataset)
if not dataset_id:
return None

catalog_entry = self.get(
url=f"/catalog/{dataset_id}",
)
if not catalog_entry or "path" not in catalog_entry:
return None

source_id = catalog_entry["path"][0]
return self.get_source_by_id(source_id)

def get_tags_for_resource(self, resource_id: str) -> Optional[List[str]]:
"""
Get Dremio tags for a given resource_id.
@@ -673,55 +648,119 @@ def get_description_for_resource(self, resource_id: str) -> Optional[str]:
)
return None

def get_containers_for_location(
self, resource_id: str, path: List[str]
) -> List[Dict[str, str]]:
containers = []
def _check_pattern_match(
self,
pattern: str,
paths: List[str],
allow_prefix: bool = True,
) -> bool:
"""
Helper method to check if a pattern matches any of the paths.
Handles hierarchical matching where each level is matched independently.
Also handles prefix matching for partial paths.
"""
if pattern == ".*":
return True

def traverse_path(location_id: str, entity_path: List[str]) -> List:
nonlocal containers
try:
response = self.get(url=f"/catalog/{location_id}")
if (
response.get("entityType")
== DremioEntityContainerType.FOLDER.value.lower()
):
containers.append(
{
"id": location_id,
"name": entity_path[-1],
"path": entity_path[:-1],
"container_type": DremioEntityContainerType.FOLDER,
}
)
# Convert the pattern to regex with proper anchoring
regex_pattern = pattern
if pattern.startswith("^"):
# Already has start anchor
regex_pattern = pattern.replace(".", r"\.") # Escape dots
regex_pattern = regex_pattern.replace(
r"\.*", ".*"
) # Convert .* to wildcard
else:
# Add start anchor and handle dots
regex_pattern = "^" + pattern.replace(".", r"\.").replace(r"\.*", ".*")

# Handle end matching
if not pattern.endswith(".*"):
if pattern.endswith("$"):
# Keep explicit end anchor
pass
elif not allow_prefix:
# Add end anchor for exact matching
regex_pattern = regex_pattern + "$"

for path in paths:
if re.match(regex_pattern, path, re.IGNORECASE):
return True

for container in response.get("children", []):
if (
container.get("type")
== DremioEntityContainerType.CONTAINER.value
):
traverse_path(container.get("id"), container.get("path"))
return False

except Exception as exc:
logging.info(
"Location {} contains no tables or views. Skipping...".format(id)
)
self.report.warning(
message="Failed to get tables or views",
context=f"{id}",
exc=exc,
)
def should_include_container(self, path: List[str], name: str) -> bool:
"""
Helper method to check if a container should be included based on schema patterns.
Used by both get_all_containers and get_containers_for_location.
"""
path_components = path + [name] if path else [name]
full_path = ".".join(path_components)

return containers
# Default allow everything case
if self.allow_schema_pattern == [".*"] and not self.deny_schema_pattern:
self.report.report_container_scanned(full_path)
return True

return traverse_path(location_id=resource_id, entity_path=path)
# Check deny patterns first
if self.deny_schema_pattern:
for pattern in self.deny_schema_pattern:
if self._check_pattern_match(
pattern=pattern,
paths=[full_path],
allow_prefix=False,
):
self.report.report_container_filtered(full_path)
return False

# Check allow patterns
for pattern in self.allow_schema_pattern:
# For patterns with wildcards, check if this path is a parent of the pattern
if "*" in pattern:
pattern_parts = pattern.split(".")
path_parts = path_components

# If pattern has exact same number of parts, check each component
if len(pattern_parts) == len(path_parts):
matches = True
for p_part, c_part in zip(pattern_parts, path_parts):
if p_part != "*" and p_part.lower() != c_part.lower():
matches = False
break
if matches:
self.report.report_container_scanned(full_path)
return True
# Otherwise check if current path is prefix match
else:
# Remove the trailing wildcard if present
if pattern_parts[-1] == "*":
pattern_parts = pattern_parts[:-1]

for i in range(len(path_parts)):
current_path = ".".join(path_parts[: i + 1])
pattern_prefix = ".".join(pattern_parts[: i + 1])

if pattern_prefix.startswith(current_path):
self.report.report_container_scanned(full_path)
return True

# Direct pattern matching
if self._check_pattern_match(
pattern=pattern,
paths=[full_path],
allow_prefix=True,
):
self.report.report_container_scanned(full_path)
return True

self.report.report_container_filtered(full_path)
return False

def get_all_containers(self):
"""
Query the Dremio sources API and return source information.
Query the Dremio sources API and return filtered source information.
"""
containers = []

response = self.get(url="/catalog")

def process_source(source):
@@ -731,34 +770,41 @@ def process_source(source):
)

source_config = source_resp.get("config", {})
if source_config.get("database"):
db = source_config.get("database")
else:
db = source_config.get("databaseName", "")

return {
"id": source.get("id"),
"name": source.get("path")[0],
"path": [],
"container_type": DremioEntityContainerType.SOURCE,
"source_type": source_resp.get("type"),
"root_path": source_config.get("rootPath"),
"database_name": db,
}
db = source_config.get(
"database", source_config.get("databaseName", "")
)

if self.should_include_container([], source.get("path")[0]):
return {
"id": source.get("id"),
"name": source.get("path")[0],
"path": [],
"container_type": DremioEntityContainerType.SOURCE,
"source_type": source_resp.get("type"),
"root_path": source_config.get("rootPath"),
"database_name": db,
}
else:
return {
"id": source.get("id"),
"name": source.get("path")[0],
"path": [],
"container_type": DremioEntityContainerType.SPACE,
}
if self.should_include_container([], source.get("path")[0]):
return {
"id": source.get("id"),
"name": source.get("path")[0],
"path": [],
"container_type": DremioEntityContainerType.SPACE,
}
return None

def process_source_and_containers(source):
container = process_source(source)
if not container:
return []

# Get sub-containers
sub_containers = self.get_containers_for_location(
resource_id=container.get("id"),
path=[container.get("name")],
)

return [container] + sub_containers

# Use ThreadPoolExecutor to parallelize the processing of sources
@@ -771,7 +817,16 @@ def process_source_and_containers(source):
}

for future in concurrent.futures.as_completed(future_to_source):
containers.extend(future.result())
source = future_to_source[future]
try:
containers.extend(future.result())
except Exception as exc:
logger.error(f"Error processing source: {exc}")
self.report.warning(
message="Failed to process source",
context=f"{source}",
exc=exc,
)

return containers
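
The hunk above wraps `future.result()` in a `try`/`except` so that one failing source no longer aborts the whole scan. A minimal standalone sketch of that pattern follows. The names `collect_results`, `worker`, and the `failures` list are hypothetical stand-ins for illustration (the PR itself reports failures through `self.report.warning`), and the pool size of 4 is an arbitrary assumption.

```python
import concurrent.futures
from typing import Callable, List, Tuple


def collect_results(
    sources: List[str],
    worker: Callable[[str], List[str]],
) -> Tuple[List[str], List[Tuple[str, str]]]:
    """Run worker over every source in parallel, isolating per-source failures."""
    results: List[str] = []
    failures: List[Tuple[str, str]] = []  # hypothetical stand-in for report.warning

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # Map each future back to its source so errors can name the culprit.
        future_to_source = {executor.submit(worker, s): s for s in sources}

        for future in concurrent.futures.as_completed(future_to_source):
            source = future_to_source[future]
            try:
                results.extend(future.result())
            except Exception as exc:
                # Record the failure and keep processing the other sources.
                failures.append((source, str(exc)))

    return results, failures


def worker(name: str) -> List[str]:
    """Hypothetical worker: fails for one source, returns containers otherwise."""
    if name == "bad":
        raise ValueError("boom")
    return [name, name + ".folder"]


results, failures = collect_results(["a", "bad", "b"], worker)
```

Because `as_completed` yields futures in completion order, the order of `results` is not guaranteed; only the set of collected containers and the recorded failures are.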

@@ -785,3 +840,55 @@ def get_context_for_vds(self, resource_id: str) -> str:
)
else:
return ""

def get_containers_for_location(
self, resource_id: str, path: List[str]
) -> List[Dict[str, str]]:
containers = []

def traverse_path(location_id: str, entity_path: List[str]) -> List:
nonlocal containers
try:
response = self.get(url=f"/catalog/{location_id}")

# Check if current folder should be included
if (
response.get("entityType")
== DremioEntityContainerType.FOLDER.value.lower()
):
folder_name = entity_path[-1]
folder_path = entity_path[:-1]

if self.should_include_container(folder_path, folder_name):
containers.append(
{
"id": location_id,
"name": folder_name,
"path": folder_path,
"container_type": DremioEntityContainerType.FOLDER,
}
)

# Recursively process child containers
for container in response.get("children", []):
if (
container.get("type")
== DremioEntityContainerType.CONTAINER.value
):
traverse_path(container.get("id"), container.get("path"))

except Exception as exc:
logging.info(
"Location {} contains no tables or views. Skipping...".format(
location_id
)
)
self.report.warning(
message="Failed to get tables or views",
context=f"{location_id}",
exc=exc,
)

return containers

return traverse_path(location_id=resource_id, entity_path=path)
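
To make the pattern handling in `_check_pattern_match` easier to try out, here is a sketch of the same conversion as a free function. The name `check_pattern_match` and its standalone form are assumptions for illustration; the logic mirrors the diff: literal dots are escaped, `.*` stays a wildcard, a start anchor is added when missing, and an end anchor is added only when `allow_prefix` is false and the pattern has neither a trailing `.*` nor an explicit `$`.

```python
import re
from typing import List


def check_pattern_match(
    pattern: str, paths: List[str], allow_prefix: bool = True
) -> bool:
    """Return True if the schema pattern matches any of the dotted paths."""
    if pattern == ".*":
        return True

    # Escape dots so they match literally, then restore ".*" as a wildcard.
    regex_pattern = pattern.replace(".", r"\.").replace(r"\.*", ".*")

    # Anchor at the start unless the pattern already does so.
    if not regex_pattern.startswith("^"):
        regex_pattern = "^" + regex_pattern

    # Anchor at the end only for exact matching without an explicit "$" or ".*".
    if (
        not pattern.endswith(".*")
        and not pattern.endswith("$")
        and not allow_prefix
    ):
        regex_pattern += "$"

    return any(re.match(regex_pattern, path, re.IGNORECASE) for path in paths)
```

With `allow_prefix=False`, as used for deny patterns, `space.folder` matches only the path `space.folder` itself; with `allow_prefix=True`, as used for allow patterns, it also matches deeper paths such as `space.folder.sub`.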
@@ -31,6 +31,7 @@ class DremioToDataHubSourceTypeMapping:
"SNOWFLAKE": "snowflake",
"SYNAPSE": "mssql",
"TERADATA": "teradata",
"VERTICA": "vertica",
}

DATABASE_SOURCE_TYPES = {
@@ -52,6 +53,7 @@ class DremioToDataHubSourceTypeMapping:
"SNOWFLAKE",
"SYNAPSE",
"TERADATA",
"VERTICA",
}

FILE_OBJECT_STORAGE_TYPES = {
@@ -14,12 +14,27 @@ class DremioSourceReport(
):
num_containers_failed: int = 0
num_datasets_failed: int = 0
containers_scanned: int = 0
containers_filtered: int = 0

def report_upstream_latency(self, start_time: datetime, end_time: datetime) -> None:
# recording total combined latency is not very useful, keeping this method as a placeholder
# for future implementation of min / max / percentiles etc.
pass

def report_container_scanned(self, name: str) -> None:
"""
Record that a container was successfully scanned
"""
self.containers_scanned += 1

def report_container_filtered(self, container_name: str) -> None:
"""
Record that a container was filtered out
"""
self.containers_filtered += 1
self.report_dropped(container_name)

def report_entity_scanned(self, name: str, ent_type: str = "View") -> None:
"""
Entity could be a view or a table
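
The two counters added to `DremioSourceReport` can be exercised in isolation with a small stand-in class. This is only a sketch: `ContainerFilterReport` and its `dropped` list are hypothetical, and `report_dropped` here is a simplified placeholder for the method the real report class is assumed to inherit from its base classes.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ContainerFilterReport:
    """Hypothetical stand-in for the container counters on DremioSourceReport."""

    containers_scanned: int = 0
    containers_filtered: int = 0
    dropped: List[str] = field(default_factory=list)

    def report_dropped(self, name: str) -> None:
        # Simplified placeholder for the inherited report_dropped behaviour.
        self.dropped.append(name)

    def report_container_scanned(self, name: str) -> None:
        """Record that a container passed the schema filter."""
        self.containers_scanned += 1

    def report_container_filtered(self, container_name: str) -> None:
        """Record that a container was filtered out and remember its name."""
        self.containers_filtered += 1
        self.report_dropped(container_name)


report = ContainerFilterReport()
report.report_container_scanned("space.folder")
report.report_container_filtered("space.private")
```

Note that, as in the diff, the scanned counter ignores the container name while the filtered path both counts and records it.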