Commit

initial work for marking datasets as sparse based on remote metadata
the packageTypeCounts endpoint for blackfynn is what I have been
looking for to make it possible to determine a priori whether datasets
should be set for sparse pull or not

this follows from the fact that at the moment, if you forget to set
datasets-sparse in the user config, memory usage explodes during pull;
detecting sparseness in advance avoids having to fix the memory
usage issue right now, though it does still need to be fixed
tgbugs committed May 22, 2020
1 parent a4a0591 commit ca76041
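
A minimal sketch of the idea described in the commit message (not part of the diff; the helper name and the session handling are illustrative assumptions, while the endpoint URL and the 10000 threshold come from the changes below): fetch the per-type package counts for a dataset and mark it for sparse pull when the total crosses the limit.

import requests

SPARSE_LIMIT = 10000  # matches the 'sparse-limit' default added in auth-config.py

def should_pull_sparse(session: requests.Session, dataset_id: str,
                       sparse_limit: int = SPARSE_LIMIT) -> bool:
    # assumes `session` already carries the Blackfynn auth headers
    # packageTypeCounts returns per-type counts, e.g. {'CSV': 12, 'Image': 25000}
    resp = session.get(
        f'https://api.blackfynn.io/datasets/{dataset_id}/packageTypeCounts')
    resp.raise_for_status()
    return sum(resp.json().values()) >= sparse_limit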
Showing 4 changed files with 105 additions and 11 deletions.
sparcur/auth-config.py (5 changes: 2 additions & 3 deletions)
@@ -10,9 +10,8 @@
    'hypothesis-api-key': {'environment-variables': 'HYP_API_KEY HYP_API_TOKEN'},
    'hypothesis-group': {'environment-variables': 'HYP_GROUP'},
    'hypothesis-user': {'environment-variables': 'HYP_USER'},
    'protocols-io-api-creds-file': None,
    'protocols-io-api-store-file': None,
    'datasets-noexport': None,
    'datasets-sparse': None,
    'datasets-no': None,
    }}
    'sparse-limit': {'default': 10000,
                     'environment-variables': 'SPARCUR_SPARSE_LIMIT SPARSE_LIMIT'},}}
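
For context, the new key is read back with the same auth accessor the CLI changes below use; a hedged sketch (the import path and the int coercion are assumptions, the override via SPARCUR_SPARSE_LIMIT or SPARSE_LIMIT is what the environment-variables entry declares):

from sparcur.config import auth

# the environment variables, when set, win over the 10000 default declared above
sparse_limit = int(auth.get('sparse-limit'))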
sparcur/blackfynn_api.py (12 changes: 12 additions & 0 deletions)
@@ -501,10 +501,22 @@ def restructure(j):
            break


@property
def packageTypeCounts(self):
    session = self._api.session
    resp = session.get(f'https://api.blackfynn.io/datasets/{self.id}/packageTypeCounts')
    if resp.ok:
        j = resp.json()
        return j
    else:
        resp.raise_for_status()


# monkey patch Dataset to implement packages endpoint
Dataset._packages = _packages
Dataset.packages = packages
Dataset.packagesByName = packagesByName
Dataset.packageTypeCounts = packageTypeCounts


@property
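
A hedged usage sketch of the monkey-patched property (the client setup and the dataset id are placeholders, not from this diff; importing sparcur.blackfynn_api is what applies the patch):

from blackfynn import Blackfynn
import sparcur.blackfynn_api  # noqa: F401  -- applies the Dataset monkey patches above

bf = Blackfynn()  # credentials from the usual blackfynn profile
ds = bf.get_dataset('N:dataset:<uuid>')  # placeholder id
counts = ds.packageTypeCounts  # e.g. {'TimeSeries': 42, 'Image': 9001}
total_packages = sum(counts.values())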
sparcur/cli.py (54 changes: 50 additions & 4 deletions)
@@ -41,6 +41,7 @@
    pull        retrieve remote file structure
                options: --empty
                       : --sparse-limit
    refresh     retrieve remote file sizes and file ids (can also fetch using the new data)
@@ -138,7 +139,7 @@
    -R --refresh            refresh matching files
    -r --rate=HZ            sometimes we can go too fast when fetching [default: 5]
    -l --limit=SIZE_MB      the maximum size to download in megabytes [default: 2]
                            use negative numbers to indicate no limit
                            use zero or negative numbers to indicate no limit
    -L --level=LEVEL        how deep to go in a refresh
                            used by any command that accepts <path>...
    -p --pretend            if the default is to act, don't, opposite of fetch
@@ -155,6 +156,8 @@
    -z --only-no-file-id    only pull files missing file_id
    -o --overwrite          fetch even if the file exists
    --project-path=<PTH>    set the project path manually
    --sparse-limit=COUNT    package count that forces a sparse pull [default: {auth.get('sparse-limit')}]
                            use zero or negative numbers to indicate no limit
    -t --tab-table          print simple table using tabs for copying
    -A --latest             run derived pipelines from latest json
@@ -257,6 +260,12 @@ def limit(self):
        if l >= 0:
            return l

    @property
    def sparse_limit(self):
        l = int(self._args['--sparse-limit'])
        if l >= 0:
            return l

    @property
    def level(self):
        return int(self._args['--level']) if self._args['--level'] else None
@@ -652,6 +661,38 @@ def clone(self):
        self.cwd = Path.cwd() # have to update self.cwd so pull sees the right thing
        self.pull()

    def _total_package_counts(self):
        from sparcur.datasources import BlackfynnDatasetData

        def total_packages(cache):
            bdd = BlackfynnDatasetData(cache)
            try:
                blob = bdd.fromCache()
            except FileNotFoundError as e:
                # FIXME TODO also check cached rmeta dates during pull
                log.warning(e)
                blob = bdd()

            return sum(blob['package_counts'].values())

        totals = []
        for cache in self.datasets:
            totals.append((cache, total_packages(cache)))

        return totals

    def _sparse_from_metadata(self, sparse_limit:int=None):
        if sparse_limit is None:
            sparse_limit = self.options.sparse_limit

        if sparse_limit is None:
            return []

        totals = self._total_package_counts()
        #tps = sorted([ for d in self.datasets], key= lambda ab: ab[1])
        sparse = [r.id for r, pc in totals if pc >= sparse_limit]
        return sparse

    def pull(self):
        # TODO folder meta -> org
        from pyontutils.utils import Async, deferred
        dirs = self.directories
        cwd = self.cwd
        skip = auth.get_list('datasets-no')
        sparse = self._sparse_from_metadata()
        if self.project_path.parent.name == 'big':
            only = auth.get_list('datasets-sparse')
        else:
            sparse = auth.get_list('datasets-sparse')
            more_sparse = auth.get_list('datasets-sparse')
            if more_sparse:
                sparse.extend(more_sparse)
            sparse = sorted(set(sparse))

        if not dirs:
            dirs = cwd,
@@ -753,7 +799,8 @@ def hrm(child):
        if self.options.debug or self.options.jobs == 1:
            _blank = [hrm(cd) for cd in d.children]
        else:
            _blank = Async()(deferred(hrm)(cd) for cd in d.children)
            _blank = Async(rate=self.options.rate)(
                deferred(hrm)(cd) for cd in d.children)

        # FIXME something after this point is retaining stale filepaths on dataset rename ...
        #d = r.local # in case a folder moved
@@ -768,7 +815,6 @@ def hrm(child):
        maybe_new_ids = set(new_ids) - set(existing_ids)
        if maybe_removed_ids:
            # FIXME pull sometimes has fake file extensions
            from pyontutils.utils import Async, deferred
            from pathlib import PurePath
            maybe_removed = [existing_d[id] for id in maybe_removed_ids]
            maybe_removed_stems = {PurePath(p.parent) / p.stem:p
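
A condensed, illustrative restatement of the sparse resolution added above (the function name and argument shapes are assumptions; the behaviour mirrors _sparse_from_metadata plus the pull-time merge with the user config):

def resolve_sparse(sparse_limit, totals, config_sparse):
    # totals: [(dataset_cache, total_package_count), ...] as produced by
    # _total_package_counts; config_sparse: auth.get_list('datasets-sparse')
    if sparse_limit is None:  # --sparse-limit disabled via a non-positive value
        from_metadata = []
    else:
        from_metadata = [cache.id for cache, count in totals
                         if count >= sparse_limit]

    # datasets listed explicitly in the user config stay sparse either way
    return sorted(set(from_metadata + config_sparse))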
sparcur/datasources.py (45 changes: 41 additions & 4 deletions)
@@ -4,6 +4,7 @@
from urllib.parse import urlparse, parse_qs
import rdflib
import requests
import augpathlib as aug
from pyontutils.config import auth as pauth
from pyontutils.core import OntId
from pyontutils.utils import byCol
@@ -258,24 +259,59 @@ class BlackfynnDatasetData:
    cache_path = auth.get_path('cache-path')
    cache_base = cache_path / 'blackfynn-meta'

    def __init__(self, remote):
        self.remote = remote
        self.bfobject = remote.bfobject
        self.ntfs_safe_id = remote.id.split(':')[-1] # sigh
    def __init__(self, remote_cache_or_id):
        if isinstance(remote_cache_or_id, aug.RemotePath):
            self._remote = remote_cache_or_id
            self._bfobject = self._remote.bfobject
            self.id = self._remote.id
        elif isinstance(remote_cache_or_id, aug.CachePath):
            self._c_cache_path = remote_cache_or_id
            self.id = self._c_cache_path.id
        else:
            self.id = remote_cache_or_id

        self.ntfs_safe_id = self.id.split(':')[-1] # sigh
        self.cache = self.cache_base / self.ntfs_safe_id
        if not self.cache_base.exists():
            self.cache_base.mkdir()

    @property
    def _cache_path(self):
        if not hasattr(self, '_c_cache_path'):
            raise NotImplementedError('if you need access to the cache '
                                      'pass one in at construction time')

        return self._c_cache_path

    @property
    def remote(self):
        if not hasattr(self, '_remote'):
            self._remote = self._cache_path.remote

        return self._remote

    @property
    def bfobject(self):
        if not hasattr(self, '_bfobject'):
            self._bfobject = self.remote.bfobject

        return self._bfobject

    def fromCache(self):
        """ retrieve cached results without hitting the network """
        # TODO FIXME error on no cache?
        if not self.cache.exists():
            msg = f'No cached metadata for {self.id}. Run `spc rmeta` to populate.'
            raise FileNotFoundError(msg)

        with open(self.cache, 'rt') as f:
            return json.load(f)

    def __call__(self):
        # FIXME TODO switch to use dict transformers
        # self.bfobject.relationships()
        meta = self.bfobject.meta # why this is not default in the python api the world may never know
        package_counts = self.bfobject.packageTypeCounts
        cont = meta['content']
        blob = {'id': self.remote.id,
                'name': cont['name'], # title
@@ -290,6 +326,7 @@ def __call__(self):
                #'teams': self.bfobject.teams,
                #'users': self.bfobject.users,
                'contributors': self.bfobject.contributors,
                'package_counts': package_counts,
                }

        if 'license' in cont:
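
A hedged usage sketch of the reworked constructor (the variable name is a placeholder): BlackfynnDatasetData now accepts a remote, a cache path, or a bare dataset id, and only resolves the remote lazily when the cached metadata is missing.

from sparcur.datasources import BlackfynnDatasetData

bdd = BlackfynnDatasetData(dataset_cache)  # an augpathlib CachePath for a dataset
try:
    blob = bdd.fromCache()  # cached rmeta, no network
except FileNotFoundError:
    blob = bdd()  # lazily resolves .remote/.bfobject and hits the API

total = sum(blob['package_counts'].values())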
