Skip to content

Commit

Permalink
fix(ingestion/dremio): Ignore filtered containers in schema allowdeny…
Browse files Browse the repository at this point in the history
… pattern (#11959)

Co-authored-by: Mayuri Nehate <[email protected]>
  • Loading branch information
acrylJonny and mayurinehate authored Dec 13, 2024
1 parent 7c1d3b0 commit 06edf23
Show file tree
Hide file tree
Showing 10 changed files with 12,457 additions and 1,031 deletions.
279 changes: 193 additions & 86 deletions metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import concurrent.futures
import json
import logging
import re
import warnings
from collections import defaultdict
from enum import Enum
Expand Down Expand Up @@ -609,32 +610,6 @@ def extract_all_queries(self) -> List[Dict[str, Any]]:

return self.execute_query(query=jobs_query)

def get_source_by_id(self, source_id: str) -> Optional[Dict]:
"""
Fetch source details by ID.
"""
response = self.get(
url=f"/source/{source_id}",
)
return response if response else None

def get_source_for_dataset(self, schema: str, dataset: str) -> Optional[Dict]:
"""
Get source information for a dataset given its schema and name.
"""
dataset_id = self.get_dataset_id(schema, dataset)
if not dataset_id:
return None

catalog_entry = self.get(
url=f"/catalog/{dataset_id}",
)
if not catalog_entry or "path" not in catalog_entry:
return None

source_id = catalog_entry["path"][0]
return self.get_source_by_id(source_id)

def get_tags_for_resource(self, resource_id: str) -> Optional[List[str]]:
"""
Get Dremio tags for a given resource_id.
Expand Down Expand Up @@ -673,55 +648,119 @@ def get_description_for_resource(self, resource_id: str) -> Optional[str]:
)
return None

def get_containers_for_location(
self, resource_id: str, path: List[str]
) -> List[Dict[str, str]]:
containers = []
def _check_pattern_match(
self,
pattern: str,
paths: List[str],
allow_prefix: bool = True,
) -> bool:
"""
Helper method to check if a pattern matches any of the paths.
Handles hierarchical matching where each level is matched independently.
Also handles prefix matching for partial paths.
"""
if pattern == ".*":
return True

def traverse_path(location_id: str, entity_path: List[str]) -> List:
nonlocal containers
try:
response = self.get(url=f"/catalog/{location_id}")
if (
response.get("entityType")
== DremioEntityContainerType.FOLDER.value.lower()
):
containers.append(
{
"id": location_id,
"name": entity_path[-1],
"path": entity_path[:-1],
"container_type": DremioEntityContainerType.FOLDER,
}
)
# Convert the pattern to regex with proper anchoring
regex_pattern = pattern
if pattern.startswith("^"):
# Already has start anchor
regex_pattern = pattern.replace(".", r"\.") # Escape dots
regex_pattern = regex_pattern.replace(
r"\.*", ".*"
) # Convert .* to wildcard
else:
# Add start anchor and handle dots
regex_pattern = "^" + pattern.replace(".", r"\.").replace(r"\.*", ".*")

# Handle end matching
if not pattern.endswith(".*"):
if pattern.endswith("$"):
# Keep explicit end anchor
pass
elif not allow_prefix:
# Add end anchor for exact matching
regex_pattern = regex_pattern + "$"

for path in paths:
if re.match(regex_pattern, path, re.IGNORECASE):
return True

for container in response.get("children", []):
if (
container.get("type")
== DremioEntityContainerType.CONTAINER.value
):
traverse_path(container.get("id"), container.get("path"))
return False

except Exception as exc:
logging.info(
"Location {} contains no tables or views. Skipping...".format(id)
)
self.report.warning(
message="Failed to get tables or views",
context=f"{id}",
exc=exc,
)
def should_include_container(self, path: List[str], name: str) -> bool:
"""
Helper method to check if a container should be included based on schema patterns.
Used by both get_all_containers and get_containers_for_location.
"""
path_components = path + [name] if path else [name]
full_path = ".".join(path_components)

return containers
# Default allow everything case
if self.allow_schema_pattern == [".*"] and not self.deny_schema_pattern:
self.report.report_container_scanned(full_path)
return True

return traverse_path(location_id=resource_id, entity_path=path)
# Check deny patterns first
if self.deny_schema_pattern:
for pattern in self.deny_schema_pattern:
if self._check_pattern_match(
pattern=pattern,
paths=[full_path],
allow_prefix=False,
):
self.report.report_container_filtered(full_path)
return False

# Check allow patterns
for pattern in self.allow_schema_pattern:
# For patterns with wildcards, check if this path is a parent of the pattern
if "*" in pattern:
pattern_parts = pattern.split(".")
path_parts = path_components

# If pattern has exact same number of parts, check each component
if len(pattern_parts) == len(path_parts):
matches = True
for p_part, c_part in zip(pattern_parts, path_parts):
if p_part != "*" and p_part.lower() != c_part.lower():
matches = False
break
if matches:
self.report.report_container_scanned(full_path)
return True
# Otherwise check if current path is prefix match
else:
# Remove the trailing wildcard if present
if pattern_parts[-1] == "*":
pattern_parts = pattern_parts[:-1]

for i in range(len(path_parts)):
current_path = ".".join(path_parts[: i + 1])
pattern_prefix = ".".join(pattern_parts[: i + 1])

if pattern_prefix.startswith(current_path):
self.report.report_container_scanned(full_path)
return True

# Direct pattern matching
if self._check_pattern_match(
pattern=pattern,
paths=[full_path],
allow_prefix=True,
):
self.report.report_container_scanned(full_path)
return True

self.report.report_container_filtered(full_path)
return False

def get_all_containers(self):
"""
Query the Dremio sources API and return source information.
Query the Dremio sources API and return filtered source information.
"""
containers = []

response = self.get(url="/catalog")

def process_source(source):
Expand All @@ -731,34 +770,41 @@ def process_source(source):
)

source_config = source_resp.get("config", {})
if source_config.get("database"):
db = source_config.get("database")
else:
db = source_config.get("databaseName", "")

return {
"id": source.get("id"),
"name": source.get("path")[0],
"path": [],
"container_type": DremioEntityContainerType.SOURCE,
"source_type": source_resp.get("type"),
"root_path": source_config.get("rootPath"),
"database_name": db,
}
db = source_config.get(
"database", source_config.get("databaseName", "")
)

if self.should_include_container([], source.get("path")[0]):
return {
"id": source.get("id"),
"name": source.get("path")[0],
"path": [],
"container_type": DremioEntityContainerType.SOURCE,
"source_type": source_resp.get("type"),
"root_path": source_config.get("rootPath"),
"database_name": db,
}
else:
return {
"id": source.get("id"),
"name": source.get("path")[0],
"path": [],
"container_type": DremioEntityContainerType.SPACE,
}
if self.should_include_container([], source.get("path")[0]):
return {
"id": source.get("id"),
"name": source.get("path")[0],
"path": [],
"container_type": DremioEntityContainerType.SPACE,
}
return None

def process_source_and_containers(source):
container = process_source(source)
if not container:
return []

# Get sub-containers
sub_containers = self.get_containers_for_location(
resource_id=container.get("id"),
path=[container.get("name")],
)

return [container] + sub_containers

# Use ThreadPoolExecutor to parallelize the processing of sources
Expand All @@ -771,7 +817,16 @@ def process_source_and_containers(source):
}

for future in concurrent.futures.as_completed(future_to_source):
containers.extend(future.result())
source = future_to_source[future]
try:
containers.extend(future.result())
except Exception as exc:
logger.error(f"Error processing source: {exc}")
self.report.warning(
message="Failed to process source",
context=f"{source}",
exc=exc,
)

return containers

Expand All @@ -785,3 +840,55 @@ def get_context_for_vds(self, resource_id: str) -> str:
)
else:
return ""

def get_containers_for_location(
self, resource_id: str, path: List[str]
) -> List[Dict[str, str]]:
containers = []

def traverse_path(location_id: str, entity_path: List[str]) -> List:
nonlocal containers
try:
response = self.get(url=f"/catalog/{location_id}")

# Check if current folder should be included
if (
response.get("entityType")
== DremioEntityContainerType.FOLDER.value.lower()
):
folder_name = entity_path[-1]
folder_path = entity_path[:-1]

if self.should_include_container(folder_path, folder_name):
containers.append(
{
"id": location_id,
"name": folder_name,
"path": folder_path,
"container_type": DremioEntityContainerType.FOLDER,
}
)

# Recursively process child containers
for container in response.get("children", []):
if (
container.get("type")
== DremioEntityContainerType.CONTAINER.value
):
traverse_path(container.get("id"), container.get("path"))

except Exception as exc:
logging.info(
"Location {} contains no tables or views. Skipping...".format(
location_id
)
)
self.report.warning(
message="Failed to get tables or views",
context=f"{location_id}",
exc=exc,
)

return containers

return traverse_path(location_id=resource_id, entity_path=path)
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class DremioToDataHubSourceTypeMapping:
"SNOWFLAKE": "snowflake",
"SYNAPSE": "mssql",
"TERADATA": "teradata",
"VERTICA": "vertica",
}

DATABASE_SOURCE_TYPES = {
Expand All @@ -52,6 +53,7 @@ class DremioToDataHubSourceTypeMapping:
"SNOWFLAKE",
"SYNAPSE",
"TERADATA",
"VERTICA",
}

FILE_OBJECT_STORAGE_TYPES = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,27 @@ class DremioSourceReport(
):
num_containers_failed: int = 0
num_datasets_failed: int = 0
containers_scanned: int = 0
containers_filtered: int = 0

def report_upstream_latency(self, start_time: datetime, end_time: datetime) -> None:
# recording total combined latency is not very useful, keeping this method as a placeholder
# for future implementation of min / max / percentiles etc.
pass

def report_container_scanned(self, name: str) -> None:
"""
Record that a container was successfully scanned
"""
self.containers_scanned += 1

def report_container_filtered(self, container_name: str) -> None:
"""
Record that a container was filtered out
"""
self.containers_filtered += 1
self.report_dropped(container_name)

def report_entity_scanned(self, name: str, ent_type: str = "View") -> None:
"""
Entity could be a view or a table
Expand Down
Loading

0 comments on commit 06edf23

Please sign in to comment.