"""integration2.py

The idea of this script is to find the respective database instances using Boto3, then find
the databases in each instance, and finally find the tables in each database and do an
iterative export, dumping one table at a time to prevent overloading memory.

This process could be expedited by parallel processing, but I am unsure of how to do so yet;
I would figure out a way if this becomes a pertinent issue.

Each downloaded file is uploaded to S3 into the correct folder and bucket based on company
name. It is important to note that files with the same name are replaced. This helps avoid
saving redundant files, but is not useful if we want versioning.

S3 files are named as follows:
    s3://{BucketName-(Data Lake)}/{InstanceName}/{DBName}/{TableName}.csv
"""
import os

import boto3
import psycopg2
# This approach exports a CSV file for each table. It does not require maintaining a JSON
# config file at all; only different AWS credentials are needed for different servers when
# different users have different access to the databases.
DATALAKE_NAME = 'hubble-datalake'
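# Example of a resulting object key (hypothetical instance/database/table names):
#   s3://hubble-datalake/prod-instance/sales/customers.csv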


## Class Methods (should encapsulate all S3 and RDS methods to make the work easier to understand) ----------------------------------
class S3Helper:
    def __init__(self):
        self.client = boto3.client("s3")
        self.s3 = boto3.resource('s3')

    def create_folder(self, path, location):
        """
        Encapsulate all kinds of folder creation in S3:
        1. Create the bucket (if it does not exist).
        2. Create the folders.
        """
        path_arr = path.rstrip("/").split("/")
        # If the path given is only the bucket name.
        if len(path_arr) == 1:
            return self._check_bucket(location)
        parent = path_arr[0]
        self._check_bucket(parent)
        bucket = self.s3.Bucket(parent)
        status = bucket.put_object(Key="/".join(path_arr[1:]) + "/")
        return status
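
    # Usage sketch (hypothetical instance/database names, for illustration only):
    #   S3Helper().create_folder("hubble-datalake/my-instance/my-db", "ap-southeast-1")
    # ensures the data-lake bucket exists and then creates the "my-instance/my-db/" prefix.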

    def upload(self, file, bucketname, pathname):
        self.s3.meta.client.upload_file(file, bucketname, pathname)

    def _check_bucket(self, location):
        # Check if the data lake bucket exists.
        bucketlist = self.client.list_buckets()['Buckets']
        print(bucketlist)
        bucketnames = list(map(lambda x: x['Name'], bucketlist))
        print(bucketnames)
        datalake = list(filter(lambda x: x.lower() == DATALAKE_NAME, bucketnames))
        print(datalake)
        # We could create a data lake for each region as well, but we don't need that yet.
        # datalake_name = DATALAKE_NAME + "-" + location
        if not datalake:
            # Create the bucket. Without a CreateBucketConfiguration this creates it in the
            # default region (us-east-1); pass a LocationConstraint to honour `location`.
            self.client.create_bucket(Bucket=DATALAKE_NAME)
        return True


class RDSHelper:
    def __init__(self, *args, **kwargs):
        self.client = boto3.client("rds")

    def describe_db_instances(self, filters=None):
        if not filters:
            dbs = self.client.describe_db_instances()['DBInstances']
        else:
            dbs = self.client.describe_db_instances(Filters=filters)['DBInstances']
        return dbs


# Actual Program -----------------------------------------------
def run(instance_filters=None, database_filters=None, table_filters=None):
    """
    - instance_filters (list of dicts): for now it can be anything we use to filter the
      instances, e.g. 1. db-cluster-id 2. db-instance-id.
      Each entry is a filter name and value pair used to return a more specific list of
      results from a describe operation. Filters can be used to match a set of resources by
      specific criteria, such as IDs. The filters supported by a describe operation are
      documented with that describe operation.
      E.g. [{"Name": "tag:keyname", "Values": [""]}] - the "Name" and "Values" pair must be
      specified explicitly.
    - database_filters (list): append only the database names we want to access; by default,
      all databases are accessed.
    - table_filters (list): append only the table names we want to export; by default, all
      tables are exported.
    """
    rds = RDSHelper()
    dbs = rds.describe_db_instances(filters=instance_filters)
    for db in dbs:
        instance = db['DBInstanceIdentifier']
        dbuser = db['MasterUsername']
        endpoint = db['Endpoint']
        host = endpoint['Address']
        port = endpoint['Port']
        # The fourth colon-separated element of the instance ARN is the region.
        location = str(db['DBInstanceArn'].split(':')[3])
        print('instance:', instance)
        print('dbuser:', dbuser)
        print('endpoint:', endpoint)
        print('host:', host)
        print('port:', port)
        print('location:', location)
        # No password is supplied here; psycopg2/libpq falls back to PGPASSWORD or ~/.pgpass.
        con = psycopg2.connect(dbname='postgres', host=host, port=port, user=dbuser)
        cur = con.cursor()

        def extract_name_query(title, qry):
            print(title)
            cur.execute(qry)
            results = cur.fetchall()
            result_names = list(map(lambda x: x[0], results))
            return result_names
        # List all available databases in the same instance.
        database_names = extract_name_query('listing databases', 'SELECT datname FROM pg_database')
        print(database_names)
        # Filter out the default databases.
        default_databases = ['postgres', 'rdsadmin', 'template1', 'template0']
        database_names = list(filter(lambda x: x not in default_databases, database_names))
        if database_filters:
            database_names = list(filter(lambda x: x in database_filters, database_names))
        for database_name in database_names:
            # Change the database connection.
            print("Accessing", database_name, "...")
            con = psycopg2.connect(dbname=database_name, host=host, port=port, user=dbuser)
            cur = con.cursor()
            # List all available tables in the current database.
            table_query = "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE'"
            table_names = extract_name_query('listing tables', table_query)
            print(table_names)
            # Filter the available tables.
            if table_filters:
                table_names = list(filter(lambda x: x in table_filters, table_names))
            for table_name in table_names:
                # Save each table to a CSV file first; since we send one table at a time, we
                # can delete the CSV file as soon as it has been uploaded.
                print("Accessing", table_name, "...")
                # table_name comes from information_schema; quote it if names may need escaping.
                export_query = "COPY " + table_name + " TO STDOUT WITH CSV HEADER"
                csvname = table_name + ".csv"
                with open(csvname, "w") as csvfile:
                    cur.copy_expert(export_query, csvfile)
                folder_path = "%s/%s/%s" % (DATALAKE_NAME, instance, database_name)
                s3 = S3Helper()
                s3.create_folder(folder_path, location)
                table_path = "%s/%s/%s.csv" % (instance, database_name, table_name)
                s3_path = "s3://%s/%s" % (DATALAKE_NAME, table_path)
                # Upload the file to the respective bucket.
                s3.upload(csvname, DATALAKE_NAME, table_path)
                print('FILE PUT AT:', s3_path)
                # Delete the local file after use.
                os.remove(csvname)
                print('File Deleted')


if __name__ == "__main__":
    # instance_tags = {}
    # correct_databases = []
    # correct_tables = []
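    # The placeholders above could be filled in and passed through, e.g. (hypothetical sketch):
    # run(instance_filters=instance_tags, database_filters=correct_databases, table_filters=correct_tables)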
    run()