Merge pull request #2777 from GSA-TTS/main
Showing 33 changed files with 1,336 additions and 135 deletions.
@@ -1,10 +1,64 @@
# Census to FAC data migration

## Overview

This is implemented as a Django app to leverage existing management commands and settings. It includes Python and shell scripts to:

* Load raw census data as CSV files into an S3 bucket
* Create Postgres tables from these CSV files
* Perform any data cleanup required to create a table from a CSV file
* Run the historic data migrator
* Run the historic workbook generator

## Infrastructure changes

* Create a new S3 bucket in Cloud.gov spaces as well as in the local environment
* Create a new Postgres instance both in Cloud.gov and locally (a hypothetical provisioning sketch follows)

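As a rough sketch of what that provisioning might look like on the Cloud.gov side (the service offerings, plans, and the database instance name below are assumptions, not the repository's actual configuration):

```bash
# Hypothetical cf CLI commands; offering, plan, and instance names are assumptions.
cf create-service s3 basic fac-census-to-gsafac-s3
cf create-service aws-rds micro-psql fac-census-to-gsafac-db
```

Locally, the bucket and Postgres instance would presumably be provided by the docker compose setup instead.
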
## Utilities

* fac_s3.py - Uploads folders or files to an S3 bucket.

```bash
python manage.py fac_s3 fac-census-to-gsafac-s3 --upload --src census_historical_migration/data
```

* csv_to_postgres.py - Inserts data into Postgres tables using the contents of the CSV files in the S3 bucket. The first row of each file is assumed to contain the column names (these are converted to lowercase). The name of the table is determined by examining the name of the file. The sample source files do not have delimiters for empty fields at the end of a line, so we assume these are nulls. (A sketch of this mapping follows the commands below.)

```bash
python manage.py csv_to_postgres --folder data --chunksize 10000
python manage.py csv_to_postgres --clean True
```
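
As a small, hypothetical illustration of those conventions (the file name and columns are invented), a file named ELECAUDITHEADER.csv would be matched to the model whose name its base name starts with, and its header row supplies the lowercased column names:

```python
# Hypothetical illustration of the CSV conventions described above;
# the file name and column names are invented for this example.
import io

import pandas as pd

csv_text = "DBKEY,AUDITYEAR,STATE\n100010,2022,MA\n100011,2022,"
df = pd.read_csv(io.StringIO(csv_text))
df.columns = [c.lower() for c in df.columns]  # header row -> lowercase column names
print(df)  # the trailing empty field on the last line is read as NaN, i.e. null
print("ELECAUDITHEADER.csv".split(".")[0].lower())  # "elecauditheader" -> target table/model name
```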

* models.py - Models corresponding to the incoming CSV files
* routers.py - Tells Django to use a different Postgres instance for this app (a sketch follows below)

* data - A folder that contains sample data that we can use for development

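For reference, routers.py would contain a database router along these lines; this is only a sketch, and the class name and the `census_to_gsafac` database alias are assumptions rather than the file's actual contents.

```python
# Hypothetical sketch of a Django database router; the class name and the
# "census_to_gsafac" database alias are assumptions for illustration.
class CensusToGsafacRouter:
    route_app_labels = {"census_historical_migration"}

    def db_for_read(self, model, **hints):
        # Reads of this app's models go to the separate Postgres instance.
        if model._meta.app_label in self.route_app_labels:
            return "census_to_gsafac"
        return None

    def db_for_write(self, model, **hints):
        # Writes of this app's models go to the same instance.
        if model._meta.app_label in self.route_app_labels:
            return "census_to_gsafac"
        return None

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        # Only this app's tables are created on that instance.
        if app_label in self.route_app_labels:
            return db == "census_to_gsafac"
        return None
```

A router like this would be registered through the DATABASE_ROUTERS setting in Django settings.
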
## Prerequisites

* A Django app that reads the tables created here as unmanaged models and populates SF-SAC tables by creating workbooks, etc. to simulate a real submission

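As a sketch of what reading these tables as unmanaged models means (the class, field, and table names below are illustrative assumptions, not the consuming app's real definitions):

```python
# Hypothetical unmanaged model over one of the Census tables; all names are assumptions.
from django.db import models


class CensusGen(models.Model):
    dbkey = models.TextField(blank=True, null=True)
    audityear = models.TextField(blank=True, null=True)

    class Meta:
        managed = False  # Django never creates, migrates, or drops this table
        db_table = "census_gen"  # assumed table name, for illustration only
```
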
## How to load test Census data into Postgres

1. Download test Census data from https://drive.google.com/drive/folders/1TY-7yWsMd8DsVEXvwrEe_oWW1iR2sGoy into the census_historical_migration/data folder.
NOTE: Never check the census_historical_migration/data folder into GitHub.

2. In the FAC/backend folder, run the following to load CSV files from the census_historical_migration/data folder into the fac-census-to-gsafac-s3 bucket.
```bash
docker compose run web python manage.py fac_s3 fac-census-to-gsafac-s3 --upload --src census_historical_migration/data
```

3. In the FAC/backend folder, run the following to read the CSV files from the fac-census-to-gsafac-s3 bucket and load them into Postgres.
```bash
docker compose run web python manage.py csv_to_postgres --folder data --chunksize 10000
```

### How to run the historic data migrator:
```
docker compose run web python manage.py historic_data_migrator --email [email protected] \
  --years 22 \
  --dbkeys 100010
```
- The email address currently must be a User in the system. As this has only been run locally so far, it would often be a test account in a local sandbox environment.
- `years` and `dbkeys` are optional. The script will use default values for these if they aren't provided.

139 changes: 139 additions & 0 deletions
backend/census_historical_migration/management/commands/csv_to_postgres.py
@@ -0,0 +1,139 @@
import logging
import boto3
import pandas as pd

from io import BytesIO
from botocore.exceptions import ClientError
from django.core.management.base import BaseCommand
from django.conf import settings
from django.apps import apps

logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)
# All models defined by this app; incoming CSV files are matched to them by name.
census_to_gsafac_models = list(
    apps.get_app_config("census_historical_migration").get_models()
)
census_to_gsafac_model_names = [m._meta.model_name for m in census_to_gsafac_models]
s3_client = boto3.client(
    "s3",
    aws_access_key_id=settings.AWS_PRIVATE_ACCESS_KEY_ID,
    aws_secret_access_key=settings.AWS_PRIVATE_SECRET_ACCESS_KEY,
    endpoint_url=settings.AWS_S3_ENDPOINT_URL,
)
census_to_gsafac_bucket_name = settings.AWS_CENSUS_TO_GSAFAC_BUCKET_NAME
DELIMITER = ","


class Command(BaseCommand):
    help = """
        Populate Postgres database from csv files
        Usage:
        manage.py csv_to_postgres --folder <folder_name> --clean <True|False>
    """

    def add_arguments(self, parser):
        parser.add_argument("--folder", help="S3 folder name (required)", type=str)
        parser.add_argument(
            "--clean", help="Clean the data (default: False)", type=bool, default=False
        )
        parser.add_argument(
            "--sample",
            help="Sample the data (default: False)",
            type=bool,
            default=False,
        )
        parser.add_argument("--load")
        parser.add_argument(
            "--chunksize",
            help="Chunk size for processing data (default: 10_000)",
            type=int,
            default=10_000,
        )

    def handle(self, *args, **options):
        folder = options.get("folder")
        if not folder:
            print("Please specify a folder name")
            return
        if options.get("clean"):
            self.delete_data()
            return
        if options.get("sample"):
            self.sample_data()
            return
        chunk_size = options.get("chunksize")
        self.process_csv_files(folder, chunk_size)

    def process_csv_files(self, folder, chunk_size):
        items = self.list_s3_objects(census_to_gsafac_bucket_name, folder)
        for item in items:
            if item["Key"].endswith("/"):
                continue
            model_name = self.get_model_name(item["Key"])
            if model_name:
                model_index = census_to_gsafac_model_names.index(model_name)
                model_obj = census_to_gsafac_models[model_index]
                file = self.get_s3_object(
                    census_to_gsafac_bucket_name, item["Key"], model_obj
                )
                if file:
                    self.load_data(file, model_obj, chunk_size)

        self.display_row_counts(census_to_gsafac_models)

    def display_row_counts(self, models):
        for mdl in models:
            row_count = mdl.objects.all().count()
            print(f"{row_count} in ", mdl)

    def delete_data(self):
        for mdl in census_to_gsafac_models:
            print("Deleting ", mdl)
            mdl.objects.all().delete()

    def sample_data(self):
        for mdl in census_to_gsafac_models:
            print("Sampling ", mdl)
            rows = mdl.objects.all()[:1]
            for row in rows:
                for col in mdl._meta.fields:
                    print(f"{col.name}: {getattr(row, col.name)}")

    def list_s3_objects(self, bucket_name, folder):
        return s3_client.list_objects(Bucket=bucket_name, Prefix=folder)["Contents"]

    def get_s3_object(self, bucket_name, key, model_obj):
        file = BytesIO()
        try:
            s3_client.download_fileobj(Bucket=bucket_name, Key=key, Fileobj=file)
        except ClientError:
            logger.error("Could not download {}".format(model_obj))
            return None
        print(f"Obtained {model_obj} from S3")
        return file

    def get_model_name(self, name):
        # Match the file's base name (lowercased, extension stripped) to a model by prefix.
        print("Processing ", name)
        file_name = name.split("/")[-1].split(".")[0]
        for model_name in census_to_gsafac_model_names:
            if file_name.lower().startswith(model_name):
                print("model_name = ", model_name)
                return model_name
        print("Could not find a matching model for ", name)
        return None

    def load_data(self, file, model_obj, chunk_size):
        print("Starting load data to postgres")
        file.seek(0)
        rows_loaded = 0
        for df in pd.read_csv(file, iterator=True, chunksize=chunk_size):
            # Each row is a dictionary. The columns are the
            # correct names for our model. So, this should be a
            # clean way to load the model from a row.
            for _, row in df.iterrows():
                obj = model_obj(**row)
                obj.save()
            rows_loaded += df.shape[0]
            print(f"Loaded {rows_loaded} rows in ", model_obj)
        return None
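
For completeness, the `--sample` flag defined above (it prints one row per model) is not covered in the README; assuming the same docker compose pattern the README uses, an invocation would look like:

```bash
docker compose run web python manage.py csv_to_postgres --sample True
```

Note that argparse's `type=bool` treats any non-empty string as true, so something like `--clean False` would still trigger the clean path; pass these flags only when you actually want the behavior.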