Skip to content

Commit

Permalink
Merge pull request #100 from newgene/verify-existing-snapshot-repository
Browse files Browse the repository at this point in the history
Verify existing snapshot repository
  • Loading branch information
newgene authored Mar 9, 2023
2 parents c89a5e3 + 46f5851 commit 69b7b13
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 13 deletions.
39 changes: 26 additions & 13 deletions biothings/hub/dataindex/snapshooter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@

import boto3
from biothings import config as btconfig
from biothings.hub import SNAPSHOOTER_CATEGORY, SNAPSHOTMANAGER_CATEGORY
from biothings.hub import SNAPSHOOTER_CATEGORY
from biothings.hub.databuild.buildconfig import AutoBuildConfig
from biothings.hub.datarelease import set_pending_to_release_note
from biothings.utils.common import merge
from biothings.utils.hub import template_out
from biothings.utils.hub_db import get_src_build
from biothings.utils.exceptions import RepositoryVerificationFailed
from biothings.utils.loggers import get_logger
from biothings.utils.manager import BaseManager
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import TransportError

from config import logger as logging

Expand All @@ -26,7 +28,6 @@
from .snapshot_repo import Repository
from .snapshot_task import Snapshot


class ProcessInfo():
"""
JobManager Process Info.
Expand Down Expand Up @@ -68,9 +69,10 @@ def get(self):

class Bucket():

def __init__(self, client, bucket):
def __init__(self, client, bucket, region=None):
self.client = client # boto3.S3.Client [X]
self.bucket = bucket # bucket name
self.region = region

def exists(self):
bucket = self.client.Bucket(self.bucket)
Expand Down Expand Up @@ -195,7 +197,7 @@ def setup_log(self, index):
log_folder = os.path.join(btconfig.LOG_FOLDER, 'build', log_name, "snapshot")
self.logger, _ = get_logger(index, log_folder=log_folder, force=True)

def snapshot(self, index, snapshot=None):
def snapshot(self, index, snapshot=None, recreate_repo=False):
self.setup_log(index)

async def _snapshot(snapshot):
Expand All @@ -212,12 +214,15 @@ async def _snapshot(snapshot):
self.pinfo.get_pinfo(step, snapshot),
partial(
getattr(self, state.func),
cfg, index, snapshot
cfg, index, snapshot, recreate_repo=recreate_repo
))
try:
dx = await job
dx = StepResult(dx)

except RepositoryVerificationFailed as ex:
self.logger.exception(ex)
state.failed(snapshot, detail=ex.args)
raise ex
except Exception as exc:
self.logger.exception(exc)
state.failed({}, exc)
Expand All @@ -235,29 +240,37 @@ async def _snapshot(snapshot):
future.add_done_callback(self.logger.debug)
return future

def pre_snapshot(self, cfg, index, snapshot):

bucket = Bucket(self.cloud, cfg.bucket)
def pre_snapshot(self, cfg, index, snapshot, **kwargs):
bucket = Bucket(self.cloud, cfg.bucket, region=cfg["region"])
repo = Repository(self.client, cfg.repo)

self.logger.info(bucket)
self.logger.info(repo)

if kwargs.get("recreate_repo"):
self.logger.info("Delete old repository")
repo.delete()

if not repo.exists():
if not bucket.exists():
bucket.create(cfg.get("acl"))
self.logger.info(bucket)
repo.create(**cfg)
self.logger.info(repo)

try:
repo.verify(config=cfg)
except TransportError as tex:
raise RepositoryVerificationFailed({"error": tex.error, "detail": tex.info['error']})

return {
"__REPLACE__": True,
"conf": {"repository": cfg.data},
"indexer_env": self.idxenv,
"environment": self.name
}

def _snapshot(self, cfg, index, snapshot):
def _snapshot(self, cfg, index, snapshot, **kwargs):

snapshot = Snapshot(
self.client,
Expand Down Expand Up @@ -294,7 +307,7 @@ def _snapshot(self, cfg, index, snapshot):
"created_at": datetime.now().astimezone()
}

def post_snapshot(self, cfg, index, snapshot):
def post_snapshot(self, cfg, index, snapshot, **kwargs):
build_id = self._doc(index)['_id']
set_pending_to_release_note(build_id)
return {}
Expand Down Expand Up @@ -388,13 +401,13 @@ def poll(self, state, func):
# Features
# -----------

def snapshot(self, snapshot_env, index, snapshot=None):
def snapshot(self, snapshot_env, index, snapshot=None, recreate_repo=False):
"""
Create a snapshot named "snapshot" (or, by default, same name as the index)
from "index" according to environment definition (repository, etc...) "env".
"""
env = self.register[snapshot_env]
return env.snapshot(index, snapshot)
return env.snapshot(index, snapshot, recreate_repo=recreate_repo)

def snapshot_a_build(self, build_doc):
"""
Expand Down
28 changes: 28 additions & 0 deletions biothings/hub/dataindex/snapshot_repo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

import elasticsearch

from biothings.utils.exceptions import RepositoryVerificationFailed


class Repository():

Expand All @@ -25,6 +27,32 @@ def create(self, **body):
def delete(self):
self.client.snapshot.delete_repository(self.name)

def verify(self, config):
"""A repository is consider properly setup and working, when:
- passes verification of ElasticSearch
- it's settings must match with the snapshot's config.
"""

# Check if the repo pass ElasticSearch's verification
self.client.snapshot.verify_repository(self.name)

# Check if the repo's settings match with the snapshot's config
repo_settings = self.client.snapshot.get_repository(self.name)[self.name]
incorrect_data = {}

for field in ["type", "settings"]:
if config[field] != repo_settings[field]:
incorrect_data[field] = {"config": config[field], "repo": repo_settings[field]}

if incorrect_data:
raise RepositoryVerificationFailed({
"error": "repository_verification_exception",
"detail": {
"message": "the repository's settings is not match with snapshot config.",
"diff": incorrect_data,
},
})

def __str__(self):
return (
f"<Repository {'READY' if self.exists() else 'MISSING'}"
Expand Down
6 changes: 6 additions & 0 deletions biothings/utils/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import json


class RepositoryVerificationFailed(Exception):
def __str__(self):
return json.dumps(self.args)

0 comments on commit 69b7b13

Please sign in to comment.