forked from bcgov/wps
Undoes the changes from bcgov#3623: the SFMS code no longer needs to be separated out as a service. Also includes some minor ruff autoformatting and cspell additions.
Showing 12 changed files with 63 additions and 632 deletions.
@@ -1,4 +1,5 @@
-""" Router for SFMS """
+"""Router for SFMS"""
+
 import io
 import logging
 from datetime import datetime, date
@@ -21,10 +22,11 @@
     prefix="/sfms",
 )

-SFMS_HOURLIES_PERMISSIONS = 'public-read'
+SFMS_HOURLIES_PERMISSIONS = "public-read"


 class FileLikeObject(io.IOBase):
-    """ Very basic wrapper of the SpooledTemporaryFile to expose the file-like object interface.
+    """Very basic wrapper of the SpooledTemporaryFile to expose the file-like object interface.
     The aiobotocore library expects a file-like object, but we can't pass the SpooledTemporaryFile
     object directly to aiobotocore. aiobotocore looks for a "tell" method, which isn't present
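For context on the wrapper above: aiobotocore wants a plain file-like object exposing `tell()`, `read()` and `seek()`, which a `SpooledTemporaryFile` does not surface the way aiobotocore probes for it. The snippet below is a minimal, standalone sketch of that delegation pattern; the class name and body are illustrative, not the repository's exact implementation.

```python
import io
import tempfile


class FileLikeWrapper(io.IOBase):
    """Delegate read/seek/tell to a wrapped SpooledTemporaryFile-style object."""

    def __init__(self, file):
        super().__init__()
        self.file = file

    def read(self, size: int = -1) -> bytes:
        return self.file.read(size)

    def seek(self, offset: int, whence: int = io.SEEK_SET):
        return self.file.seek(offset, whence)

    def tell(self) -> int:
        return self.file.tell()


# Wrap a spooled temporary file and hand it to code that probes for tell().
spooled = tempfile.SpooledTemporaryFile()
spooled.write(b"example bytes")
spooled.seek(0)
wrapped = FileLikeWrapper(spooled)
print(wrapped.tell())  # 0
print(wrapped.read())  # b'example bytes'
```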
@@ -48,34 +50,16 @@ def seek(self, offset: int, whence: int = io.SEEK_SET):


 def get_meta_data(request: Request) -> dict:
-    """ Create the meta-data for the s3 object.
+    """Create the meta-data for the s3 object.
     # NOTE: No idea what timezone this is going to be. Is it UTC? Is it PST? Is it PDT?
     """
-    last_modified = datetime.fromisoformat(request.headers.get(
-        'Last-modified'))
-    create_time = datetime.fromisoformat(request.headers.get(
-        'Create-time'))
-    return {
-        'last_modified': last_modified.isoformat(),
-        'create_time': create_time.isoformat()}
-
-@router.get('/ready')
-async def get_ready():
-    """ A simple endpoint for OpenShift readiness """
-    return Response()
-
-
-@router.get('/health')
-async def get_health():
-    """ A simple endpoint for Openshift Healthchecks. """
-    return Response()
-
-
-@router.post('/upload')
-async def upload(file: UploadFile,
-                 request: Request,
-                 background_tasks: BackgroundTasks,
-                 _=Depends(sfms_authenticate)):
+    last_modified = datetime.fromisoformat(request.headers.get("Last-modified"))
+    create_time = datetime.fromisoformat(request.headers.get("Create-time"))
+    return {"last_modified": last_modified.isoformat(), "create_time": create_time.isoformat()}
+
+
+@router.post("/upload")
+async def upload(file: UploadFile, request: Request, background_tasks: BackgroundTasks, _=Depends(sfms_authenticate)):
     """
     Trigger the SFMS process to run on the provided file.
     The header MUST include the SFMS secret key.
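The get_meta_data change above only reformats the header parsing, but the NOTE about timezones still applies: `datetime.fromisoformat` accepts both naive and offset-aware strings, and a naive value carries no zone information at all. A small standalone illustration (the header values are made up):

```python
from datetime import datetime

# Both forms parse, but only the second carries timezone information.
naive = datetime.fromisoformat("2024-06-01T14:30:00")
aware = datetime.fromisoformat("2024-06-01T14:30:00-07:00")

print(naive.isoformat(), naive.tzinfo)  # 2024-06-01T14:30:00 None
print(aware.isoformat(), aware.tzinfo)  # 2024-06-01T14:30:00-07:00 UTC-07:00
```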
@@ -89,20 +73,17 @@ async def upload(file: UploadFile,
     -F '[email protected];type=image/tiff'
     ```
     """
-    logger.info('sfms/upload/')
+    logger.info("sfms/upload/")
     # Get an async S3 client.
     async with get_client() as (client, bucket):
         # We save the Last-modified and Create-time as metadata in the object store - just
         # in case we need to know about it in the future.
         key = get_target_filename(file.filename)
         logger.info('Uploading file "%s" to "%s"', file.filename, key)
         meta_data = get_meta_data(request)
-        await client.put_object(Bucket=bucket,
-                                Key=key,
-                                Body=FileLikeObject(file.file),
-                                Metadata=meta_data)
+        await client.put_object(Bucket=bucket, Key=key, Body=FileLikeObject(file.file), Metadata=meta_data)
         await file.close()
-        logger.info('Done uploading file')
+        logger.info("Done uploading file")
         try:
             # We don't want to hold back the response to the client, so we'll publish the message
             # as a background task.
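The comment above about not holding back the response refers to FastAPI's BackgroundTasks: the endpoint returns immediately and the queued function runs after the response is sent. A generic sketch of that pattern, with a placeholder `publish_message` function standing in for the repository's `publish()` helper:

```python
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def publish_message(key: str) -> None:
    # Stand-in for putting a message on a queue; runs after the response is sent.
    print(f"publishing message for {key}")


@app.post("/upload-example")
async def upload_example(background_tasks: BackgroundTasks):
    # Queue the work and return immediately; FastAPI runs the task afterwards.
    background_tasks.add_task(publish_message, "sfms/uploads/example.tif")
    return {"status": "accepted"}
```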
@@ -122,10 +103,9 @@ async def upload(file: UploadFile,
             # and can't be given that level of responsibility.
     return Response(status_code=200)

-@router.post('/upload/hourlies')
-async def upload_hourlies(file: UploadFile,
-                          request: Request,
-                          _=Depends(sfms_authenticate)):
+
+@router.post("/upload/hourlies")
+async def upload_hourlies(file: UploadFile, request: Request, _=Depends(sfms_authenticate)):
     """
     Trigger the SFMS process to run on the provided file for hourlies.
     The header MUST include the SFMS secret key.
@@ -139,7 +119,7 @@ async def upload_hourlies(file: UploadFile,
     -F '[email protected];type=image/tiff'
     ```
     """
-    logger.info('sfms/upload/hourlies')
+    logger.info("sfms/upload/hourlies")

     if is_ffmc_file(file.filename):
         # Get an async S3 client.
@@ -149,40 +129,34 @@ async def upload_hourlies(file: UploadFile,
             key = get_hourly_filename(file.filename)
             logger.info('Uploading file "%s" to "%s"', file.filename, key)
             meta_data = get_meta_data(request)
-            await client.put_object(Bucket=bucket,
-                                    Key=key,
-                                    ACL=SFMS_HOURLIES_PERMISSIONS,
-                                    Body=FileLikeObject(file.file),
-                                    Metadata=meta_data)
+            await client.put_object(Bucket=bucket, Key=key, ACL=SFMS_HOURLIES_PERMISSIONS, Body=FileLikeObject(file.file), Metadata=meta_data)
             await file.close()
-            logger.info('Done uploading file')
+            logger.info("Done uploading file")
     return Response(status_code=200)


-@router.get('/hourlies', response_model=HourlyTIFs)
+@router.get("/hourlies", response_model=HourlyTIFs)
 async def get_hourlies(for_date: date):
     """
-    Retrieve hourly FFMC TIF files for the given date.
+    Retrieve hourly FFMC TIF files for the given date.
     Files are named in the format: "fine_fuel_moisture_codeYYYYMMDDHH.tif", where HH is the two digit day hour in PST.
     """
-    logger.info('sfms/hourlies')
+    logger.info("sfms/hourlies")

     async with get_client() as (client, bucket):
         logger.info('Retrieving hourlies for "%s"', for_date)
-        bucket = config.get('OBJECT_STORE_BUCKET')
-        response = await client.list_objects_v2(Bucket=bucket, Prefix=f'sfms/uploads/hourlies/{str(for_date)}')
-        if 'Contents' in response:
-            hourlies = [HourlyTIF(url=f'https://nrs.objectstore.gov.bc.ca/{bucket}/{hourly["Key"]}') for hourly in response['Contents']]
-            logger.info(f'Retrieved {len(hourlies)} hourlies')
+        bucket = config.get("OBJECT_STORE_BUCKET")
+        response = await client.list_objects_v2(Bucket=bucket, Prefix=f"sfms/uploads/hourlies/{str(for_date)}")
+        if "Contents" in response:
+            hourlies = [HourlyTIF(url=f'https://nrs.objectstore.gov.bc.ca/{bucket}/{hourly["Key"]}') for hourly in response["Contents"]]
+            logger.info(f"Retrieved {len(hourlies)} hourlies")
             return HourlyTIFs(hourlies=hourlies)
-        logger.info(f'No hourlies found for {for_date}')
+        logger.info(f"No hourlies found for {for_date}")
         return HourlyTIFs(hourlies=[])


-@router.post('/manual')
-async def upload_manual(file: UploadFile,
-                        request: Request,
-                        background_tasks: BackgroundTasks):
+@router.post("/manual")
+async def upload_manual(file: UploadFile, request: Request, background_tasks: BackgroundTasks):
     """
     Trigger the SFMS process to run on the provided file.
     The header MUST include the SFMS secret key.
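get_hourlies above lists objects under a per-date prefix and turns each key into a public URL. A standalone sketch of that listing and URL-building logic, with the S3 call replaced by a canned list_objects_v2-shaped response and a made-up bucket name:

```python
from datetime import date

for_date = date(2024, 6, 1)
prefix = f"sfms/uploads/hourlies/{for_date}"  # -> sfms/uploads/hourlies/2024-06-01

# Shape of a list_objects_v2 response: matching keys live under "Contents".
response = {"Contents": [{"Key": f"{prefix}/fine_fuel_moisture_code2024060114.tif"}]}

bucket = "example-bucket"  # placeholder; the real name comes from OBJECT_STORE_BUCKET
if "Contents" in response:
    urls = [f"https://nrs.objectstore.gov.bc.ca/{bucket}/{obj['Key']}" for obj in response["Contents"]]
    print(urls)
else:
    print("no hourlies found for", for_date)
```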
@@ -198,31 +172,27 @@ async def upload_manual(file: UploadFile,
     -F '[email protected];type=image/tiff'
     ```
     """
-    logger.info('sfms/manual')
-    forecast_or_actual = request.headers.get('ForecastOrActual')
-    issue_date = datetime.fromisoformat(str(request.headers.get('IssueDate')))
-    secret = request.headers.get('Secret')
-    if not secret or secret != config.get('SFMS_SECRET'):
+    logger.info("sfms/manual")
+    forecast_or_actual = request.headers.get("ForecastOrActual")
+    issue_date = datetime.fromisoformat(str(request.headers.get("IssueDate")))
+    secret = request.headers.get("Secret")
+    if not secret or secret != config.get("SFMS_SECRET"):
         return Response(status_code=401)
     # Get an async S3 client.
     async with get_client() as (client, bucket):
         # We save the Last-modified and Create-time as metadata in the object store - just
         # in case we need to know about it in the future.
-        key = os.path.join('sfms', 'uploads', forecast_or_actual, issue_date.isoformat()[:10], file.filename)
+        key = os.path.join("sfms", "uploads", forecast_or_actual, issue_date.isoformat()[:10], file.filename)
         # create the filename
         logger.info('Uploading file "%s" to "%s"', file.filename, key)
         meta_data = get_meta_data(request)
-        await client.put_object(Bucket=bucket,
-                                Key=key,
-                                Body=FileLikeObject(file.file),
-                                Metadata=meta_data)
+        await client.put_object(Bucket=bucket, Key=key, Body=FileLikeObject(file.file), Metadata=meta_data)
         await file.close()
-        logger.info('Done uploading file')
+        logger.info("Done uploading file")
         return add_msg_to_queue(file, key, forecast_or_actual, meta_data, issue_date, background_tasks)


-def add_msg_to_queue(file: UploadFile, key: str, forecast_or_actual: str, meta_data: dict,
-                     issue_date: datetime, background_tasks: BackgroundTasks):
+def add_msg_to_queue(file: UploadFile, key: str, forecast_or_actual: str, meta_data: dict, issue_date: datetime, background_tasks: BackgroundTasks):
     try:
         # We don't want to hold back the response to the client, so we'll publish the message
         # as a background task.
@@ -231,14 +201,14 @@ def add_msg_to_queue(file: UploadFile, key: str, forecast_or_actual: str, meta_d
         if is_hfi_file(filename=file.filename):
             logger.info("HFI file: %s, putting processing message on queue", file.filename)
             for_date = get_date_part(file.filename)
-            message = SFMSFile(key=key,
-                               run_type=forecast_or_actual,
-                               last_modified=datetime.fromisoformat(meta_data.get('last_modified')),
-                               create_time=datetime.fromisoformat(meta_data.get('create_time')),
-                               run_date=issue_date,
-                               for_date=date(year=int(for_date[0:4]),
-                                             month=int(for_date[4:6]),
-                                             day=int(for_date[6:8])))
+            message = SFMSFile(
+                key=key,
+                run_type=forecast_or_actual,
+                last_modified=datetime.fromisoformat(meta_data.get("last_modified")),
+                create_time=datetime.fromisoformat(meta_data.get("create_time")),
+                run_date=issue_date,
+                for_date=date(year=int(for_date[0:4]), month=int(for_date[4:6]), day=int(for_date[6:8])),
+            )
             background_tasks.add_task(publish, stream_name, sfms_file_subject, message, subjects)
     except Exception as exception:
         logger.error(exception, exc_info=True)
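The for_date construction above slices the eight-character YYYYMMDD date part out of the HFI filename. A standalone illustration with a made-up value:

```python
from datetime import date

for_date_part = "20240601"  # made-up value of the kind get_date_part() would return
for_date = date(year=int(for_date_part[0:4]), month=int(for_date_part[4:6]), day=int(for_date_part[6:8]))
print(for_date)  # 2024-06-01
```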
@@ -251,30 +221,22 @@ def add_msg_to_queue(file: UploadFile, key: str, forecast_or_actual: str, meta_d
     return Response(status_code=200)


-@router.post('/manual/msgOnly')
-async def upload_manual_msg(message: ManualSFMS,
-                            background_tasks: BackgroundTasks,
-                            secret: str | None = Header(default=None)):
+@router.post("/manual/msgOnly")
+async def upload_manual_msg(message: ManualSFMS, background_tasks: BackgroundTasks, secret: str | None = Header(default=None)):
     """
     Trigger the SFMS process to run on a tif file that already exists in s3.
     Client provides, key, for_date, runtype, run_date and an
     SFMS message is queued up on the message queue.
     """
-    logger.info('sfms/manual/msgOnly')
+    logger.info("sfms/manual/msgOnly")
     logger.info("Received request to process tif: %s", message.key)
-    if not secret or secret != config.get('SFMS_SECRET'):
+    if not secret or secret != config.get("SFMS_SECRET"):
         return Response(status_code=401)

     async with get_client() as (client, bucket):
-        tif_object = await client.get_object(Bucket=bucket,
-                                             Key=message.key)
-        logger.info('Found requested object: %s', tif_object)
+        tif_object = await client.get_object(Bucket=bucket, Key=message.key)
+        logger.info("Found requested object: %s", tif_object)
         last_modified = datetime.fromisoformat(tif_object["Metadata"]["last_modified"])
         create_time = datetime.fromisoformat(tif_object["Metadata"]["create_time"])
-        message = SFMSFile(key=message.key,
-                           run_type=message.runtype,
-                           last_modified=last_modified,
-                           create_time=create_time,
-                           run_date=message.run_date,
-                           for_date=message.for_date)
+        message = SFMSFile(key=message.key, run_type=message.runtype, last_modified=last_modified, create_time=create_time, run_date=message.run_date, for_date=message.for_date)
         background_tasks.add_task(publish, stream_name, sfms_file_subject, message, subjects)
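For reference, a hedged sketch of how a client might call the /manual/msgOnly endpoint shown above. Field names follow the docstring (key, for_date, runtype, run_date); the host, secret, and payload values are placeholders, and the exact schema is defined by the repository's ManualSFMS model.

```python
import requests

payload = {
    # Placeholder values; the real key points at a tif that already exists in S3.
    "key": "sfms/uploads/actual/2024-06-01/hfi20240601.tif",
    "runtype": "actual",
    "run_date": "2024-06-01",
    "for_date": "2024-06-01",
}
response = requests.post(
    "https://example.gov.bc.ca/api/sfms/manual/msgOnly",  # placeholder host
    json=payload,
    headers={"Secret": "replace-with-sfms-secret"},
)
print(response.status_code)  # 401 if the secret does not match SFMS_SECRET
```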