# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
"""
This is a sample Airflow DAG that demos extracting metadata from Snowflake and loading it into
Neo4j and Elasticsearch
"""
import textwrap
import uuid
from datetime import datetime, timedelta

from airflow import DAG  # noqa
from airflow import macros  # noqa
from airflow.operators.python_operator import PythonOperator  # noqa
from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory

from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.snowflake_metadata_extractor import SnowflakeMetadataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer

dag_args = {
    'concurrency': 10,
    # One dagrun at a time
    'max_active_runs': 1,
    # Daily at 11:00 UTC (3AM PST / 4AM PDT)
    'schedule_interval': '0 11 * * *',
    'catchup': False
}

default_args = {
    'owner': 'amundsen',
    'start_date': datetime(2020, 8, 19),
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'priority_weight': 10,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(minutes=120)
}

# Neo4j cluster endpoint
NEO4J_ENDPOINT = 'bolt://neo4j:7687'
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'

es = Elasticsearch([
    {'host': 'elasticsearch'},
])

# TODO: user provides a list of schemas for indexing
SUPPORTED_SCHEMAS = ['public']
SUPPORTED_SCHEMA_SQL_IN_CLAUSE = "('{schemas}')".format(schemas="', '".join(SUPPORTED_SCHEMAS))
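# For illustration (hypothetical values, not part of the original sample): with
# SUPPORTED_SCHEMAS = ['public', 'analytics'] the format/join above produces the string
# "('public', 'analytics')", which is substituted into the metadata extractor's WHERE clause below.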

# Snowflake configs
SNOWFLAKE_DATABASE_KEY = 'YOUR_SNOWFLAKE_DB_NAME'


# TODO: connection string needs to change
def connection_string():
    # Refer to this doc: https://docs.snowflake.com/en/user-guide/sqlalchemy.html#connection-parameters
    # for supported connection parameters and configurations
    user = 'username'
    password = 'password'
    account = 'YourSnowflakeAccountHere'
    # specify a warehouse to connect to.
    warehouse = 'yourwarehouse'
    return f'snowflake://{user}:{password}@{account}/{SNOWFLAKE_DATABASE_KEY}?warehouse={warehouse}'
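
# With the placeholder values above, connection_string() returns a URL of the form
# 'snowflake://username:password@YourSnowflakeAccountHere/YOUR_SNOWFLAKE_DB_NAME?warehouse=yourwarehouse'.
# Swap in real credentials (for example from an Airflow Connection or environment variables rather than
# hard-coded literals) before running this DAG against a live Snowflake account.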


def create_snowflake_table_metadata_job():
    """
    Launches a databuilder job that extracts table and column metadata from the Snowflake database
    and publishes it to Neo4j.
    """
    where_clause_suffix = textwrap.dedent("""
        WHERE c.TABLE_SCHEMA IN {schemas}
        AND lower(c.COLUMN_NAME) not like 'dw_%';
    """).format(schemas=SUPPORTED_SCHEMA_SQL_IN_CLAUSE)

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'

    job_config = ConfigFactory.from_dict({
        f'extractor.snowflake.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
        f'extractor.snowflake.{SnowflakeMetadataExtractor.SNOWFLAKE_DATABASE_KEY}': SNOWFLAKE_DATABASE_KEY,
        f'extractor.snowflake.{SnowflakeMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': 'some_unique_tag'  # TODO: a unique tag must be added
    })
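
    # Note (suggestion, not part of the original sample): the publish tag stamps every node and
    # relationship written by the publisher, and downstream cleanup/staleness jobs rely on it, so the
    # placeholder 'some_unique_tag' above should be replaced with a value that is unique per run,
    # e.g. one derived from the Airflow execution date or datetime.utcnow().strftime('%Y%m%d%H%M%S').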

    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=SnowflakeMetadataExtractor(), loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    job.launch()


def create_snowflake_es_publisher_job():
    """
    Launches a databuilder job that extracts data from the Neo4j backend and pushes it as search
    documents to an Elasticsearch index.
    """
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())

    # Elasticsearch client instance
    elasticsearch_client = es
    # unique name of the new index in Elasticsearch
    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
    # related to the mapping type from /databuilder/publisher/elasticsearch_publisher.py#L38
    elasticsearch_new_index_key_type = 'table'
    # alias for Elasticsearch used in amundsensearchlibrary/search_service/config.py as an index
    elasticsearch_index_alias = 'table_search_index'
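
    # For illustration (assumed behaviour of the publisher, not extra configuration): each run writes
    # documents into a brand-new index named 'tables' plus a random UUID (e.g. 'tables3f2c...'), and
    # the publisher then points the 'table_search_index' alias at that new index, so the search
    # service only cuts over once publishing has succeeded.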

    job_config = ConfigFactory.from_dict({
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}':
            'databuilder.models.table_elasticsearch_document.TableESDocument',
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
            elasticsearch_client,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
            elasticsearch_new_index_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}':
            elasticsearch_new_index_key_type,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
            elasticsearch_index_alias
    })

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    job.launch()


with DAG('amundsen_databuilder', default_args=default_args, **dag_args) as dag:
    snowflake_table_metadata_job = PythonOperator(
        task_id='snowflake_table_metadata_extract_job',
        python_callable=create_snowflake_table_metadata_job
    )

    snowflake_es_publisher_job = PythonOperator(
        task_id='snowflake_es_publisher_job',
        python_callable=create_snowflake_es_publisher_job
    )

    # The Elasticsearch update runs after the table metadata has been updated.
    snowflake_table_metadata_job >> snowflake_es_publisher_job
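
# To exercise the two steps outside of the Airflow scheduler (assuming the Neo4j, Elasticsearch and
# Snowflake endpoints configured above are reachable), the callables can also be run directly:
#
#   create_snowflake_table_metadata_job()
#   create_snowflake_es_publisher_job()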