Skip to content

Commit

Permalink
Linter demands unreachable exception. OK, fine
Browse files Browse the repository at this point in the history
  • Loading branch information
moradology committed Jun 4, 2024
1 parent e7b60e0 commit 487f039
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions pangeo_forge_recipes/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ class TransferFilesWithConcurrency(beam.DoFn):
secrets: Optional[Dict] = None
open_kwargs: Optional[Dict] = None
fsspec_sync_patch: bool = False
max_retries: int
initial_backoff: float
backoff_factor: float
max_retries: int = 5
initial_backoff: float = 1.0
backoff_factor: float = 2.0

def process(self, indexed_urls):
with ThreadPoolExecutor(max_workers=self.max_concurrency) as executor:
Expand All @@ -184,18 +184,24 @@ def transfer_file(self, index: int, url: str) -> Tuple[int, str]:
while retries <= self.max_retries:
try:
open_kwargs = self.open_kwargs or {}
self.transfer_target.cache_file(url, self.secrets, self.fsspec_sync_patch, **open_kwargs)
self.transfer_target.cache_file(
url, self.secrets, self.fsspec_sync_patch, **open_kwargs
)
return (index, self.transfer_target._full_path(url))
except Exception as e:
if retries == self.max_retries:
logger.error(f"Max retries reached for {url}: {e}")
raise e
else:
backoff_time = self.initial_backoff * (self.backoff_factor ** retries)
logger.warning(f"Error transferring file {url}: {e}. Retrying in {backoff_time} seconds...")
backoff_time = self.initial_backoff * (self.backoff_factor**retries)
logger.warning(
f"Error transferring file {url}: {e}. Retrying in {backoff_time} seconds..."
)
time.sleep(backoff_time)
retries += 1

raise RuntimeError(f"Failed to transfer file {url} after {self.max_retries} attempts.")


@dataclass
class CheckpointFileTransfer(beam.PTransform):
Expand Down Expand Up @@ -255,7 +261,7 @@ def expand(self, pcoll):
fsspec_sync_patch=self.fsspec_sync_patch,
max_retries=self.max_retries,
initial_backoff=self.initial_backoff,
backoff_factor=self.backoff_factor
backoff_factor=self.backoff_factor,
)
)
)
Expand Down

0 comments on commit 487f039

Please sign in to comment.