From 2e72c37107200a65efcbeda4b60b1c4a69e7979a Mon Sep 17 00:00:00 2001 From: Junda Yang Date: Tue, 23 Apr 2019 16:21:47 -0700 Subject: [PATCH] index presto views (#25) * index presto views * fix test * address comment * remove alphabetica sorting * bump up version --- .../presto_view_metadata_extractor.py | 116 ++++++++++++++++++ .../databuilder/models/table_metadata.py | 23 ++-- databuilder/setup.py | 2 +- .../test_presto_view_metadata_extractor.py | 92 ++++++++++++++ .../tests/unit/models/test_table_metadata.py | 2 +- 5 files changed, 225 insertions(+), 10 deletions(-) create mode 100644 databuilder/databuilder/extractor/presto_view_metadata_extractor.py create mode 100644 databuilder/tests/unit/extractor/test_presto_view_metadata_extractor.py diff --git a/databuilder/databuilder/extractor/presto_view_metadata_extractor.py b/databuilder/databuilder/extractor/presto_view_metadata_extractor.py new file mode 100644 index 0000000000..1201e438d5 --- /dev/null +++ b/databuilder/databuilder/extractor/presto_view_metadata_extractor.py @@ -0,0 +1,116 @@ +import base64 +import json +import logging + +from pyhocon import ConfigFactory, ConfigTree # noqa: F401 +from typing import Iterator, Union, Dict, Any # noqa: F401 + +from databuilder import Scoped +from databuilder.extractor.base_extractor import Extractor +from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor +from databuilder.models.table_metadata import TableMetadata, ColumnMetadata + + +LOGGER = logging.getLogger(__name__) + + +class PrestoViewMetadataExtractor(Extractor): + """ + Extracts Presto View and column metadata from underlying meta store database using SQLAlchemyExtractor + PrestoViewMetadataExtractor does not require a separate table model but just reuse the existing TableMetadata + """ + # SQL statement to extract View metadata + # {where_clause_suffix} could be used to filter schemas + SQL_STATEMENT = """ + SELECT t.TBL_ID, d.NAME as schema_name, t.TBL_NAME name, t.TBL_TYPE, t.VIEW_ORIGINAL_TEXT as view_original_text + FROM TBLS t + JOIN DBS d ON t.DB_ID = d.DB_ID + WHERE t.VIEW_EXPANDED_TEXT = '/* Presto View */' + {where_clause_suffix} + ORDER BY t.TBL_ID desc; + """ + + # Presto View data prefix and suffix definition: + # https://github.com/prestodb/presto/blob/43bd519052ba4c56ff1f4fc807075637ab5f4f10/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java#L153-L154 + PRESTO_VIEW_PREFIX = '/* Presto View: ' + PRESTO_VIEW_SUFFIX = ' */' + + # CONFIG KEYS + WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix' + CLUSTER_KEY = 'cluster' + + DEFAULT_CONFIG = ConfigFactory.from_dict({WHERE_CLAUSE_SUFFIX_KEY: ' ', + CLUSTER_KEY: 'gold'}) + + def init(self, conf): + # type: (ConfigTree) -> None + conf = conf.with_fallback(PrestoViewMetadataExtractor.DEFAULT_CONFIG) + self._cluster = '{}'.format(conf.get_string(PrestoViewMetadataExtractor.CLUSTER_KEY)) + + self.sql_stmt = PrestoViewMetadataExtractor.SQL_STATEMENT.format( + where_clause_suffix=conf.get_string(PrestoViewMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY)) + + LOGGER.info('SQL for hive metastore: {}'.format(self.sql_stmt)) + + self._alchemy_extractor = SQLAlchemyExtractor() + sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\ + .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt})) + + self._alchemy_extractor.init(sql_alch_conf) + self._extract_iter = None # type: Union[None, Iterator] + + def extract(self): + # type: () -> Union[TableMetadata, None] + if not self._extract_iter: + self._extract_iter = self._get_extract_iter() + try: + return next(self._extract_iter) + except StopIteration: + return None + + def get_scope(self): + # type: () -> str + return 'extractor.presto_view_metadata' + + def _get_extract_iter(self): + # type: () -> Iterator[TableMetadata] + """ + Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata + :return: + """ + row = self._alchemy_extractor.extract() + while row: + columns = self._get_column_metadata(row['view_original_text']) + yield TableMetadata(database='presto', + cluster=self._cluster, + schema_name=row['schema_name'], + name=row['name'], + description=None, + columns=columns, + is_view=True) + row = self._alchemy_extractor.extract() + + def _get_column_metadata(self, view_original_text): + # type: (str) -> List[ColumnMetadata] + """ + Get Column Metadata from VIEW_ORIGINAL_TEXT from TBLS table for Presto Views. + Columns are sorted the same way as they appear in Presto Create View SQL. + :param view_original_text: + :return: + """ + # remove encoded Presto View data prefix and suffix + encoded_view_info = ( + view_original_text. + split(PrestoViewMetadataExtractor.PRESTO_VIEW_PREFIX, 1)[-1]. + rsplit(PrestoViewMetadataExtractor.PRESTO_VIEW_SUFFIX, 1)[0] + ) + + # view_original_text is b64 encoded: + # https://github.com/prestodb/presto/blob/43bd519052ba4c56ff1f4fc807075637ab5f4f10/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java#L602-L605 + decoded_view_info = base64.b64decode(encoded_view_info) + columns = json.loads(decoded_view_info).get('columns') + + return [ColumnMetadata(name=column['name'], + description=None, + col_type=column['type'], + sort_order=i) for i, column in enumerate(columns)] diff --git a/databuilder/databuilder/models/table_metadata.py b/databuilder/databuilder/models/table_metadata.py index 119f74741c..c9f35b1d7e 100644 --- a/databuilder/databuilder/models/table_metadata.py +++ b/databuilder/databuilder/models/table_metadata.py @@ -62,10 +62,13 @@ class TableMetadata(Neo4jCsvSerializable): These are being created here as it does not make much sense to have different extraction to produce this. As database, cluster, schema would be very repititive with low cardinality, it will perform de-dupe so that publisher won't need to publish same nodes, relationships. + + This class can be used for both table and view metadata. If it is a View, is_view=True should be passed in. """ TABLE_NODE_LABEL = 'Table' TABLE_KEY_FORMAT = '{db}://{cluster}.{schema}/{tbl}' TABLE_NAME = 'name' + IS_VIEW = 'is_view' TABLE_DESCRIPTION = 'description' TABLE_DESCRIPTION_FORMAT = '{db}://{cluster}.{schema}/{tbl}/_description' @@ -100,7 +103,8 @@ def __init__(self, schema_name, # type: str name, # type: str description, # type: Union[str, None] - columns=None # type: Iterable[ColumnMetadata] + columns=None, # type: Iterable[ColumnMetadata] + is_view=False, # type: bool ): # type: (...) -> None """ @@ -118,17 +122,19 @@ def __init__(self, self.name = name self.description = description self.columns = columns if columns else [] + self.is_view = is_view self._node_iterator = self._create_next_node() self._relation_iterator = self._create_next_relation() def __repr__(self): # type: () -> str - return 'TableMetadata({!r}, {!r}, {!r}, {!r}, {!r}, {!r})'.format(self.database, - self.cluster, - self.schema_name, - self.name, - self.description, - self.columns) + return 'TableMetadata({!r}, {!r}, {!r}, {!r}, {!r}, {!r}, {!r})'.format(self.database, + self.cluster, + self.schema_name, + self.name, + self.description, + self.columns, + self.is_view) def _get_table_key(self): # type: () -> str @@ -186,7 +192,8 @@ def _create_next_node(self): # type: () -> Iterator[Any] yield {NODE_LABEL: TableMetadata.TABLE_NODE_LABEL, NODE_KEY: self._get_table_key(), - TableMetadata.TABLE_NAME: self.name} + TableMetadata.TABLE_NAME: self.name, + TableMetadata.IS_VIEW: self.is_view} if self.description: yield {NODE_LABEL: DESCRIPTION_NODE_LABEL, diff --git a/databuilder/setup.py b/databuilder/setup.py index b26b5cf423..2e2ef3718f 100644 --- a/databuilder/setup.py +++ b/databuilder/setup.py @@ -1,7 +1,7 @@ from setuptools import setup, find_packages -__version__ = '1.0.9' +__version__ = '1.0.10' setup( diff --git a/databuilder/tests/unit/extractor/test_presto_view_metadata_extractor.py b/databuilder/tests/unit/extractor/test_presto_view_metadata_extractor.py new file mode 100644 index 0000000000..c31d89eae5 --- /dev/null +++ b/databuilder/tests/unit/extractor/test_presto_view_metadata_extractor.py @@ -0,0 +1,92 @@ +import base64 +import json +import logging +import unittest + +from mock import patch, MagicMock +from pyhocon import ConfigFactory + +from databuilder.extractor.presto_view_metadata_extractor import PrestoViewMetadataExtractor +from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor +from databuilder.models.table_metadata import TableMetadata, ColumnMetadata + + +class TestPrestoViewMetadataExtractor(unittest.TestCase): + def setUp(self): + # type: () -> None + logging.basicConfig(level=logging.INFO) + + config_dict = { + 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): + 'TEST_CONNECTION' + } + self.conf = ConfigFactory.from_dict(config_dict) + + def test_extraction_with_empty_result(self): + # type: () -> None + """ + Test Extraction with empty result from query + """ + with patch.object(SQLAlchemyExtractor, '_get_connection'): + extractor = PrestoViewMetadataExtractor() + extractor.init(self.conf) + + results = extractor.extract() + self.assertEqual(results, None) + + def test_extraction_with_multiple_views(self): + # type: () -> None + with patch.object(SQLAlchemyExtractor, '_get_connection') as mock_connection: + connection = MagicMock() + mock_connection.return_value = connection + sql_execute = MagicMock() + connection.execute = sql_execute + + columns1 = {'columns': [{'name': 'xyz', 'type': 'varchar'}, + {'name': 'xyy', 'type': 'double'}, + {'name': 'aaa', 'type': 'int'}, + {'name': 'ab', 'type': 'varchar'}]} + + columns2 = {'columns': [{'name': 'xyy', 'type': 'varchar'}, + {'name': 'ab', 'type': 'double'}, + {'name': 'aaa', 'type': 'int'}, + {'name': 'xyz', 'type': 'varchar'}]} + + sql_execute.return_value = [ + {'tbl_id': 2, + 'schema_name': 'test_schema2', + 'name': 'test_view2', + 'tbl_type': 'virtual_view', + 'view_original_text': base64.b64encode(json.dumps(columns2).encode()).decode("utf-8")}, + {'tbl_id': 1, + 'schema_name': 'test_schema1', + 'name': 'test_view1', + 'tbl_type': 'virtual_view', + 'view_original_text': base64.b64encode(json.dumps(columns1).encode()).decode("utf-8")}, + ] + + extractor = PrestoViewMetadataExtractor() + extractor.init(self.conf) + actual_first_view = extractor.extract() + expected_first_view = TableMetadata('presto', 'gold', 'test_schema2', 'test_view2', None, + [ColumnMetadata(u'xyy', None, u'varchar', 0), + ColumnMetadata(u'ab', None, u'double', 1), + ColumnMetadata(u'aaa', None, u'int', 2), + ColumnMetadata(u'xyz', None, u'varchar', 3)], + True) + self.assertEqual(expected_first_view.__repr__(), actual_first_view.__repr__()) + + actual_second_view = extractor.extract() + expected_second_view = TableMetadata('presto', 'gold', 'test_schema1', 'test_view1', None, + [ColumnMetadata(u'xyz', None, u'varchar', 0), + ColumnMetadata(u'xyy', None, u'double', 1), + ColumnMetadata(u'aaa', None, u'int', 2), + ColumnMetadata(u'ab', None, u'varchar', 3)], + True) + self.assertEqual(expected_second_view.__repr__(), actual_second_view.__repr__()) + + self.assertIsNone(extractor.extract()) + + +if __name__ == '__main__': + unittest.main() diff --git a/databuilder/tests/unit/models/test_table_metadata.py b/databuilder/tests/unit/models/test_table_metadata.py index 8449d83fb4..d2fdee0d2a 100644 --- a/databuilder/tests/unit/models/test_table_metadata.py +++ b/databuilder/tests/unit/models/test_table_metadata.py @@ -24,7 +24,7 @@ def setUp(self): ColumnMetadata('ds', None, 'varchar', 5)]) self.expected_nodes_deduped = [ - {'name': 'test_table1', 'KEY': 'hive://gold.test_schema1/test_table1', 'LABEL': 'Table'}, + {'name': 'test_table1', 'KEY': 'hive://gold.test_schema1/test_table1', 'LABEL': 'Table', 'is_view': False}, {'description': 'test_table1', 'KEY': 'hive://gold.test_schema1/test_table1/_description', 'LABEL': 'Description'}, {'sort_order': 0, 'type': 'bigint', 'name': 'test_id1',