massive improvements in spc clone time, setup.py ver and dep bumps
A fresh pull of all 200 remote datasets now takes about 3 minutes.

NOTE: `spc pull` should NOT BE USED unless you know exactly what
you are doing. In the future this functionality will be restored
with better performance, but for now it is almost always faster to
delete the contents of the dataset folder and express ds.rchildren.
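
A minimal sketch of that workaround, assuming `ds` is a dataset-level
cache path obtained from the project anchor; the iteration details
here are illustrative, not a stable public API:

    import shutil

    for child in ds.local.iterdir():   # wipe the local contents of the dataset folder
        if child.is_dir():
            shutil.rmtree(child)
        else:
            child.unlink()

    for rc in ds.rchildren:            # expressing rchildren repopulates the tree
        pass                           # a bare loop keeps memory overhead near zero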

It only took me about 9 months to finally figure out that I had
actually fixed many of the pulling performance bottlenecks and that we
can almost entirely get rid of the current implementation of pull.

As it turns out I got almost everything sorted out, so that it is
possible to just call `list(dataset_cache.rchildren)` and the entire
tree will populate itself. When we fix the cache constructor
this becomes `[rc.materialize() for rc in d.rchildren]` or similar,
depending on exactly what we name that method. Better yet, if we do
it using a bare for loop then the memory overhead will be zero.
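
For reference, a condensed sketch of how the new clone-time pull
drives this, mirroring the `_clone_pull` helper added to
sparcur/cli.py in this commit; `anchor` (the organization-level
cache) and `datasets` (its child dataset caches) are assumed to be
in scope, and the 12-way parallelism is just the value used below:

    from joblib import Parallel, delayed

    def pull_one(anchor, dataset_cache):
        remote_class = dataset_cache._remote_class
        if not hasattr(remote_class, '_cache_anchor'):
            remote_class.anchorTo(anchor)   # bind the remote class to the local anchor once
        for rc in dataset_cache.rchildren:  # iterating rchildren materializes the whole tree
            pass                            # nothing is retained, so memory stays flat

    Parallel(n_jobs=12)(delayed(pull_one)(anchor, d) for d in datasets)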

The other piece that makes this faster is the completed sparse pull
implementation. We now use the remote package count, with a default
cutoff of 10k packages, to mark a dataset as sparse, meaning that
only its metadata files and their parent directories are pulled. The
implementation of that is a bit slow, but still about 2 orders of
magnitude faster than the alternative. The approach used to implement
is_sparse also points the way toward being able to mark folders with
additional operational information, e.g. that they should not be
exported or that they should not be pulled at all.
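
Roughly, the sparse decision looks like the following, condensed from
the `_sparse_materialize` method added to sparcur/paths.py in this
commit; the free-function form and the spelled-out 10k constant are
just for illustration:

    DEFAULT_SPARSE_LIMIT = 10_000  # default package-count cutoff for sparse datasets

    def sparse_materialize(dataset_cache, sparse_limit=DEFAULT_SPARSE_LIMIT):
        """ mark or clear the sparse flag based on the remote package count """
        counts = dataset_cache._dataset_metadata()['package_counts']
        package_count = sum(counts.values())
        sparse_remote = (sparse_limit is not None
                         and package_count >= sparse_limit)
        if sparse_remote:
            if not dataset_cache.is_sparse():
                dataset_cache._mark_sparse()
        elif dataset_cache.is_sparse():  # package count dropped back below the cutoff
            dataset_cache._clear_sparse()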

Some tweaks were also made to how spc rmeta works so that existing
metadata will not be repulled during a bulk clone. This work also
makes the BlackfynnCache aware of the dataset metadata pulled via
rmeta, so we should be able to start comparing ttl file and
bf:internal metadata in the near future.
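
The skip-if-cached behaviour amounts to something like this sketch,
mirroring the updated rmeta command in sparcur/cli.py below; the
function name and the rate value are illustrative:

    from pyontutils.utils import Async, deferred
    from sparcur.datasources import BlackfynnDatasetData

    def pull_rmeta(dataset_caches, exist_ok=True, rate=10):
        all_ = [BlackfynnDatasetData(c) for c in dataset_caches]
        # only fetch metadata blobs that are not already cached on disk
        prepared = [bdd for bdd in all_
                    if not (exist_ok and bdd.cache_path.exists())]
        return Async(rate=rate)(deferred(bdd)() for bdd in prepared)
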
tgbugs committed May 22, 2020
1 parent ca76041 commit 5434f4e
Showing 8 changed files with 143 additions and 23 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -14,6 +14,7 @@ branches:
python:
- 3.6
- 3.7
- 3.8

install:
- pip install --upgrade pytest pytest-cov
8 changes: 4 additions & 4 deletions setup.py
@@ -32,23 +32,23 @@ def find_version(filename):
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
],
keywords='SPARC curation biocuration ontology blackfynn protc protocols hypothesis',
packages=['sparcur', 'sparcur.export'],
python_requires='>=3.6',
tests_require=tests_require,
install_requires=[
'augpathlib>=0.0.16',
'augpathlib>=0.0.18',
'beautifulsoup4',
'blackfynn>=3.0.0',
'dicttoxml',
'idlib',
'ipython',
'idlib>=0.0.1.dev4',
"ipython; python_version < '3.7'",
'jsonschema>=3.0.1', # need the draft 6 validator
'protcur>=0.0.5',
'pyontutils>=0.1.22',
'pysercomb>=0.0.5',
'robobrowser',
'terminaltables',
'xlsx2csv',
],
2 changes: 1 addition & 1 deletion sparcur/__init__.py
@@ -1 +1 @@
__version__ = '0.0.1.dev0'
__version__ = '0.0.1.dev1'
12 changes: 10 additions & 2 deletions sparcur/backends.py
@@ -597,6 +597,7 @@ def _rchildren(self,
yield child
yield from child._rchildren(create_cache=create_cache)
elif isinstance(self.bfobject, Dataset):
sparse = sparse or self.cache.is_sparse()
deleted = []
if sparse:
filenames = self._sparse_stems
@@ -680,7 +681,11 @@ def _rchildren(self,
else:
raise exc.UnhandledTypeError # TODO

def children_pull(self, existing_caches=tuple(), only=tuple(), skip=tuple(), sparse=tuple()):
def children_pull(self,
existing_caches=tuple(),
only=tuple(),
skip=tuple(),
sparse=tuple()):
""" ONLY USE FOR organization level """
# FIXME this is really a recursive pull for organization level only ...
sname = lambda gen: sorted(gen, key=lambda c: c.name)
@@ -712,7 +717,10 @@ def refresh(c):

if not self._debug:
yield from (rc for d in Async(rate=self._async_rate)(
deferred(child.bootstrap)(recursive=True, only=only, skip=skip, sparse=sparse)
deferred(child.bootstrap)(recursive=True,
only=only,
skip=skip,
sparse=sparse)
for child in sname(self.children)
#if child.id in skipexisting
# TODO when dataset's have a 'anything in me updated'
78 changes: 66 additions & 12 deletions sparcur/cli.py
@@ -221,7 +221,7 @@
from sparcur.utils import GetTimeNow
from sparcur.utils import log, logd, SimpleFileHandler, bind_file_handler
from sparcur.utils import python_identifier, want_prefixes, symlink_latest
from sparcur.paths import Path, BlackfynnCache, PathMeta, StashPath
from sparcur.paths import Path, BlackfynnCache, StashPath
from sparcur.state import State
from sparcur.derives import Derives as De
from sparcur.backends import BlackfynnRemote
@@ -521,7 +521,17 @@ def project_id(self):

@property
def datasets(self):
yield from self.anchor.children # ok to yield from cache now that it is the bridge
# XXX DO NOT YIELD DIRECTLY FROM self.anchor.children
# unless you are cloning or something like that
# because cache.children completely ignores existing
# files and folders and there really isn't a safe way
# to use it once files already exist because then
# viewing the cached children would move all the folders
# around, file under sigh, yes fix CachePath construction
for local in self.datasets_local:
yield local.cache

#yield from self.anchor.children # NOT OK TO YIELD FROM THIS URG

@property
def datasets_remote(self):
@@ -658,8 +668,49 @@ def clone(self):
self.anchor = anchor
self.project_path = self.anchor.local
with anchor:
self.cwd = Path.cwd() # have to update self.cwd so pull sees the right thing
self.pull()
# update self.cwd so pull sees the right thing
self.cwd = Path.cwd()
self._clone_pull()

def _clone_pull(self):
""" wow this really shows that the pull -> bootstrap workflow
is completely broken and stupidly slow """

# populate just the dataset folders
datasets = list(self.anchor.children)
# populate the dataset metadata
self.rmeta(use_cache_path=True, exist_ok=True)

# mark sparse datasets so that we don't have to
# fiddle with detecting sparseness during bootstrap
sparse_limit = self.options.sparse_limit
[d._sparse_materialize(sparse_limit=sparse_limit) for d in datasets]
sparse = [d for d in datasets if d.is_sparse()] # sanity check


skip = auth.get_list('datasets-no') # FIXME this should be materialized as well

# pull all the files
from joblib import Parallel, delayed
import logging

def asdf(ca, d,
level='DEBUG' if self.options.verbose else 'INFO',
log_name=log.name):
log = logging.getLogger(log_name)
log.setLevel(level)

rc = d._remote_class
if not hasattr(rc, '_cache_anchor'):
rc.anchorTo(ca)

list(d.rchildren)


Parallel(n_jobs=12)(delayed(asdf)(self.anchor, d)
for d in datasets
# FIXME skip should be materialized in xattrs as well
if d.id not in skip)

def _total_package_counts(self):
from sparcur.datasources import BlackfynnDatasetData
@@ -702,7 +753,6 @@ def pull(self):
cwd = self.cwd
skip = auth.get_list('datasets-no')
sparse = self._sparse_from_metadata()
breakpoint()
if self.project_path.parent.name == 'big':
only = auth.get_list('datasets-sparse')
else:
@@ -1421,11 +1471,12 @@ def apinat(self):
graph = agraph.graph()
graph.write(path=path_out)

def rmeta(self):
def rmeta(self, use_cache_path=False, exist_ok=False):
from pyontutils.utils import Async, deferred
from sparcur.datasources import BlackfynnDatasetData
dsr = self.datasets_remote
prepared = [BlackfynnDatasetData(r) for r in dsr]
dsr = self.datasets if use_cache_path else self.datasets_remote
all_ = [BlackfynnDatasetData(r) for r in dsr]
prepared = [bdd for bdd in all_ if not (exist_ok and bdd.cache_path.exists())]
hz = self.options.rate
if not self.options.debug:
blobs = Async(rate=hz)(deferred(d)() for d in prepared)
@@ -1809,13 +1860,16 @@ def size(self, dirs=None, ext=None):
if not intrs:
intrs = self.cwdintr,

rows = [['path', 'id', 'dirs', 'files', 'size', 'hr'],
*sorted([[d.name, d.id, c['dirs'], c['files'], c['size'], c['size'].hr]
rows = [['path', 'id', 'sparse', 'dirs', 'files', 'size', 'hr'],
*sorted([[d.name,
d.id,
'x' if d.path.cache.is_sparse() else '',
c['dirs'], c['files'], c['size'], c['size'].hr]
for d in intrs
for c in (d.datasetdata.counts,)], key=lambda r: -r[-2])]

return self._print_table(rows, title='Size Report',
align=['l', 'l', 'r', 'r', 'r', 'r'], ext=ext)
align=['l', 'l', 'c', 'r', 'r', 'r', 'r'], ext=ext)

def test(self, ext=None):
rows = [['hello', 'world'], [1, 2]]
@@ -2201,7 +2255,7 @@ def mismatch(self):
paths = [p for p, *_ in oops]

def sf(cmeta):
nmeta = PathMeta(id=cmeta.old_id)
nmeta = aug.PathMeta(id=cmeta.old_id)
assert nmeta.id, f'No old_id for {pathmeta}'
return nmeta

2 changes: 1 addition & 1 deletion sparcur/datasources.py
@@ -263,7 +263,7 @@ def __init__(self, remote_cache_or_id):
if isinstance(remote_cache_or_id, aug.RemotePath):
self._remote = remote_cache_or_id
self._bfobject = self._remote.bfobject
self.id = remote.id
self.id = self._remote.id
elif isinstance(remote_cache_or_id, aug.CachePath):
self._c_cache_path = remote_cache_or_id
self.id = self._c_cache_path.id
61 changes: 59 additions & 2 deletions sparcur/paths.py
@@ -11,11 +11,9 @@
from dateutil import parser
from augpathlib import PrimaryCache, EatCache, SqliteCache, SymlinkCache
from augpathlib import RepoPath, LocalPath
from augpathlib import RemotePath # FIXME just for reimport
from sparcur import backends
from sparcur import exceptions as exc
from sparcur.utils import log
from augpathlib import PathMeta


def cleanup(func):
@@ -42,6 +40,7 @@ class BlackfynnCache(PrimaryCache, EatCache):
xattr_prefix = 'bf'
_backup_cache = SqliteCache
_not_exists_cache = SymlinkCache

cypher = hashlib.sha256 # set the remote hash cypher on the cache class

uri_human = backends.BlackfynnRemote.uri_human
@@ -128,6 +127,58 @@ def file_id(self):
def cache_key(self):
return f'{self.id}-{self.file_id}'

def _dataset_metadata(self, force_cache=False):
""" get metadata about a dataset from the remote metadata store """
# TODO figure out the right default for force_cache
dataset = self.dataset
if dataset == self:
if not hasattr(self, '_c__dataset_metadata'):
bdd = BlackfynnDatasetData(self)
try:
blob = bdd.fromCache()
except FileNotFoundError as e:
# FIXME TODO also check cached rmeta dates during pull
if force_cache:
raise e
else:
log.warning(e)
blob = bdd()

self._c__dataset_metadata = blob

return self._c__dataset_metadata

else:
return dataset._dataset_metadata()

def _package_count(self):
if self.is_dataset():
return sum(self._dataset_metadata()['package_counts'].values())
else:
raise NotImplementedError('unused at the moment')

def _sparse_materialize(self, *args, sparse_limit=None):
""" use data from the remote mark or clear datasets as sparse """
if sparse_limit is None:
sparse_limit = auth.get('sparse-limit') # yay for yaml having int type
#sparse_limit = _sl if _sl is None else int_(sl)

if self.is_dataset():
package_count = self._package_count()
sparse_remote = (False
if sparse_limit is None else
package_count >= sparse_limit)
sparse_cache = self.is_sparse()
if sparse_remote:
if not sparse_cache:
self._mark_sparse()
elif sparse_cache: # strange case where number of packages decreases
self._clear_sparse()

else:
msg = 'at the moment only datasets can be marked as sparse'
raise NotImplementedError(msg)

@property
def data(self):
""" get the 'cached' data which isn't really cached at the moment
@@ -168,6 +219,9 @@ def data(self):
yield from self.local_object_cache_path._data_setter(gen)
self.local_object_cache_path.cache_init(self.meta) # FIXME self.meta be stale here?!

def _meta_is_root(self, meta):
return meta.id.startswith('N:organization:')

def _bootstrap_recursive(self, only=tuple(), skip=tuple(), sparse=False):
""" only on the first call to this function should sparse be a tuple """
# bootstrap the rest if we told it to
@@ -281,3 +335,6 @@ def remote(self):
backends.BlackfynnRemote.cache_key = BlackfynnCache.cache_key
backends.BlackfynnRemote._sparse_stems = BlackfynnCache._sparse_stems
backends.BlackfynnRemote._sparse_include = BlackfynnCache._sparse_include

# end imports (woo circular deps)
from sparcur.datasources import BlackfynnDatasetData
2 changes: 1 addition & 1 deletion test/common.py
@@ -10,7 +10,7 @@
from sparcur import config
from sparcur import exceptions as exc
from sparcur.paths import Path
from sparcur.paths import LocalPath, PrimaryCache, RemotePath
from sparcur.paths import LocalPath, PrimaryCache
from sparcur.paths import SymlinkCache
from sparcur.state import State
from sparcur.datasets import DatasetDescriptionFile
