Storage ops followup type checking (#1274)
* storage ops: follow up to #1257:
- fix refactor typo
- add type hints for all storageops apis (add mypy_boto3_s3 and types_aiobotocore_s3 for type hints)
ikreymer authored Oct 11, 2023
1 parent f1dcc7e commit 41c054d
Showing 4 changed files with 78 additions and 29 deletions.
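As background for the diffs below: the two stub packages added in this commit (mypy_boto3_s3 and types_aiobotocore_s3) provide static types for the boto3 and aiobotocore S3 clients. A minimal, self-contained sketch of how such an annotation is typically used — not code from this repository, and the endpoint and credential values are placeholders:

```python
import boto3
from mypy_boto3_s3.client import S3Client


def make_s3_client(endpoint_url: str, access_key: str, secret_key: str) -> S3Client:
    # boto3.client("s3", ...) is effectively untyped on its own; the S3Client
    # annotation from mypy_boto3_s3 is what lets mypy check calls made on it.
    client: S3Client = boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
    return client


# With the annotation in place, mypy can verify keyword arguments and the
# response shape of calls such as:
#   resp = client.put_object(Bucket="my-bucket", Key="test", Body=b"")
#   assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
```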
2 changes: 1 addition & 1 deletion backend/btrixcloud/crawls.py
@@ -916,7 +916,7 @@ async def stream_crawl_logs(
if crawl.finished:
wacz_files = await ops.get_wacz_files(crawl_id, org)
resp = await storage_ops.sync_stream_wacz_logs(
- org, wacz_files, log_levels, contexts, crawl_manager
+ org, wacz_files, log_levels, contexts
)
return StreamingResponse(resp)

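For context on the call above: FastAPI's StreamingResponse accepts any iterator of bytes, which is why sync_stream_wacz_logs (typed further below to return Iterator[bytes]) can be handed to it directly. A rough sketch of the pattern — the route path and log lines are made up, not the project's real endpoint:

```python
from typing import Iterator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


def fake_log_stream() -> Iterator[bytes]:
    # Stand-in for sync_stream_wacz_logs: newline-delimited JSON log lines.
    yield b'{"timestamp": "2023-10-11T00:00:00Z", "logLevel": "info"}\n'
    yield b'{"timestamp": "2023-10-11T00:00:01Z", "logLevel": "error"}\n'


@app.get("/example/logs")  # hypothetical route, not the real crawl-logs endpoint
async def stream_logs() -> StreamingResponse:
    return StreamingResponse(fake_log_stream(), media_type="application/jsonl")
```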
6 changes: 3 additions & 3 deletions backend/btrixcloud/models.py
@@ -633,9 +633,9 @@ class S3Storage(BaseModel):
endpoint_url: str
access_key: str
secret_key: str
- access_endpoint_url: Optional[str]
- region: Optional[str] = ""
- use_access_for_presign: Optional[bool] = True
+ access_endpoint_url: str
+ region: str = ""
+ use_access_for_presign: bool = True


# ============================================================================
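The model change above removes the Optional wrappers, making access_endpoint_url a required string and giving region and use_access_for_presign plainly typed defaults. A standalone pydantic sketch of the difference (assuming pydantic v1 semantics, where an Optional field with no default is implicitly optional) — this is not the project's actual S3Storage model:

```python
from typing import Optional

from pydantic import BaseModel, ValidationError


class LooseStorage(BaseModel):
    # Old style: pydantic v1 treats this as optional with a default of None.
    access_endpoint_url: Optional[str]


class StrictStorage(BaseModel):
    # New style: the field must be supplied, and must be a string.
    access_endpoint_url: str


LooseStorage()  # accepted; access_endpoint_url is None
try:
    StrictStorage()  # now rejected at validation time
except ValidationError as err:
    print(err)
```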
97 changes: 72 additions & 25 deletions backend/btrixcloud/storages.py
@@ -1,7 +1,7 @@
"""
Storage API
"""
- from typing import Optional, Union, Iterator, Iterable, List, Dict
+ from typing import Optional, Union, Iterator, Iterable, List, Dict, AsyncIterator
from urllib.parse import urlsplit
from contextlib import asynccontextmanager

@@ -19,7 +19,18 @@
import aiobotocore.session
import boto3

- from .models import CrawlFile, Organization, DefaultStorage, S3Storage, User
+ from mypy_boto3_s3.client import S3Client
+ from mypy_boto3_s3.type_defs import CompletedPartTypeDef
+ from types_aiobotocore_s3 import S3Client as AIOS3Client
+
+ from .models import (
+     CrawlFile,
+     CrawlFileOut,
+     Organization,
+     DefaultStorage,
+     S3Storage,
+     User,
+ )
from .zip import (
sync_get_zip_file,
sync_get_log_stream,
@@ -55,7 +66,7 @@ def __init__(self, org_ops, crawl_manager):
# expand when additional storage options are supported
raise TypeError("Only s3 storage supported for now")

- def _create_s3_storage(self, storage):
+ def _create_s3_storage(self, storage: dict[str, str]) -> S3Storage:
"""create S3Storage object"""
endpoint_url = storage["endpoint_url"]
bucket_name = storage.get("bucket_name")
@@ -78,13 +89,13 @@ def _create_s3_storage(self, storage):
use_access_for_presign=use_access_for_presign,
)

- def has_storage(self, name):
+ def has_storage(self, name) -> bool:
"""assert the specified storage exists"""
return name in self.storages

async def update_storage(
self, storage: Union[S3Storage, DefaultStorage], org: Organization, user: User
- ):
+ ) -> dict[str, bool]:
"""update storage for org"""
if storage.type == "default":
if not self.has_storage(storage.name):
@@ -109,7 +120,9 @@ async def update_storage(
return {"updated": True}

@asynccontextmanager
- async def get_s3_client(self, storage, use_access=False):
+ async def get_s3_client(
+     self, storage: S3Storage, use_access=False
+ ) -> AsyncIterator[tuple[AIOS3Client, str, str]]:
"""context manager for s3 client"""
endpoint_url = (
storage.endpoint_url if not use_access else storage.access_endpoint_url
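A note on the return annotation above: functions decorated with @asynccontextmanager are written as async generators, so they are annotated with the AsyncIterator of what they yield rather than with a context-manager type. A minimal, self-contained illustration (not project code):

```python
from contextlib import asynccontextmanager
from typing import AsyncIterator


@asynccontextmanager
async def open_handle(name: str) -> AsyncIterator[str]:
    # The annotation describes what the generator yields; the decorator
    # turns the function into one returning an async context manager.
    handle = f"handle:{name}"  # stand-in for a real client/connection
    try:
        yield handle
    finally:
        pass  # cleanup (e.g. closing the client) would go here


# Usage:
#   async with open_handle("example") as handle:
#       ...
```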
@@ -133,7 +146,9 @@ async def get_s3_client(self, storage, use_access=False):
) as client:
yield client, bucket, key

- def get_sync_s3_client(self, storage, use_access=False):
+ def get_sync_s3_client(
+     self, storage: S3Storage, use_access=False
+ ) -> tuple[S3Client, str, str, str]:
"""context manager for s3 client"""
endpoint_url = storage.endpoint_url

@@ -159,7 +174,7 @@ def get_sync_s3_client(self, storage, use_access=False):

return client, bucket, key, public_endpoint_url

- async def verify_storage_upload(self, storage, filename):
+ async def verify_storage_upload(self, storage: S3Storage, filename: str) -> None:
"""Test credentials and storage endpoint by uploading an empty test file"""

async with self.get_s3_client(storage) as (client, bucket, key):
@@ -169,7 +184,9 @@ async def verify_storage_upload(self, storage, filename):
resp = await client.put_object(Bucket=bucket, Key=key, Body=data)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200

- def get_org_storage(self, org, storage_name="default", check_name_first=False):
+ def get_org_storage(
+     self, org: Organization, storage_name="default", check_name_first=False
+ ) -> S3Storage:
"""get storage for org, either looking for default storage name first
or custom storage from the org. Check default storage first if flag
set to true"""
@@ -186,29 +203,38 @@ def get_org_storage(self, org, storage_name="default", check_name_first=False):

return s3storage

- async def do_upload_single(self, org, filename, data, storage_name="default"):
+ async def do_upload_single(
+     self, org: Organization, filename: str, data, storage_name="default"
+ ) -> None:
"""do upload to specified key"""
s3storage = self.get_org_storage(org, storage_name)

async with self.get_s3_client(s3storage) as (client, bucket, key):
key += filename

- return await client.put_object(Bucket=bucket, Key=key, Body=data)
+ await client.put_object(Bucket=bucket, Key=key, Body=data)

- def get_sync_client(self, org, storage_name="default", use_access=False):
+ def get_sync_client(
+     self, org: Organization, storage_name="default", use_access=False
+ ) -> tuple[S3Client, str, str, str]:
"""get sync client"""
s3storage = self.get_org_storage(org, storage_name)

return self.get_sync_s3_client(s3storage, use_access=use_access)

# pylint: disable=too-many-arguments,too-many-locals
async def do_upload_multipart(
- self, org, filename, file_, min_size, storage_name="default"
- ):
+ self,
+ org: Organization,
+ filename: str,
+ file_: AsyncIterator,
+ min_size: int,
+ storage_name="default",
+ ) -> bool:
"""do upload to specified key using multipart chunking"""
s3storage = self.get_org_storage(org, storage_name)

- async def get_next_chunk(file_, min_size):
+ async def get_next_chunk(file_, min_size) -> bytes:
total = 0
bufs = []

@@ -252,7 +278,12 @@ async def get_next_chunk(file_, min_size):
flush=True,
)

parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})
part: CompletedPartTypeDef = {
"PartNumber": part_number,
"ETag": resp["ETag"],
}

parts.append(part)

part_number += 1
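For context on the CompletedPartTypeDef annotation above: the stub package models the per-part dict that complete_multipart_upload expects, so mypy can check its keys. A hedged, synchronous sketch of the general multipart pattern — bucket, key, and chunking are placeholders, and the real code streams chunks asynchronously with progress logging:

```python
from typing import List

import boto3
from mypy_boto3_s3.client import S3Client
from mypy_boto3_s3.type_defs import CompletedPartTypeDef


def upload_in_parts(client: S3Client, bucket: str, key: str, chunks: List[bytes]) -> None:
    mup = client.create_multipart_upload(Bucket=bucket, Key=key)
    parts: List[CompletedPartTypeDef] = []

    for number, chunk in enumerate(chunks, start=1):
        resp = client.upload_part(
            Bucket=bucket,
            Key=key,
            UploadId=mup["UploadId"],
            PartNumber=number,
            Body=chunk,
        )
        # CompletedPartTypeDef is a TypedDict, so these keys are type-checked.
        parts.append({"PartNumber": number, "ETag": resp["ETag"]})

    client.complete_multipart_upload(
        Bucket=bucket,
        Key=key,
        UploadId=mup["UploadId"],
        MultipartUpload={"Parts": parts},
    )
```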

@@ -280,7 +311,9 @@ async def get_next_chunk(file_, min_size):

return False

- async def get_presigned_url(self, org, crawlfile, duration=3600):
+ async def get_presigned_url(
+     self, org: Organization, crawlfile: CrawlFile, duration=3600
+ ) -> str:
"""generate pre-signed url for crawl file"""

s3storage = self.get_org_storage(org, crawlfile.def_storage_name, True)
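The helper above presumably delegates to the client's generate_presigned_url call (the call itself is outside the displayed hunk). A generic sketch with a synchronous boto3 client — endpoint, credentials, bucket, and key are placeholders:

```python
import boto3
from mypy_boto3_s3.client import S3Client

client: S3Client = boto3.client(
    "s3",
    endpoint_url="https://s3.example.com",
    aws_access_key_id="placeholder-access-key",
    aws_secret_access_key="placeholder-secret-key",
)

# Signs a time-limited GET URL locally; no request is made to the bucket.
url = client.generate_presigned_url(
    "get_object",
    Params={"Bucket": "my-bucket", "Key": "crawls/example.wacz"},
    ExpiresIn=3600,  # matches the default duration=3600 in get_presigned_url
)
print(url)
```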
@@ -307,13 +340,17 @@ async def get_presigned_url(self, org, crawlfile, duration=3600):

return presigned_url

- async def delete_crawl_file_object(self, org, crawlfile):
+ async def delete_crawl_file_object(
+     self, org: Organization, crawlfile: CrawlFile
+ ) -> bool:
"""delete crawl file from storage."""
return await self.delete_file(
org, crawlfile.filename, crawlfile.def_storage_name
)

- async def delete_file(self, org, filename, def_storage_name="default"):
+ async def delete_file(
+     self, org: Organization, filename: str, def_storage_name="default"
+ ) -> bool:
"""delete specified file from storage"""
status_code = None

@@ -330,7 +367,13 @@ async def delete_file(self, org, filename, def_storage_name="default"):

return status_code == 204

- async def sync_stream_wacz_logs(self, org, wacz_files, log_levels, contexts):
+ async def sync_stream_wacz_logs(
+     self,
+     org: Organization,
+     wacz_files: List[CrawlFile],
+     log_levels: List[str],
+     contexts: List[str],
+ ) -> Iterator[bytes]:
"""Return filtered stream of logs from specified WACZs sorted by timestamp"""
client, bucket, key, _ = self.get_sync_client(org)

@@ -435,7 +478,9 @@ def organize_based_on_instance_number(

return stream_json_lines(heap_iter, log_levels, contexts)
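The heap_iter handed to stream_json_lines above points at a timestamp-ordered merge of the per-WACZ log streams; the merge itself is outside the displayed lines. A generic sketch of that technique using heapq.merge — the log-entry format shown here is an assumption, not taken from the code:

```python
import heapq
import json
from typing import Iterable, Iterator


def merge_log_streams(streams: Iterable[Iterator[bytes]]) -> Iterator[bytes]:
    """Merge already-sorted JSON-lines log streams into one stream ordered by timestamp."""

    def timestamp(line: bytes) -> str:
        # ISO-8601 timestamps sort correctly as plain strings.
        return json.loads(line).get("timestamp", "")

    return heapq.merge(*streams, key=timestamp)
```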

- def _sync_dl(self, all_files, client, bucket, key):
+ def _sync_dl(
+     self, all_files: List[CrawlFileOut], client: S3Client, bucket: str, key: str
+ ) -> Iterator[bytes]:
"""generate streaming zip as sync"""
for file_ in all_files:
file_.path = file_.name
@@ -444,9 +489,9 @@ def _sync_dl(self, all_files, client, bucket, key):
"profile": "multi-wacz-package",
"resources": [file_.dict() for file_ in all_files],
}
- datapackage = json.dumps(datapackage).encode("utf-8")
+ datapackage_str = json.dumps(datapackage).encode("utf-8")

- def get_file(name):
+ def get_file(name) -> Iterator[bytes]:
response = client.get_object(Bucket=bucket, Key=key + name)
return response["Body"].iter_chunks(chunk_size=CHUNK_SIZE)

@@ -467,12 +512,14 @@ def member_files():
modified_at,
perms,
NO_COMPRESSION_64,
- (datapackage,),
+ (datapackage_str,),
)

return stream_zip(member_files(), chunk_size=CHUNK_SIZE)

- async def download_streaming_wacz(self, org, files):
+ async def download_streaming_wacz(
+     self, org: Organization, files: List[CrawlFileOut]
+ ) -> Iterator[bytes]:
"""return an iter for downloading a stream nested wacz file
from list of files"""
client, bucket, key, _ = self.get_sync_client(org)
2 changes: 2 additions & 0 deletions backend/requirements.txt
@@ -16,3 +16,5 @@ https://github.com/ikreymer/stream-zip/archive/refs/heads/stream-uncompress.zip
boto3
backoff>=2.2.1
python-slugify>=8.0.1
+ mypy_boto3_s3
+ types_aiobotocore_s3
