diff --git a/biothings/hub/dataindex/snapshooter.py b/biothings/hub/dataindex/snapshooter.py index 54f64e979..605770cb0 100644 --- a/biothings/hub/dataindex/snapshooter.py +++ b/biothings/hub/dataindex/snapshooter.py @@ -9,15 +9,17 @@ import boto3 from biothings import config as btconfig -from biothings.hub import SNAPSHOOTER_CATEGORY, SNAPSHOTMANAGER_CATEGORY +from biothings.hub import SNAPSHOOTER_CATEGORY from biothings.hub.databuild.buildconfig import AutoBuildConfig from biothings.hub.datarelease import set_pending_to_release_note from biothings.utils.common import merge from biothings.utils.hub import template_out from biothings.utils.hub_db import get_src_build +from biothings.utils.exceptions import RepositoryVerificationFailed from biothings.utils.loggers import get_logger from biothings.utils.manager import BaseManager from elasticsearch import Elasticsearch +from elasticsearch.exceptions import TransportError from config import logger as logging @@ -26,7 +28,6 @@ from .snapshot_repo import Repository from .snapshot_task import Snapshot - class ProcessInfo(): """ JobManager Process Info. @@ -68,9 +69,10 @@ def get(self): class Bucket(): - def __init__(self, client, bucket): + def __init__(self, client, bucket, region=None): self.client = client # boto3.S3.Client [X] self.bucket = bucket # bucket name + self.region = region def exists(self): bucket = self.client.Bucket(self.bucket) @@ -195,7 +197,7 @@ def setup_log(self, index): log_folder = os.path.join(btconfig.LOG_FOLDER, 'build', log_name, "snapshot") self.logger, _ = get_logger(index, log_folder=log_folder, force=True) - def snapshot(self, index, snapshot=None): + def snapshot(self, index, snapshot=None, recreate_repo=False): self.setup_log(index) async def _snapshot(snapshot): @@ -212,12 +214,15 @@ async def _snapshot(snapshot): self.pinfo.get_pinfo(step, snapshot), partial( getattr(self, state.func), - cfg, index, snapshot + cfg, index, snapshot, recreate_repo=recreate_repo )) try: dx = await job dx = StepResult(dx) - + except RepositoryVerificationFailed as ex: + self.logger.exception(ex) + state.failed(snapshot, detail=ex.args) + raise ex except Exception as exc: self.logger.exception(exc) state.failed({}, exc) @@ -235,14 +240,17 @@ async def _snapshot(snapshot): future.add_done_callback(self.logger.debug) return future - def pre_snapshot(self, cfg, index, snapshot): - - bucket = Bucket(self.cloud, cfg.bucket) + def pre_snapshot(self, cfg, index, snapshot, **kwargs): + bucket = Bucket(self.cloud, cfg.bucket, region=cfg["region"]) repo = Repository(self.client, cfg.repo) self.logger.info(bucket) self.logger.info(repo) + if kwargs.get("recreate_repo"): + self.logger.info("Delete old repository") + repo.delete() + if not repo.exists(): if not bucket.exists(): bucket.create(cfg.get("acl")) @@ -250,6 +258,11 @@ def pre_snapshot(self, cfg, index, snapshot): repo.create(**cfg) self.logger.info(repo) + try: + repo.verify(config=cfg) + except TransportError as tex: + raise RepositoryVerificationFailed({"error": tex.error, "detail": tex.info['error']}) + return { "__REPLACE__": True, "conf": {"repository": cfg.data}, @@ -257,7 +270,7 @@ def pre_snapshot(self, cfg, index, snapshot): "environment": self.name } - def _snapshot(self, cfg, index, snapshot): + def _snapshot(self, cfg, index, snapshot, **kwargs): snapshot = Snapshot( self.client, @@ -294,7 +307,7 @@ def _snapshot(self, cfg, index, snapshot): "created_at": datetime.now().astimezone() } - def post_snapshot(self, cfg, index, snapshot): + def post_snapshot(self, cfg, index, snapshot, **kwargs): build_id = self._doc(index)['_id'] set_pending_to_release_note(build_id) return {} @@ -388,13 +401,13 @@ def poll(self, state, func): # Features # ----------- - def snapshot(self, snapshot_env, index, snapshot=None): + def snapshot(self, snapshot_env, index, snapshot=None, recreate_repo=False): """ Create a snapshot named "snapshot" (or, by default, same name as the index) from "index" according to environment definition (repository, etc...) "env". """ env = self.register[snapshot_env] - return env.snapshot(index, snapshot) + return env.snapshot(index, snapshot, recreate_repo=recreate_repo) def snapshot_a_build(self, build_doc): """ diff --git a/biothings/hub/dataindex/snapshot_repo.py b/biothings/hub/dataindex/snapshot_repo.py index 0c6dcab1f..c9ba79d77 100644 --- a/biothings/hub/dataindex/snapshot_repo.py +++ b/biothings/hub/dataindex/snapshot_repo.py @@ -1,6 +1,8 @@ import elasticsearch +from biothings.utils.exceptions import RepositoryVerificationFailed + class Repository(): @@ -25,6 +27,32 @@ def create(self, **body): def delete(self): self.client.snapshot.delete_repository(self.name) + def verify(self, config): + """A repository is consider properly setup and working, when: + - passes verification of ElasticSearch + - it's settings must match with the snapshot's config. + """ + + # Check if the repo pass ElasticSearch's verification + self.client.snapshot.verify_repository(self.name) + + # Check if the repo's settings match with the snapshot's config + repo_settings = self.client.snapshot.get_repository(self.name)[self.name] + incorrect_data = {} + + for field in ["type", "settings"]: + if config[field] != repo_settings[field]: + incorrect_data[field] = {"config": config[field], "repo": repo_settings[field]} + + if incorrect_data: + raise RepositoryVerificationFailed({ + "error": "repository_verification_exception", + "detail": { + "message": "the repository's settings is not match with snapshot config.", + "diff": incorrect_data, + }, + }) + def __str__(self): return ( f"