From 5434f4ee1a13bcf39d30965faa722b40399833c3 Mon Sep 17 00:00:00 2001
From: Tom Gillespie
Date: Thu, 21 May 2020 22:17:10 -0700
Subject: [PATCH] massive improvements in spc clone time, setup.py ver and dep bumps

A fresh pull of all 200 remote datasets now takes about 3 minutes.

NOTE: `spc pull` should NOT BE USED unless you know exactly what you are
doing. In the future this functionality will be restored with better
performance, but for now it is almost always faster to delete the
contents of the dataset folder and repopulate it by expressing
ds.rchildren.

It only took me about 9 months to finally figure out that I had
actually fixed many of the pulling performance bottlenecks and that we
can almost entirely get rid of the current implementation of pull. As
it turns out, I got almost everything sorted out so that it is possible
to just call `list(dataset_cache.rchildren)` and the entire tree will
populate itself. When we fix the cache constructor this becomes
`[rc.materialize() for rc in d.rchildren]` or similar, depending on
exactly what we name that method. Better yet, if we do it with a bare
for loop then the memory overhead is zero.

The other piece that makes this faster is the completed sparse pull
implementation. We now use the remote package count, with a default
cutoff of 10k packages, to mark a dataset as sparse, meaning that only
its metadata files and their parent directories are pulled. The
implementation of that is a bit slow, but still about 2 orders of
magnitude faster than the alternative. The approach used to implement
is_sparse also points the way toward marking folders with additional
operational information, e.g. that they should not be exported or that
they should not be pulled at all.

Some tweaks were also made to how spc rmeta works so that existing
metadata will not be repulled during a bulk clone. This work also makes
the BlackfynnCache aware of the dataset metadata pulled via rmeta, so
we should be able to start comparing ttl file and bf:internal metadata
in the near future.
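The core of the new pull is small enough to sketch. The outline below is
illustrative only and is not code from this patch; `populate`,
`dataset_cache`, and `SPARSE_LIMIT` are stand-in names, with the 10k
figure being just the default sparse cutoff described above:

    # illustrative sketch, not part of the diff below
    SPARSE_LIMIT = 10_000  # default remote package count cutoff for sparse pull

    def populate(dataset_cache):
        # read the remote package counts and mark or clear the sparse flag
        dataset_cache._sparse_materialize(sparse_limit=SPARSE_LIMIT)
        # iterating rchildren materializes the entire tree; a bare for
        # loop avoids holding every realized child in memory at once
        for child in dataset_cache.rchildren:
            pass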
---
 .travis.yml            |  1 +
 setup.py               |  8 ++---
 sparcur/__init__.py    |  2 +-
 sparcur/backends.py    | 12 +++++--
 sparcur/cli.py         | 78 +++++++++++++++++++++++++++++++++++-------
 sparcur/datasources.py |  2 +-
 sparcur/paths.py       | 61 +++++++++++++++++++++++++++++++--
 test/common.py         |  2 +-
 8 files changed, 143 insertions(+), 23 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 16dc9fd4..b1c43f45 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -14,6 +14,7 @@ branches:
 python:
   - 3.6
   - 3.7
+  - 3.8

 install:
   - pip install --upgrade pytest pytest-cov
diff --git a/setup.py b/setup.py
index 6756d527..7fa2b9ab 100644
--- a/setup.py
+++ b/setup.py
@@ -32,23 +32,23 @@ def find_version(filename):
         'License :: OSI Approved :: MIT License',
         'Programming Language :: Python :: 3.6',
         'Programming Language :: Python :: 3.7',
+        'Programming Language :: Python :: 3.8',
     ],
     keywords='SPARC curation biocuration ontology blackfynn protc protocols hypothesis',
     packages=['sparcur', 'sparcur.export'],
     python_requires='>=3.6',
     tests_require=tests_require,
     install_requires=[
-        'augpathlib>=0.0.16',
+        'augpathlib>=0.0.18',
         'beautifulsoup4',
         'blackfynn>=3.0.0',
         'dicttoxml',
-        'idlib',
-        'ipython',
+        'idlib>=0.0.1.dev4',
+        "ipython; python_version < '3.7'",
         'jsonschema>=3.0.1',  # need the draft 6 validator
         'protcur>=0.0.5',
         'pyontutils>=0.1.22',
         'pysercomb>=0.0.5',
-        'robobrowser',
         'terminaltables',
         'xlsx2csv',
     ],
diff --git a/sparcur/__init__.py b/sparcur/__init__.py
index a171cfed..c3f87c14 100644
--- a/sparcur/__init__.py
+++ b/sparcur/__init__.py
@@ -1 +1 @@
-__version__ = '0.0.1.dev0'
+__version__ = '0.0.1.dev1'
diff --git a/sparcur/backends.py b/sparcur/backends.py
index 7f554b02..2bcde97e 100644
--- a/sparcur/backends.py
+++ b/sparcur/backends.py
@@ -597,6 +597,7 @@ def _rchildren(self,
                 yield child
                 yield from child._rchildren(create_cache=create_cache)
         elif isinstance(self.bfobject, Dataset):
+            sparse = sparse or self.cache.is_sparse()
             deleted = []
             if sparse:
                 filenames = self._sparse_stems
@@ -680,7 +681,11 @@ def _rchildren(self,
         else:
             raise exc.UnhandledTypeError  # TODO

-    def children_pull(self, existing_caches=tuple(), only=tuple(), skip=tuple(), sparse=tuple()):
+    def children_pull(self,
+                      existing_caches=tuple(),
+                      only=tuple(),
+                      skip=tuple(),
+                      sparse=tuple()):
         """ ONLY USE FOR organization level """
         # FIXME this is really a recursive pull for organization level only ...
         sname = lambda gen: sorted(gen, key=lambda c: c.name)
@@ -712,7 +717,10 @@ def refresh(c):

         if not self._debug:
             yield from (rc for d in Async(rate=self._async_rate)(
-                deferred(child.bootstrap)(recursive=True, only=only, skip=skip, sparse=sparse)
+                deferred(child.bootstrap)(recursive=True,
+                                          only=only,
+                                          skip=skip,
+                                          sparse=sparse)
                 for child in sname(self.children)
                 #if child.id in skipexisting
                 # TODO when dataset's have a 'anything in me updated'
diff --git a/sparcur/cli.py b/sparcur/cli.py
index d3da69d5..f35b1a6d 100755
--- a/sparcur/cli.py
+++ b/sparcur/cli.py
@@ -221,7 +221,7 @@
 from sparcur.utils import GetTimeNow
 from sparcur.utils import log, logd, SimpleFileHandler, bind_file_handler
 from sparcur.utils import python_identifier, want_prefixes, symlink_latest
-from sparcur.paths import Path, BlackfynnCache, PathMeta, StashPath
+from sparcur.paths import Path, BlackfynnCache, StashPath
 from sparcur.state import State
 from sparcur.derives import Derives as De
 from sparcur.backends import BlackfynnRemote
@@ -521,7 +521,17 @@ def project_id(self):

     @property
     def datasets(self):
-        yield from self.anchor.children  # ok to yield from cache now that it is the bridge
+        # XXX DO NOT YIELD DIRECTLY FROM self.anchor.children
+        # unless you are cloning or something like that
+        # because cache.children completely ignores existing
+        # files and folders and there really isn't a safe way
+        # to use it once files already exist because then
+        # viewing the cached children would move all the folders
+        # around, file under sigh, yes fix CachePath construction
+        for local in self.datasets_local:
+            yield local.cache
+
+        #yield from self.anchor.children  # NOT OK TO YIELD FROM THIS URG

     @property
     def datasets_remote(self):
@@ -658,8 +668,49 @@ def clone(self):
         self.anchor = anchor
         self.project_path = self.anchor.local
         with anchor:
-            self.cwd = Path.cwd()  # have to update self.cwd so pull sees the right thing
-            self.pull()
+            # update self.cwd so pull sees the right thing
+            self.cwd = Path.cwd()
+            self._clone_pull()
+
+    def _clone_pull(self):
+        """ wow this really shows that the pull -> bootstrap workflow
+        is completely broken and stupidly slow """
+
+        # populate just the dataset folders
+        datasets = list(self.anchor.children)
+        # populate the dataset metadata
+        self.rmeta(use_cache_path=True, exist_ok=True)
+
+        # mark sparse datasets so that we don't have to
+        # fiddle with detecting sparseness during bootstrap
+        sparse_limit = self.options.sparse_limit
+        [d._sparse_materialize(sparse_limit=sparse_limit) for d in datasets]
+        sparse = [d for d in datasets if d.is_sparse()]  # sanity check
+
+
+        skip = auth.get_list('datasets-no')  # FIXME this should be materialized as well
+
+        # pull all the files
+        from joblib import Parallel, delayed
+        import logging
+
+        def asdf(ca, d,
+                 level='DEBUG' if self.options.verbose else 'INFO',
+                 log_name=log.name):
+            log = logging.getLogger(log_name)
+            log.setLevel(level)
+
+            rc = d._remote_class
+            if not hasattr(rc, '_cache_anchor'):
+                rc.anchorTo(ca)
+
+            list(d.rchildren)
+
+
+        Parallel(n_jobs=12)(delayed(asdf)(self.anchor, d)
+                            for d in datasets
+                            # FIXME skip should be materialized in xattrs as well
+                            if d.id not in skip)

     def _total_package_counts(self):
         from sparcur.datasources import BlackfynnDatasetData
@@ -702,7 +753,6 @@ def pull(self):
         cwd = self.cwd
         skip = auth.get_list('datasets-no')
         sparse = self._sparse_from_metadata()
-        breakpoint()
         if self.project_path.parent.name == 'big':
             only = auth.get_list('datasets-sparse')
         else:
@@ -1421,11 +1471,12 @@ def apinat(self):
         graph = agraph.graph()
         graph.write(path=path_out)

-    def rmeta(self):
+    def rmeta(self, use_cache_path=False, exist_ok=False):
         from pyontutils.utils import Async, deferred
         from sparcur.datasources import BlackfynnDatasetData
-        dsr = self.datasets_remote
-        prepared = [BlackfynnDatasetData(r) for r in dsr]
+        dsr = self.datasets if use_cache_path else self.datasets_remote
+        all_ = [BlackfynnDatasetData(r) for r in dsr]
+        prepared = [bdd for bdd in all_ if not (exist_ok and bdd.cache_path.exists())]
         hz = self.options.rate
         if not self.options.debug:
             blobs = Async(rate=hz)(deferred(d)() for d in prepared)
@@ -1809,13 +1860,16 @@ def size(self, dirs=None, ext=None):
         if not intrs:
             intrs = self.cwdintr,

-        rows = [['path', 'id', 'dirs', 'files', 'size', 'hr'],
-                *sorted([[d.name, d.id, c['dirs'], c['files'], c['size'], c['size'].hr]
+        rows = [['path', 'id', 'sparse', 'dirs', 'files', 'size', 'hr'],
+                *sorted([[d.name,
+                          d.id,
+                          'x' if d.path.cache.is_sparse() else '',
+                          c['dirs'], c['files'], c['size'], c['size'].hr]
                          for d in intrs
                          for c in (d.datasetdata.counts,)],
                         key=lambda r: -r[-2])]
         return self._print_table(rows, title='Size Report',
-                                 align=['l', 'l', 'r', 'r', 'r', 'r'], ext=ext)
+                                 align=['l', 'l', 'c', 'r', 'r', 'r', 'r'], ext=ext)

     def test(self, ext=None):
         rows = [['hello', 'world'], [1, 2]]
@@ -2201,7 +2255,7 @@ def mismatch(self):
         paths = [p for p, *_ in oops]

         def sf(cmeta):
-            nmeta = PathMeta(id=cmeta.old_id)
+            nmeta = aug.PathMeta(id=cmeta.old_id)
             assert nmeta.id, f'No old_id for {pathmeta}'
             return nmeta

diff --git a/sparcur/datasources.py b/sparcur/datasources.py
index 0fba0afa..a8b59607 100644
--- a/sparcur/datasources.py
+++ b/sparcur/datasources.py
@@ -263,7 +263,7 @@ def __init__(self, remote_cache_or_id):
         if isinstance(remote_cache_or_id, aug.RemotePath):
            self._remote = remote_cache_or_id
            self._bfobject = self._remote.bfobject
-           self.id = remote.id
+           self.id = self._remote.id
         elif isinstance(remote_cache_or_id, aug.CachePath):
            self._c_cache_path = remote_cache_or_id
            self.id = self._c_cache_path.id
diff --git a/sparcur/paths.py b/sparcur/paths.py
index 09bb60c9..570d9c37 100644
--- a/sparcur/paths.py
+++ b/sparcur/paths.py
@@ -11,11 +11,9 @@
 from dateutil import parser
 from augpathlib import PrimaryCache, EatCache, SqliteCache, SymlinkCache
 from augpathlib import RepoPath, LocalPath
-from augpathlib import RemotePath  # FIXME just for reimport
 from sparcur import backends
 from sparcur import exceptions as exc
 from sparcur.utils import log
-from augpathlib import PathMeta


 def cleanup(func):
@@ -42,6 +40,7 @@ class BlackfynnCache(PrimaryCache, EatCache):
     xattr_prefix = 'bf'
     _backup_cache = SqliteCache
     _not_exists_cache = SymlinkCache
+    cypher = hashlib.sha256  # set the remote hash cypher on the cache class

     uri_human = backends.BlackfynnRemote.uri_human

@@ -128,6 +127,58 @@ def file_id(self):
     def cache_key(self):
         return f'{self.id}-{self.file_id}'

+    def _dataset_metadata(self, force_cache=False):
+        """ get metadata about a dataset from the remote metadata store """
+        # TODO figure out the right default for force_cache
+        dataset = self.dataset
+        if dataset == self:
+            if not hasattr(self, '_c__dataset_metadata'):
+                bdd = BlackfynnDatasetData(self)
+                try:
+                    blob = bdd.fromCache()
+                except FileNotFoundError as e:
+                    # FIXME TODO also check cached rmeta dates during pull
+                    if force_cache:
+                        raise e
+                    else:
+                        log.warning(e)
+                        blob = bdd()
+
+                self._c__dataset_metadata = blob
+
+            return self._c__dataset_metadata
+
+        else:
+            return dataset._dataset_metadata()
+
+    def _package_count(self):
+        if self.is_dataset():
+            return sum(self._dataset_metadata()['package_counts'].values())
+        else:
+            raise NotImplementedError('unused at the moment')
+
+    def _sparse_materialize(self, *args, sparse_limit=None):
+        """ use data from the remote mark or clear datasets as sparse """
+        if sparse_limit is None:
+            sparse_limit = auth.get('sparse-limit')  # yay for yaml having int type
+            #sparse_limit = _sl if _sl is None else int_(sl)
+
+        if self.is_dataset():
+            package_count = self._package_count()
+            sparse_remote = (False
+                             if sparse_limit is None else
+                             package_count >= sparse_limit)
+            sparse_cache = self.is_sparse()
+            if sparse_remote:
+                if not sparse_cache:
+                    self._mark_sparse()
+            elif sparse_cache:  # strange case where number of packages decreases
+                self._clear_sparse()
+
+        else:
+            msg = 'at the moment only datasets can be marked as sparse'
+            raise NotImplementedError(msg)
+
     @property
     def data(self):
         """ get the 'cached' data which isn't really cached at the moment
@@ -168,6 +219,9 @@ def data(self):
         yield from self.local_object_cache_path._data_setter(gen)
         self.local_object_cache_path.cache_init(self.meta)  # FIXME self.meta be stale here?!

+    def _meta_is_root(self, meta):
+        return meta.id.startswith('N:organization:')
+
     def _bootstrap_recursive(self, only=tuple(), skip=tuple(), sparse=False):
         """ only on the first call to this function should sparse be a tuple """
         # bootstrap the rest if we told it to
@@ -281,3 +335,6 @@ def remote(self):
 backends.BlackfynnRemote.cache_key = BlackfynnCache.cache_key
 backends.BlackfynnRemote._sparse_stems = BlackfynnCache._sparse_stems
 backends.BlackfynnRemote._sparse_include = BlackfynnCache._sparse_include
+
+# end imports (woo circular deps)
+from sparcur.datasources import BlackfynnDatasetData
diff --git a/test/common.py b/test/common.py
index 8d1a9c4f..cc471cff 100644
--- a/test/common.py
+++ b/test/common.py
@@ -10,7 +10,7 @@
 from sparcur import config
 from sparcur import exceptions as exc
 from sparcur.paths import Path
-from sparcur.paths import LocalPath, PrimaryCache, RemotePath
+from sparcur.paths import LocalPath, PrimaryCache
 from sparcur.paths import SymlinkCache
 from sparcur.state import State
 from sparcur.datasets import DatasetDescriptionFile
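
For anyone adapting the clone-time pull outside of `spc clone`, the
per-dataset parallel pattern in `_clone_pull` above reduces to roughly
the following sketch. It assumes `anchor` and `datasets` come from an
existing project clone; `pull_one` and `pull_all` are hypothetical
names and `n_jobs=12` simply mirrors the hard-coded value in the diff:

    # rough standalone sketch of the _clone_pull parallel pull pattern
    from joblib import Parallel, delayed

    def pull_one(cache_anchor, dataset_cache):
        remote_class = dataset_cache._remote_class
        if not hasattr(remote_class, '_cache_anchor'):
            # joblib workers run in separate processes, so the remote
            # class must be re-anchored before it can resolve ids
            remote_class.anchorTo(cache_anchor)

        # iterating rchildren pulls and materializes the dataset tree
        for child in dataset_cache.rchildren:
            pass

    def pull_all(anchor, datasets, skip=(), n_jobs=12):
        Parallel(n_jobs=n_jobs)(
            delayed(pull_one)(anchor, d)
            for d in datasets
            if d.id not in skip)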