Skip to content

Commit

Permalink
Merge pull request #101 from newgene/manually-mark-dump-success
Browse files Browse the repository at this point in the history
add method to manually mark a datasource as successfully dumped
  • Loading branch information
newgene authored Mar 9, 2023
2 parents 69b7b13 + 6806e2c commit d23f306
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 14 deletions.
4 changes: 4 additions & 0 deletions biothings/hub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,7 @@ def configure_commands(self):
if self.managers.get("dump_manager"):
self.commands["dump"] = self.managers["dump_manager"].dump_src
self.commands["dump_all"] = self.managers["dump_manager"].dump_all
self.commands["mark_dump_success"] = self.managers["dump_manager"].mark_success
# upload commands
if self.managers.get("upload_manager"):
self.commands["upload"] = self.managers["upload_manager"].upload_src
Expand Down Expand Up @@ -1512,6 +1513,9 @@ def configure_api_endpoints(self):
self.api_endpoints["source"].append(
EndpointDefinition(name="dump", method="put", suffix="dump")
)
self.api_endpoints["source"].append(
EndpointDefinition(name="mark_dump_success", method="put", suffix="mark_dump_success")
)
if "upload" in cmdnames:
self.api_endpoints["source"].append(
EndpointDefinition(name="upload", method="put", suffix="upload")
Expand Down
58 changes: 44 additions & 14 deletions biothings/hub/dataload/dumper.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import subprocess
import time
from concurrent.futures import ProcessPoolExecutor
from copy import deepcopy
from datetime import datetime, timezone
from functools import partial
from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple, Union
Expand Down Expand Up @@ -268,7 +269,8 @@ def prepare_src_dump(self):
self.src_dump = get_src_dump()
self.src_doc = self.src_dump.find_one({'_id': self.src_name}) or {}

def register_status(self, status, transient=False, **extra):
def register_status(self, status, transient=False, dry_run=False, **extra):
src_doc = deepcopy(self.src_doc)
try:
# if status is "failed" and depending on where it failed,
# we may not be able to get the new_data_folder (if dumper didn't reach
Expand All @@ -281,20 +283,18 @@ def register_status(self, status, transient=False, **extra):
# it has not been set by the dumper before while exploring
# remote site. maybe we're just running post step ?
# back-compatibility; use "release" at root level if not found under "download"
release = self.src_doc.get("download", {}).get("release") or self.src_doc.get(
"release"
)
release = src_doc.get("download", {}).get("release") or src_doc.get("release")
self.logger.error(
"No release set, assuming: data_folder: %s, release: %s" % (data_folder, release)
)
# make sure to remove old "release" field to get back on track
for field in ["release", "data_folder"]:
if self.src_doc.get(field):
if src_doc.get(field):
self.logger.warning(
"Found '%s'='%s' at root level, convert to new format"
% (field, self.src_doc[field])
% (field, src_doc[field])
)
self.src_doc.pop(field)
src_doc.pop(field)

current_download_info = {
'_id': self.src_name,
Expand All @@ -312,7 +312,7 @@ def register_status(self, status, transient=False, **extra):
last_success = current_download_info["download"]["started_at"]
else:
# If failed, we will get the last_success from the last download instead.
last_download_info = self.src_doc.setdefault("download", {})
last_download_info = src_doc.setdefault("download", {})
last_success = last_download_info.get("last_success", None)
if not last_success and last_download_info.get("status") == 'success':
# If last_success from the last download doesn't exist or is None, and last
Expand All @@ -321,18 +321,22 @@ def register_status(self, status, transient=False, **extra):
if last_success:
current_download_info["download"]["last_success"] = last_success

self.src_doc.update(current_download_info)
src_doc.update(current_download_info)

# only register time when it's a final state
if transient:
self.src_doc["download"]["pid"] = os.getpid()
src_doc["download"]["pid"] = os.getpid()
else:
self.src_doc["download"]["time"] = timesofar(self.t0)
src_doc["download"]["time"] = timesofar(self.t0)
if "download" in extra:
self.src_doc["download"].update(extra["download"])
src_doc["download"].update(extra["download"])
else:
self.src_doc.update(extra)
self.src_dump.save(self.src_doc)
src_doc.update(extra)

# when dry run, we should not change the src_doc, and src_dump
if not dry_run:
self.src_doc = deepcopy(src_doc)
self.src_dump.save(src_doc)

async def dump(self, steps=None, force=False, job_manager=None, check_only=False, **kwargs):
'''
Expand Down Expand Up @@ -423,6 +427,19 @@ def postdumped(f):
if self.client:
self.release_client()

def mark_success(self, dry_run=True):
    """
    Manually mark this datasource as successfully dumped.

    Useful when the datasource is unstable and its data had to be
    downloaded by hand: registering a "success" status lets the rest of
    the pipeline proceed without an actual dump run.

    :param dry_run: when True (default), register_status() does not
        persist anything to src_dump, so stored state is left untouched.
    :return: dict with the source ``_id`` and its ``download`` info,
        taken from ``self.src_doc`` after status registration.
    """
    self.register_status("success", dry_run=dry_run)
    self.logger.info("Done!")
    # NOTE(review): in dry-run mode self.src_doc is not modified by
    # register_status(), so this reflects the currently stored state.
    return {
        "_id": self.src_doc["_id"],
        "download": self.src_doc["download"],
    }

def get_predicates(self):
"""
Return a list of predicates (functions returning true/false, as in math logic)
Expand Down Expand Up @@ -1444,6 +1461,19 @@ def dump_src(
logging.error("Error while dumping '%s': %s" % (src, e))
raise

def mark_success(self, src, dry_run=True):
    """
    Manually mark datasource "src" as successfully dumped.

    Delegates to ``mark_success()`` on an instance of each dumper class
    registered for "src" (a source may have several dumpers).

    :param src: name of a registered source (main or sub-source).
    :param dry_run: forwarded to each dumper; when True (default),
        nothing is persisted.
    :return: list of per-dumper status dicts.
    :raises DumperException: if "src" is not a registered source.
    """
    if src in self.register:
        klasses = self.register[src]
    else:
        raise DumperException(
            "Can't find '%s' in registered sources (whether as main or sub-source)" % src
        )
    result = []
    # fix: original used `for _, klass in enumerate(klasses)` and
    # discarded the index — iterate the classes directly.
    for klass in klasses:
        inst = self.create_instance(klass)
        result.append(inst.mark_success(dry_run=dry_run))
    return result

def call(self, src, method_name, *args, **kwargs):
"""
Create a dumper for datasource "src" and call method "method_name" on it,
Expand Down

0 comments on commit d23f306

Please sign in to comment.