Skip to content

Commit

Permalink
index presto views (#25)
Browse files Browse the repository at this point in the history
* index presto views

* fix test

* address comment

* remove alphabetica sorting

* bump up version
  • Loading branch information
youngyjd authored and Hans Adriaans committed Jun 30, 2022
1 parent af9580d commit 2e72c37
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 10 deletions.
116 changes: 116 additions & 0 deletions databuilder/databuilder/extractor/presto_view_metadata_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import base64
import json
import logging

from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401

from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata


LOGGER = logging.getLogger(__name__)


class PrestoViewMetadataExtractor(Extractor):
"""
Extracts Presto View and column metadata from underlying meta store database using SQLAlchemyExtractor
PrestoViewMetadataExtractor does not require a separate table model but just reuse the existing TableMetadata
"""
# SQL statement to extract View metadata
# {where_clause_suffix} could be used to filter schemas
SQL_STATEMENT = """
SELECT t.TBL_ID, d.NAME as schema_name, t.TBL_NAME name, t.TBL_TYPE, t.VIEW_ORIGINAL_TEXT as view_original_text
FROM TBLS t
JOIN DBS d ON t.DB_ID = d.DB_ID
WHERE t.VIEW_EXPANDED_TEXT = '/* Presto View */'
{where_clause_suffix}
ORDER BY t.TBL_ID desc;
"""

# Presto View data prefix and suffix definition:
# https://github.com/prestodb/presto/blob/43bd519052ba4c56ff1f4fc807075637ab5f4f10/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java#L153-L154
PRESTO_VIEW_PREFIX = '/* Presto View: '
PRESTO_VIEW_SUFFIX = ' */'

# CONFIG KEYS
WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
CLUSTER_KEY = 'cluster'

DEFAULT_CONFIG = ConfigFactory.from_dict({WHERE_CLAUSE_SUFFIX_KEY: ' ',
CLUSTER_KEY: 'gold'})

def init(self, conf):
# type: (ConfigTree) -> None
conf = conf.with_fallback(PrestoViewMetadataExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(PrestoViewMetadataExtractor.CLUSTER_KEY))

self.sql_stmt = PrestoViewMetadataExtractor.SQL_STATEMENT.format(
where_clause_suffix=conf.get_string(PrestoViewMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY))

LOGGER.info('SQL for hive metastore: {}'.format(self.sql_stmt))

self._alchemy_extractor = SQLAlchemyExtractor()
sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
.with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))

self._alchemy_extractor.init(sql_alch_conf)
self._extract_iter = None # type: Union[None, Iterator]

def extract(self):
# type: () -> Union[TableMetadata, None]
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None

def get_scope(self):
# type: () -> str
return 'extractor.presto_view_metadata'

def _get_extract_iter(self):
# type: () -> Iterator[TableMetadata]
"""
Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
:return:
"""
row = self._alchemy_extractor.extract()
while row:
columns = self._get_column_metadata(row['view_original_text'])
yield TableMetadata(database='presto',
cluster=self._cluster,
schema_name=row['schema_name'],
name=row['name'],
description=None,
columns=columns,
is_view=True)
row = self._alchemy_extractor.extract()

def _get_column_metadata(self, view_original_text):
# type: (str) -> List[ColumnMetadata]
"""
Get Column Metadata from VIEW_ORIGINAL_TEXT from TBLS table for Presto Views.
Columns are sorted the same way as they appear in Presto Create View SQL.
:param view_original_text:
:return:
"""
# remove encoded Presto View data prefix and suffix
encoded_view_info = (
view_original_text.
split(PrestoViewMetadataExtractor.PRESTO_VIEW_PREFIX, 1)[-1].
rsplit(PrestoViewMetadataExtractor.PRESTO_VIEW_SUFFIX, 1)[0]
)

# view_original_text is b64 encoded:
# https://github.com/prestodb/presto/blob/43bd519052ba4c56ff1f4fc807075637ab5f4f10/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java#L602-L605
decoded_view_info = base64.b64decode(encoded_view_info)
columns = json.loads(decoded_view_info).get('columns')

return [ColumnMetadata(name=column['name'],
description=None,
col_type=column['type'],
sort_order=i) for i, column in enumerate(columns)]
23 changes: 15 additions & 8 deletions databuilder/databuilder/models/table_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@ class TableMetadata(Neo4jCsvSerializable):
These are being created here as it does not make much sense to have different extraction to produce this. As
database, cluster, schema would be very repititive with low cardinality, it will perform de-dupe so that publisher
won't need to publish same nodes, relationships.
This class can be used for both table and view metadata. If it is a View, is_view=True should be passed in.
"""
TABLE_NODE_LABEL = 'Table'
TABLE_KEY_FORMAT = '{db}://{cluster}.{schema}/{tbl}'
TABLE_NAME = 'name'
IS_VIEW = 'is_view'

TABLE_DESCRIPTION = 'description'
TABLE_DESCRIPTION_FORMAT = '{db}://{cluster}.{schema}/{tbl}/_description'
Expand Down Expand Up @@ -100,7 +103,8 @@ def __init__(self,
schema_name, # type: str
name, # type: str
description, # type: Union[str, None]
columns=None # type: Iterable[ColumnMetadata]
columns=None, # type: Iterable[ColumnMetadata]
is_view=False, # type: bool
):
# type: (...) -> None
"""
Expand All @@ -118,17 +122,19 @@ def __init__(self,
self.name = name
self.description = description
self.columns = columns if columns else []
self.is_view = is_view
self._node_iterator = self._create_next_node()
self._relation_iterator = self._create_next_relation()

def __repr__(self):
# type: () -> str
return 'TableMetadata({!r}, {!r}, {!r}, {!r}, {!r}, {!r})'.format(self.database,
self.cluster,
self.schema_name,
self.name,
self.description,
self.columns)
return 'TableMetadata({!r}, {!r}, {!r}, {!r}, {!r}, {!r}, {!r})'.format(self.database,
self.cluster,
self.schema_name,
self.name,
self.description,
self.columns,
self.is_view)

def _get_table_key(self):
# type: () -> str
Expand Down Expand Up @@ -186,7 +192,8 @@ def _create_next_node(self):
# type: () -> Iterator[Any]
yield {NODE_LABEL: TableMetadata.TABLE_NODE_LABEL,
NODE_KEY: self._get_table_key(),
TableMetadata.TABLE_NAME: self.name}
TableMetadata.TABLE_NAME: self.name,
TableMetadata.IS_VIEW: self.is_view}

