
Commit

allow more flexibility for method 2 to handle all kinds of export requests
jindongyang94 committed Jul 16, 2019
1 parent 3531f94 commit fefe092
Showing 2 changed files with 58 additions and 40 deletions.
4 changes: 2 additions & 2 deletions credentials
@@ -1,3 +1,3 @@
 [default]
-aws_access_key_id = AKIASK4R3RDM5YB5PZMN
-aws_secret_access_key = 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
+aws_access_key_id =
+aws_secret_access_key =
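With the hard-coded keys blanked out of the committed credentials file, the scripts can fall back on boto3's standard credential resolution (environment variables, the shared ~/.aws/credentials file, or an attached IAM role). A minimal sketch, assuming the RDS/S3 helpers wrap boto3; the profile name here is only illustrative:

import boto3

# Resolved by boto3's default credential chain: AWS_ACCESS_KEY_ID /
# AWS_SECRET_ACCESS_KEY environment variables, the [default] profile in
# ~/.aws/credentials, or an attached IAM role - no keys committed to the repo.
session = boto3.Session(profile_name="default")  # profile name is illustrative
rds_client = session.client("rds")
s3_client = session.client("s3")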
94 changes: 56 additions & 38 deletions integration2.py
@@ -83,15 +83,20 @@ def describe_db_instances(self, filters=None):


 # Actual Program -----------------------------------------------
-def run(instance_filters=None):
+def run(instance_filters=None, database_filters=None, table_filters=None):
     """
-    instance_filters (dict): for now it can be anything we are going to use to filter the instance:
+    -instance_filters (dict): for now it can be anything we are going to use to filter the instance:
         1. db-cluster-id 2. db-instance-id
         A filter name and value pair that is used to return a more specific list of results from a describe operation.
         Filters can be used to match a set of resources by specific criteria, such as IDs.
         The filters supported by a describe operation are documented with the describe operation.
         E.g. [{"Name": "tag:keyname", "Values": [""]}] - the "Name" and "Values" keys must be specified explicitly.
+    -database_filters (list): append database names to this list to access only those databases. By default,
+    all databases are accessed.
+    -table_filters (list): append table names to this list to export only those tables. By default, all tables are exported.
     """
     rds = RDSHelper()
     dbs = rds.describe_db_instances(filters=instance_filters)
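For context, a minimal sketch (not part of the commit) of how the reworked run() signature might be invoked; the tag key, database name, and table names are hypothetical placeholders:

from integration2 import run

# Hypothetical filter values - substitute real tag keys, database and table names.
instance_filters = [{"Name": "tag:environment", "Values": ["production"]}]
database_filters = ["sales"]             # export only these databases
table_filters = ["orders", "customers"]  # export only these tables

run(instance_filters=instance_filters,
    database_filters=database_filters,
    table_filters=table_filters)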
@@ -125,47 +130,60 @@ def extract_name_query(title, qry):
         # List all available databases in the same instance
         database_names = extract_name_query('listing databases', 'SELECT * FROM pg_database')
         print(database_names)
 
+        # Filtering available databases
+        default_databases = ['postgres', 'rdsadmin', 'template1', 'template0']
+        database_names = list(filter(lambda x: x not in default_databases, database_names))
+        if database_filters:
+            database_names = list(filter(lambda x: x in database_filters, database_names))
+
         for database_name in database_names:
-            if database_name.lower() not in ['postgres', 'rdsadmin', 'template1', 'template0']:
-                # Change database connection
-                print("Accessing", database_name, "...")
-                con = psycopg2.connect(dbname=database_name, host=host, port=port, user=dbuser)
-                cur = con.cursor()
-
-                # List all available tables in the same instance
-                table_query = "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE'"
-                table_names = extract_name_query('listing tables', table_query)
-                print(table_names)
-
-                for table_name in table_names:
-                    # Save individual tables to CSV first - as we are sending one table at a time, we can del the csv files
-                    # as soon as we have uploaded them
-                    print("Accessing", table_name, "...")
-                    export_query = "COPY " + table_name + " TO STDOUT WITH CSV HEADER"
-                    csvname = table_name + ".csv"
-                    with open(csvname, "w") as csvfile:
-                        cur.copy_expert(export_query, csvfile)
-
-                    folder_path = ("%s/%s/%s") % (DATALAKE_NAME, instance, database_name)
-
-                    s3 = S3Helper()
-                    s3.create_folder(folder_path, location)
-                    table_path = ("%s/%s/%s.csv") % (instance, database_name, table_name)
-                    s3_path = ("s3://%s/%s") % (DATALAKE_NAME, table_path)
-
-                    #Upload the file to the respective bucket
-                    s3.upload(csvname, DATALAKE_NAME, table_path)
-
-                    print('FILE PUT AT:', s3_path)
-
-                    #Deleting file after use
-                    os.remove(csvname)
-                    print('File Deleted')
+            # Change database connection
+            print("Accessing", database_name, "...")
+            con = psycopg2.connect(dbname=database_name, host=host, port=port, user=dbuser)
+            cur = con.cursor()
+
+            # List all available tables in the same instance
+            table_query = "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE'"
+            table_names = extract_name_query('listing tables', table_query)
+            print(table_names)
+
+            # Filtering available tables
+            if table_filters:
+                table_names = list(filter(lambda x: x in table_filters, table_names))
+
+            for table_name in table_names:
+                # Save each table to a CSV first - since we send one table at a time, the CSV file
+                # can be deleted as soon as it has been uploaded
+                print("Accessing", table_name, "...")
+                export_query = "COPY " + table_name + " TO STDOUT WITH CSV HEADER"
+                csvname = table_name + ".csv"
+                with open(csvname, "w") as csvfile:
+                    cur.copy_expert(export_query, csvfile)
+
+                folder_path = ("%s/%s/%s") % (DATALAKE_NAME, instance, database_name)
+
+                s3 = S3Helper()
+                s3.create_folder(folder_path, location)
+                table_path = ("%s/%s/%s.csv") % (instance, database_name, table_name)
+                s3_path = ("s3://%s/%s") % (DATALAKE_NAME, table_path)
+
+                # Upload the file to the respective bucket
+                s3.upload(csvname, DATALAKE_NAME, table_path)
+
+                print('FILE PUT AT:', s3_path)
+
+                # Delete the local file after use
+                os.remove(csvname)
+                print('File Deleted')



 if __name__ == "__main__":
+    # instance_tags = {}
+    # correct_databases = []
+    # correct_tables = []
 
     run()
 
 
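The heart of the new loop is a COPY-to-CSV export through psycopg2 followed by an S3 upload. Below is a self-contained sketch of that step, using plain boto3 in place of the repo's S3Helper (whose internals are not shown in this diff); the connection parameters, bucket name, and key layout are placeholders:

import os
import boto3
import psycopg2

def export_table_to_s3(table_name, dbname, host, port, user, bucket, key_prefix):
    # Stream one table out of PostgreSQL as CSV - the same
    # COPY ... TO STDOUT WITH CSV HEADER statement the script builds -
    # then upload the file to S3 and remove the local copy.
    con = psycopg2.connect(dbname=dbname, host=host, port=port, user=user)
    cur = con.cursor()
    csvname = table_name + ".csv"
    with open(csvname, "w") as csvfile:
        cur.copy_expert("COPY " + table_name + " TO STDOUT WITH CSV HEADER", csvfile)
    con.close()

    key = "%s/%s/%s" % (key_prefix, dbname, csvname)
    boto3.client("s3").upload_file(csvname, bucket, key)

    os.remove(csvname)  # delete the CSV once it has been uploaded
    return "s3://%s/%s" % (bucket, key)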
