-
Notifications
You must be signed in to change notification settings - Fork 20
/
main.py
123 lines (86 loc) · 3.71 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
''' Main module. '''
import os
import sys
import time
from typing import Tuple
import pyodbc
from faker import Faker
CONNECTION_STRING: str = 'DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password};TrustServerCertificate=yes;'
RECORD_COUNT: int = 10000
SQL_INSERT_DATA: str = 'INSERT INTO users (id, name, city) VALUES (?, ?, ?);'
def main():
''' App entrypoint. '''
# Wait for mssql database server to fully spawn.
time.sleep(20)
source_db_conn, dest_db_conn = connect_to_databases()
with source_db_conn, dest_db_conn:
source_db_cur = source_db_conn.cursor()
dest_db_cur = dest_db_conn.cursor()
with source_db_cur, dest_db_cur:
print('Create users table and populate data in source database.')
source_db_cur.execute(extract_sql('sql/source_db_setup.sql'))
populate_data(RECORD_COUNT, source_db_cur)
source_db_conn.commit()
print('Create users table in destination database.')
dest_db_cur.execute(extract_sql('sql/dest_db_setup.sql'))
dest_db_conn.commit()
transfer_data(source_db_cur, dest_db_cur, dest_db_conn)
print('Display users data of destination database.')
display_users(dest_db_cur)
def connect_to_databases() -> Tuple:
''' Extracts databases credentials from the environment and returns their connections. '''
source_db_conn = get_connection(
os.environ['SOURCE_DB_HOST'],
os.environ['SOURCE_DB_NAME'],
os.environ['SOURCE_DB_USERNAME'],
os.environ['SOURCE_DB_PASSWORD']
)
dest_db_conn = get_connection(
os.environ['DESTINATION_DB_HOST'],
os.environ['DESTINATION_DB_NAME'],
os.environ['DESTINATION_DB_USERNAME'],
os.environ['DESTINATION_DB_PASSWORD']
)
return source_db_conn, dest_db_conn
def get_connection(db_host: str, db_name: str, db_username: str, db_password: str) -> pyodbc.Connection:
''' Create database connection and returns connection. '''
connection_str = CONNECTION_STRING.format(
server=db_host,
database=db_name,
username=db_username,
password=db_password
)
return pyodbc.connect(connection_str, timeout=300)
def populate_data(RECORD_COUNT: int, db_cursor: pyodbc.Cursor):
''' Generate user data. '''
fake = Faker()
row = lambda n: (n + 1, fake.format('name'), fake.format('city'))
for i in range(RECORD_COUNT):
db_cursor.execute(SQL_INSERT_DATA, row(i))
def extract_sql(file: str) -> str:
''' Reads an SQL file and returns it's contents. '''
with open(file, 'rt') as file:
contents = file.read()
return contents
def transfer_data(source_db_cursor: pyodbc.Cursor, dest_db_cursor: pyodbc.Cursor, dest_db_conn: pyodbc.Connection):
''' Extracts users data from source database and stores them in destination database. '''
print('Extracting users data from source database.')
source_db_cursor.execute('SELECT * FROM users')
rows = source_db_cursor.fetchall()
print('Transferring users data to destination database.')
for row in rows:
dest_db_cursor.execute(SQL_INSERT_DATA, (row.id, row.name, row.city))
print(f'{len(rows)} rows transferred\n')
dest_db_conn.commit()
def display_users(db_cursor: pyodbc.Cursor):
''' Displays users data. '''
db_cursor.execute('SELECT * FROM users ORDER BY id')
transferred_data = db_cursor.fetchall()
template = '{:<5} {:<15} {:<10}'
print(template.format('ID', 'NAME', 'CITY'))
print('-' * 32)
for row in transferred_data:
print(template.format(row.id, row.name, row.city))
if __name__ == '__main__':
main()
sys.exit(0)