Implements Table Detail and Popular Tables APIs (#15)
verdan authored and Hans Adriaans committed Jun 30, 2022
1 parent 2640350 commit 0192708
Showing 3 changed files with 353 additions and 4 deletions.
58 changes: 57 additions & 1 deletion metadata/docs/proxy/atlas_proxy.md
@@ -1 +1,57 @@
Create a new Atlas client instance (update the host and credentials information as needed):
```python
from atlasclient.client import Atlas
client = Atlas(host='localhost', port=21000, username='admin', password='admin')
```

### Create a Super Type Entity
Since Atlas stores metadata about many entity types (tables, databases, columns, etc.),
we need a super entity type that can be used to filter out only the tables.

[Atlas Proxy](https://github.com/lyft/amundsenmetadatalibrary/blob/master/metadata_service/proxy/atlas_proxy.py) uses
`Table` as the super entity type.
```python
TABLE_ENTITY = 'Table'
```

Create the new type, defined above via `TABLE_ENTITY`, using the script below:
```python
typedef_dict = {
"entityDefs": [
{
"name": TABLE_ENTITY,
"superTypes": ["DataSet"],
}
]
}

client.typedefs.create(data=typedef_dict)
```

### Add required fields
We need to add some extra fields to Atlas in order to get all the information needed for the Amundsen frontend.
Adding those extra attributes to the super type entity definition keeps them all in one place.

[TBD - How to add attributes definition]
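
As a rough illustration only (the exact attribute list is still TBD), the extra attributes could be added to the `TABLE_ENTITY` definition using the standard Atlas `attributeDefs` structure; the `extra_field` attribute below is a hypothetical placeholder.
```python
# Hypothetical sketch: extend the TABLE_ENTITY definition with an extra
# attribute. Replace "extra_field" with the attributes Amundsen actually needs.
typedef_dict = {
    "entityDefs": [
        {
            "name": TABLE_ENTITY,
            "superTypes": ["DataSet"],
            "attributeDefs": [
                {
                    "name": "extra_field",   # placeholder attribute name
                    "typeName": "string",
                    "isOptional": True,
                    "cardinality": "SINGLE",
                    "isUnique": False,
                    "isIndexable": False,
                }
            ],
        }
    ]
}

client.typedefs.update(data=typedef_dict)
```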

### Assign superType to entity definitions
Assign the newly created `TABLE_ENTITY` type as a super type to every entity definition you want to behave like a table.
In the code snippet below, `hive_table` and `rdbms_table` would be affected.
```python
# Below are the entity types which should behave like table entities for the Amundsen Atlas Proxy
atlas_tables = ['hive_table', 'rdbms_table']
entities_to_update = []
for t in client.typedefs:
for e in t.entityDefs:
if e.name in atlas_tables:
superTypes = e.superTypes # Get a property first to inflate the relational objects
ent_dict = e._data
ent_dict["superTypes"] = superTypes
ent_dict["superTypes"].append(TABLE_ENTITY)
entities_to_update.append(ent_dict)

typedef_dict = {
"entityDefs": entities_to_update
}
client.typedefs.update(data=typedef_dict)
```
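
As a quick sanity check, the same typedefs iteration from above can be reused to confirm that the `Table` super type is now attached (a minimal sketch using only the calls already shown):
```python
# Verify that the Table super type was applied to the selected entity types.
for t in client.typedefs:
    for e in t.entityDefs:
        if e.name in atlas_tables:
            assert TABLE_ENTITY in e.superTypes, '{} is missing {}'.format(e.name, TABLE_ENTITY)
```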
118 changes: 115 additions & 3 deletions metadata/metadata_service/proxy/atlas_proxy.py
@@ -1,18 +1,28 @@
import logging
import re
from typing import Union, List, Dict, Any

from atlasclient.client import Atlas
from atlasclient.exceptions import BadRequest

from metadata_service.entity.popular_table import PopularTable
from metadata_service.entity.table_detail import Table
from metadata_service.entity.user_detail import User as UserEntity
from metadata_service.exception import NotFoundException
from metadata_service.proxy import BaseProxy
from metadata_service.util import UserResourceRel

LOGGER = logging.getLogger(__name__)


class AtlasProxy(BaseProxy):
"""
    Atlas Proxy client for the Amundsen metadata service
"""
TABLE_ENTITY = 'Table'
DATASET_ENTITY = 'DataSet'
DB_KEY = 'db'
NAME_KEY = 'qualifiedName'

def __init__(self, *,
host: str,
@@ -24,11 +34,81 @@ def __init__(self, *,
"""
self._driver = Atlas(host=host, port=port, username=user, password=password)

def _get_ids_from_basic_search(self, *, params: Dict) -> List[str]:
"""
Search for the entities based on the params provided as argument.
:param params: the dictionary of parameters to be used for the basic search
        :return: The flat list of GUIDs of the entities found based on the params.
"""
ids = list()
search_results = self._driver.search_basic(**params)
for result in search_results:
# result.entities would directly be accessible after below PR
# Fix: https://github.com/jpoullet2000/atlasclient/pull/59
# noinspection PyProtectedMember
for entity in result._data.get('entities', list()):
ids.append(entity['guid'])
return ids

def _extract_info_from_uri(self, *, table_uri: str) -> Dict:
"""
Extracts the table information from table_uri coming from frontend.
:param table_uri:
:return: Dictionary object, containing following information:
db: Database Name
cluster: Cluster information
schema: Schema Name
name: Unique Table Identifier
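        Example (illustrative values): 'hive://gold.test_schema/test_table' maps to
            {'db': 'hive', 'cluster': 'gold', 'schema': 'test_schema', 'name': 'test_table'}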
"""
pattern = re.compile(r"""
^ (?P<db>.*?)
:\/\/
(?P<cluster>.*)
\.
(?P<schema>.*?)
\/
(?P<name>.*?)
$
""", re.X)
result = pattern.match(table_uri)
return result.groupdict() if result else dict()

def get_user_detail(self, *, user_id: str) -> Union[UserEntity, None]:
pass

def get_table(self, *, table_uri: str) -> Table:
"""
Gathers all the information needed for the Table Detail Page.
        It tries to get the table information from Atlas using the table's qualified name.
:param table_uri:
:return:
"""
table_info = self._extract_info_from_uri(table_uri=table_uri)

try:
# The reason to use the DATASET_ENTITY here instead of TABLE_ENTITY
# is to access the older data (if any) which was generated before
# introducing the TABLE_ENTITY in atlas.
entity = self._driver.entity_unique_attribute(
self.DATASET_ENTITY,
qualifiedName=table_info.get('name')).entity
except Exception as ex:
LOGGER.exception('Table not found. {}'.format(str(ex)))
raise NotFoundException('Table URI( {table_uri} ) does not exist'.format(table_uri=table_uri))

try:
table = Table(database=table_info['db'],
cluster=table_info['cluster'],
schema=table_info['schema'],
name=table_info['name'],
columns=entity['relationshipAttributes']['columns'],
last_updated_timestamp=entity['updateTime'])

return table
except KeyError as ex:
LOGGER.exception('Error while accessing table information. {}'.format(str(ex)))
raise BadRequest('Some of the required attributes are missing in : ( {table_uri} )'.
format(table_uri=table_uri))

def delete_owner(self, *, table_uri: str, owner: str) -> None:
pass
@@ -64,7 +144,39 @@ def get_column_description(self, *,

def get_popular_tables(self, *,
num_entries: int = 10) -> List[PopularTable]:
"""
        FixMe: For now this simply returns ALL the available tables;
        a formula to pick only the popular tables still needs to be defined.
:param num_entries:
:return:
"""
popular_tables = list()
params = {'typeName': self.TABLE_ENTITY, 'excludeDeletedEntities': True}
guids = self._get_ids_from_basic_search(params=params)

entity_collection = self._driver.entity_bulk(guid=guids)
for _collection in entity_collection:
for entity in _collection.entities:
attrs = entity.attributes
# At the moment, relationship attributes are not available in the entity
# and hence, we need to make another request to get details of database
# Fix: https://github.com/jpoullet2000/atlasclient/pull/60
database = attrs.get(self.DB_KEY)
if database:
db_attrs = self._driver.entity_guid(database['guid']).entity['attributes']
db_name = db_attrs.get(self.NAME_KEY)
db_cluster = db_attrs.get('cluster')
else:
db_name = ''
db_cluster = ''

popular_table = PopularTable(database=db_name,
cluster=db_cluster,
schema=attrs.get('schema'),
name=attrs.get(self.NAME_KEY),
description=attrs.get('description'))
popular_tables.append(popular_table)
return popular_tables

def get_latest_updated_ts(self) -> int:
pass
181 changes: 181 additions & 0 deletions metadata/tests/unit/proxy/test_atlas_proxy.py
@@ -0,0 +1,181 @@
import unittest

from atlasclient.exceptions import BadRequest
from mock import patch, MagicMock

from metadata_service.entity.popular_table import PopularTable
from metadata_service.entity.table_detail import (Table)
from metadata_service.exception import NotFoundException
from metadata_service.proxy.atlas_proxy import AtlasProxy


class TestAtlasProxy(unittest.TestCase):

def setUp(self):
with patch('metadata_service.proxy.atlas_proxy.Atlas'):
self.proxy = AtlasProxy(host='DOES_NOT_MATTER', port=0000)
self.proxy._driver = MagicMock()

self.db = 'TEST_DB'
self.cluster = 'TEST_CLUSTER'
self.schema = 'TEST_SCHEMA'
self.name = 'TEST_TABLE'
self.table_uri = f'{self.db}://{self.cluster}.{self.schema}/{self.name}'

entity1_relationships = {
'relationshipAttributes': {
'columns': []
}
}
self.entity1 = {
'guid': '1',
'updateTime': 123,
'attributes': {
'qualifiedName': 'Table1_Qualified',
'schema': self.schema,
'name': 'Table1',
'db': {
'guid': '-100',
'qualifiedName': self.db
}
}
}
self.entity1.update(entity1_relationships)
self.entity1['attributes'].update(entity1_relationships)

entity2_relationships = {
'relationshipAttributes': {
'columns': []
}
}
self.entity2 = {
'guid': '2',
'updateTime': 234,
'attributes': {
'qualifiedName': 'Table2_Qualified',
'schema': self.schema,
'name': 'Table1',
'db': {
'guid': '-100',
'qualifiedName': self.db
}
}
}
self.entity2.update(entity2_relationships)
self.entity2['attributes'].update(entity2_relationships)

self.entities = {
'entities': [
self.entity1,
self.entity2,
]
}

def test_extract_table_uri_info(self):
table_info = self.proxy._extract_info_from_uri(table_uri=self.table_uri)
self.assertDictEqual(table_info, {
'db': self.db,
'cluster': self.cluster,
'schema': self.schema,
'name': self.name
})

def test_get_ids_from_basic_search(self):
basic_search_response = MagicMock()
basic_search_response._data = self.entities

self.proxy._driver.search_basic = MagicMock(return_value=[basic_search_response])
response = self.proxy._get_ids_from_basic_search(params={})
expected = ['1', '2']
self.assertListEqual(response, expected)

def test_get_table(self):
unique_attr_response = MagicMock()
unique_attr_response.entity = self.entity1

self.proxy._driver.entity_unique_attribute = MagicMock(return_value=unique_attr_response)
response = self.proxy.get_table(table_uri=self.table_uri)

expected = Table(database=self.db,
cluster=self.cluster,
schema=self.schema,
name=self.name,
columns=self.entity1['relationshipAttributes']['columns'],
last_updated_timestamp=self.entity1['updateTime'])
self.assertEqual(str(expected), str(response))

def test_get_table_not_found(self):
with self.assertRaises(NotFoundException):
self.proxy._driver.entity_unique_attribute = MagicMock(side_effect=Exception('Boom!'))
self.proxy.get_table(table_uri=self.table_uri)

def test_get_table_missing_info(self):
with self.assertRaises(BadRequest):
local_entity = self.entity1
local_entity.pop('relationshipAttributes')
unique_attr_response = MagicMock()
unique_attr_response.entity = local_entity

self.proxy._driver.entity_unique_attribute = MagicMock(return_value=unique_attr_response)
self.proxy.get_table(table_uri=self.table_uri)

@patch.object(AtlasProxy, '_get_ids_from_basic_search')
def test_get_popular_tables(self, mock_basic_search):
entity1 = MagicMock()
entity1.attributes = self.entity1['attributes']

entity2 = MagicMock()
entity2.attributes = self.entity2['attributes']

bulk_ent_collection = MagicMock()
bulk_ent_collection.entities = [entity1, entity2]

self.proxy._driver.entity_bulk = MagicMock(return_value=[bulk_ent_collection])

db_entity = MagicMock()
db_entity.entity = {'attributes': {
'qualifiedName': self.db,
'cluster': self.cluster
}}

self.proxy._driver.entity_guid = MagicMock(return_value=db_entity)

response = self.proxy.get_popular_tables(num_entries=2)

expected = [
PopularTable(database=self.db, cluster=self.cluster, schema=self.schema,
name=self.entity1['attributes']['qualifiedName']),
PopularTable(database=self.db, cluster=self.cluster, schema=self.schema,
name=self.entity2['attributes']['qualifiedName']),
]

self.assertEqual(response.__repr__(), expected.__repr__())

@patch.object(AtlasProxy, '_get_ids_from_basic_search')
def test_get_popular_tables_without_db(self, mock_basic_search):
attrs_ent1 = self.entity1['attributes']
attrs_ent1.pop('db')
entity1 = MagicMock()
entity1.attributes = attrs_ent1

attrs_ent2 = self.entity2['attributes']
attrs_ent2.pop('db')
entity2 = MagicMock()
entity2.attributes = attrs_ent2

bulk_ent_collection = MagicMock()
bulk_ent_collection.entities = [entity1, entity2]

self.proxy._driver.entity_bulk = MagicMock(return_value=[bulk_ent_collection])
response = self.proxy.get_popular_tables(num_entries=2)

expected = [
PopularTable(database='', cluster='', schema=self.schema, name=self.entity1['attributes']['qualifiedName']),
PopularTable(database='', cluster='', schema=self.schema, name=self.entity2['attributes']['qualifiedName']),
]

self.assertEqual(response.__repr__(), expected.__repr__())


if __name__ == '__main__':
unittest.main()
