Skip to content

Commit

Permalink
fix(ingest): run sqllineage in process by default (datahub-project#11650
Browse files Browse the repository at this point in the history
)
  • Loading branch information
hsheth2 authored Oct 17, 2024
1 parent b814469 commit 8b42ac8
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 2,883 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,6 @@ class LookMLSourceConfig(
description="List of regex patterns for LookML views to include in the extraction.",
)
parse_table_names_from_sql: bool = Field(True, description="See note below.")
sql_parser: str = Field(
"datahub.utilities.sql_parser.DefaultSQLParser", description="See note below."
)
api: Optional[LookerAPIConfig]
project_name: Optional[str] = Field(
None,
Expand Down
23 changes: 12 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/source/redash.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import math
import sys
from dataclasses import dataclass, field
from multiprocessing.pool import ThreadPool
from typing import Dict, Iterable, List, Optional, Set, Type

import dateutil.parser as dp
Expand Down Expand Up @@ -43,6 +42,7 @@
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.sql_parser import SQLParser
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
Expand Down Expand Up @@ -646,11 +646,11 @@ def _emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
self.report.total_dashboards = total_dashboards
self.report.max_page_dashboards = max_page

dash_exec_pool = ThreadPool(self.config.parallelism)
for response in dash_exec_pool.imap_unordered(
self._process_dashboard_response, range(1, max_page + 1)
):
yield from response
yield from ThreadedIteratorExecutor.process(
self._process_dashboard_response,
[(page,) for page in range(1, max_page + 1)],
max_workers=self.config.parallelism,
)

def _get_chart_type_from_viz_data(self, viz_data: Dict) -> str:
"""
Expand Down Expand Up @@ -769,11 +769,12 @@ def _emit_chart_mces(self) -> Iterable[MetadataWorkUnit]:
logger.info(f"/api/queries total count {total_queries} and max page {max_page}")
self.report.total_queries = total_queries
self.report.max_page_queries = max_page
chart_exec_pool = ThreadPool(self.config.parallelism)
for response in chart_exec_pool.imap_unordered(
self._process_query_response, range(1, max_page + 1)
):
yield from response

yield from ThreadedIteratorExecutor.process(
self._process_query_response,
[(page,) for page in range(1, max_page + 1)],
max_workers=self.config.parallelism,
)

def add_config_to_report(self) -> None:
self.report.api_page_limit = self.config.api_page_limit
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/utilities/sql_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class SqlLineageSQLParser(SQLParser):
def __init__(
self,
sql_query: str,
use_external_process: bool = True,
use_external_process: bool = False,
use_raw_names: bool = False,
) -> None:
super().__init__(sql_query, use_external_process)
Expand Down
Loading

0 comments on commit 8b42ac8

Please sign in to comment.