if self.description:
yield {NODE_LABEL: DESCRIPTION_NODE_LABEL,
Expand Down
2 changes: 1 addition & 1 deletion databuilder/setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import setup, find_packages


__version__ = '1.0.9'
__version__ = '1.0.10'


setup(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import base64
import json
import logging
import unittest

from mock import patch, MagicMock
from pyhocon import ConfigFactory

from databuilder.extractor.presto_view_metadata_extractor import PrestoViewMetadataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata


class TestPrestoViewMetadataExtractor(unittest.TestCase):
def setUp(self):
# type: () -> None
logging.basicConfig(level=logging.INFO)

config_dict = {
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
'TEST_CONNECTION'
}
self.conf = ConfigFactory.from_dict(config_dict)

def test_extraction_with_empty_result(self):
# type: () -> None
"""
Test Extraction with empty result from query
"""
with patch.object(SQLAlchemyExtractor, '_get_connection'):
extractor = PrestoViewMetadataExtractor()
extractor.init(self.conf)

results = extractor.extract()
self.assertEqual(results, None)

def test_extraction_with_multiple_views(self):
# type: () -> None
with patch.object(SQLAlchemyExtractor, '_get_connection') as mock_connection:
connection = MagicMock()
mock_connection.return_value = connection
sql_execute = MagicMock()
connection.execute = sql_execute

columns1 = {'columns': [{'name': 'xyz', 'type': 'varchar'},
{'name': 'xyy', 'type': 'double'},
{'name': 'aaa', 'type': 'int'},
{'name': 'ab', 'type': 'varchar'}]}

columns2 = {'columns': [{'name': 'xyy', 'type': 'varchar'},
{'name': 'ab', 'type': 'double'},
{'name': 'aaa', 'type': 'int'},
{'name': 'xyz', 'type': 'varchar'}]}

sql_execute.return_value = [
{'tbl_id': 2,
'schema_name': 'test_schema2',
'name': 'test_view2',
'tbl_type': 'virtual_view',
'view_original_text': base64.b64encode(json.dumps(columns2).encode()).decode("utf-8")},
{'tbl_id': 1,
'schema_name': 'test_schema1',
'name': 'test_view1',
'tbl_type': 'virtual_view',
'view_original_text': base64.b64encode(json.dumps(columns1).encode()).decode("utf-8")},
]

extractor = PrestoViewMetadataExtractor()
extractor.init(self.conf)
actual_first_view = extractor.extract()
expected_first_view = TableMetadata('presto', 'gold', 'test_schema2', 'test_view2', None,
[ColumnMetadata(u'xyy', None, u'varchar', 0),
ColumnMetadata(u'ab', None, u'double', 1),
ColumnMetadata(u'aaa', None, u'int', 2),
ColumnMetadata(u'xyz', None, u'varchar', 3)],
True)
self.assertEqual(expected_first_view.__repr__(), actual_first_view.__repr__())

actual_second_view = extractor.extract()
expected_second_view = TableMetadata('presto', 'gold', 'test_schema1', 'test_view1', None,
[ColumnMetadata(u'xyz', None, u'varchar', 0),
ColumnMetadata(u'xyy', None, u'double', 1),
ColumnMetadata(u'aaa', None, u'int', 2),
ColumnMetadata(u'ab', None, u'varchar', 3)],
True)
self.assertEqual(expected_second_view.__repr__(), actual_second_view.__repr__())

self.assertIsNone(extractor.extract())


if __name__ == '__main__':
unittest.main()
2 changes: 1 addition & 1 deletion databuilder/tests/unit/models/test_table_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def setUp(self):
ColumnMetadata('ds', None, 'varchar', 5)])

self.expected_nodes_deduped = [
{'name': 'test_table1', 'KEY': 'hive://gold.test_schema1/test_table1', 'LABEL': 'Table'},
{'name': 'test_table1', 'KEY': 'hive://gold.test_schema1/test_table1', 'LABEL': 'Table', 'is_view': False},
{'description': 'test_table1', 'KEY': 'hive://gold.test_schema1/test_table1/_description',
'LABEL': 'Description'},
{'sort_order': 0, 'type': 'bigint', 'name': 'test_id1',
Expand Down

0 comments on commit 2e72c37

Please sign in to comment.