diff --git a/benchmark.py b/benchmark.py index 605fd6066..7b7fe8055 100755 --- a/benchmark.py +++ b/benchmark.py @@ -212,6 +212,8 @@ def run(key, timer, repeat=3, number=10): run("iterate_single_pass", Timer("list(project)", setup), number=1) + run("iterate_load_sp", Timer("[job.sp() for job in project]", setup), 3, 10) + run( "search_lean_filter", Timer( diff --git a/changelog.txt b/changelog.txt index 7af1f8851..f0691bc45 100644 --- a/changelog.txt +++ b/changelog.txt @@ -14,6 +14,7 @@ Changed +++++++ - Optimized job hash and equality checks (#442). + - State points are loaded lazily when ``Job`` is opened by id (#238, #239). [1.5.1] -- 2020-12-19 diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 70331aeb7..2be6a58da 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -25,7 +25,7 @@ class _sp_save_hook: - """Hook to handle job migration when statepoints are changed. + """Hook to handle job migration when state points are changed. When a job's state point is changed, in addition to the contents of the file being modified this hook @@ -54,28 +54,29 @@ def save(self): class Job: """The job instance is a handle to the data of a unique state point. - Application developers should usually not need to directly - instantiate this class, but use :meth:`~signac.Project.open_job` - instead. + Application developers should not directly instantiate this class, but + use :meth:`~signac.Project.open_job` instead. + + Jobs can be opened by ``statepoint`` or ``_id``. If both values are + provided, it is the user's responsibility to ensure that the values + correspond. Parameters ---------- project : :class:`~signac.Project` Project handle. - statepoint : dict - State point for the job. - + State point for the job. (Default value = None) _id : str - A file-like object to write to. + The job identifier. (Default value = None) """ FN_MANIFEST = "signac_statepoint.json" """The job's manifest filename. 
- The job manifest, this means a human-readable dump of the job's - state point is stored in each workspace directory. + The job manifest is a human-readable file containing the job's state + point that is stored in each job's workspace directory. """ FN_DOCUMENT = "signac_job_document.json" @@ -84,12 +85,21 @@ class Job: KEY_DATA = "signac_data" "The job's datastore key." - def __init__(self, project, statepoint, _id=None): + def __init__(self, project, statepoint=None, _id=None): self._project = project - # Set statepoint and id - self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self)) - self._id = calc_id(self._statepoint()) if _id is None else _id + if statepoint is None and _id is None: + raise ValueError("Either statepoint or _id must be provided.") + + if statepoint is not None: + # Set state point if provided + self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self)) + else: + # State point will be loaded lazily + self._statepoint = None + + # Set id. The id is computed from the state point if not provided. 
+ self._id = calc_id(self.statepoint()) if _id is None else _id # Prepare job working directory self._wd = os.path.join(project.workspace(), self._id) @@ -104,6 +114,10 @@ def __init__(self, project, statepoint, _id=None): # Prepare current working directory for context management self._cwd = [] + if statepoint is not None: + # Update the project's state point cache immediately if opened by state point + self._project._register(self) + @deprecated( deprecated_in="1.3", removed_in="2.0", @@ -147,7 +161,7 @@ def __str__(self): def __repr__(self): return "{}(project={}, statepoint={})".format( - self.__class__.__name__, repr(self._project), self._statepoint + self.__class__.__name__, repr(self._project), self.statepoint ) def workspace(self): @@ -206,7 +220,7 @@ def reset_statepoint(self, new_statepoint): else: raise # Update this instance - self._statepoint._data = dst._statepoint._data + self.statepoint._data = dst.statepoint._data self._id = dst._id self._wd = dst._wd self._fn_doc = dst._fn_doc @@ -265,6 +279,35 @@ def update_statepoint(self, update, overwrite=False): statepoint.update(update) self.reset_statepoint(statepoint) + def _read_manifest(self): + """Read and parse the manifest file, if it exists. + + Returns + ------- + manifest : dict + State point data. + + Raises + ------ + JobsCorruptedError + If an error occurs while parsing the state point manifest. + OSError + If an error occurs while reading the state point manifest. + + """ + fn_manifest = os.path.join(self._wd, self.FN_MANIFEST) + try: + with open(fn_manifest, "rb") as file: + manifest = json.loads(file.read().decode()) + except OSError as error: + if error.errno != errno.ENOENT: + raise error + except ValueError: + # This catches JSONDecodeError, a subclass of ValueError + raise JobsCorruptedError([self.id]) + else: + return manifest + @property def statepoint(self): """Get the job's state point. @@ -287,6 +330,14 @@ def statepoint(self): Returns the job's state point. 
""" + if self._statepoint is None: + # Load state point manifest lazily + statepoint = self._check_manifest() + self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self)) + + # Update the project's state point cache when loaded lazily + self._project._register(self) + return self._statepoint @statepoint.setter @@ -438,7 +489,7 @@ def _init(self, force=False): This method is called by :meth:`~.init` and is responsible for actually creating the job workspace directory and - writing out the statepoint file. + writing out the state point manifest file. Parameters ---------- @@ -469,7 +520,7 @@ def _init(self, force=False): try: # Prepare the data before file creation and writing - blob = json.dumps(self._statepoint, indent=2) + blob = json.dumps(self.statepoint, indent=2) try: # Open the file for writing only if it does not exist yet. @@ -488,14 +539,27 @@ def _init(self, force=False): else: self._check_manifest() + # Update the project's state point cache if the manifest is valid + self._project._register(self) + def _check_manifest(self): - """Check whether the manifest file exists and is correct.""" - fn_manifest = os.path.join(self._wd, self.FN_MANIFEST) - try: - with open(fn_manifest, "rb") as file: - assert calc_id(json.loads(file.read().decode())) == self.id - except (AssertionError, ValueError): + """Check whether the manifest file exists and is correct. + + Returns + ------- + manifest : dict + State point data. + + Raises + ------ + JobsCorruptedError + If the manifest hash is not equal to the job id. + + """ + manifest = self._read_manifest() + if calc_id(manifest) != self.id: raise JobsCorruptedError([self.id]) + return manifest def init(self, force=False): """Initialize the job's workspace directory. 
@@ -618,6 +682,9 @@ def move(self, project): raise error self.__dict__.update(dst.__dict__) + # Update the destination project's state point cache + project._register(self) + def sync(self, other, strategy=None, exclude=None, doc_sync=None, **kwargs): r"""Perform a one-way synchronization of this job with the other job. @@ -740,7 +807,7 @@ def __exit__(self, err_type, err_value, tb): def __setstate__(self, state): self.__dict__.update(state) - self._statepoint._parent.jobs.append(self) + self.statepoint._parent.jobs.append(self) def __deepcopy__(self, memo): cls = self.__class__ diff --git a/signac/contrib/project.py b/signac/contrib/project.py index eac2ce8d4..c731e8af2 100644 --- a/signac/contrib/project.py +++ b/signac/contrib/project.py @@ -231,6 +231,11 @@ def __init__(self, config=None, _ignore_schema_version=False): # Internal caches self._index_cache = {} + # Note that the state point cache is a superset of the jobs in the + # project, and its contents cannot be invalidated. The cached mapping + # of "id: statepoint" is valid even after a job has been removed, and + # can be used to re-open a job by id as long as that id remains in the + # cache. self._sp_cache = {} self._sp_cache_misses = 0 self._sp_cache_warned = False @@ -632,27 +637,32 @@ def open_job(self, statepoint=None, id=None): than one match. 
""" - if (id is None) == (statepoint is None): - raise ValueError("You need to either provide the state point or the id.") + if (statepoint is None) == (id is None): + raise ValueError("Either statepoint or id must be provided, but not both.") if id is None: - # second best case - job = self.Job(project=self, statepoint=statepoint) - if job.id not in self._sp_cache: - self._sp_cache[job.id] = job.statepoint() - return job - elif id in self._sp_cache: - # optimal case + # Second best case (Job will update self._sp_cache on init) + return self.Job(project=self, statepoint=statepoint) + try: + # Optimal case (id is in the state point cache) return self.Job(project=self, statepoint=self._sp_cache[id], _id=id) - else: - # worst case (no state point and cache miss) + except KeyError: + # Worst case (no statepoint and cache miss, Job will register + # itself in self._sp_cache on statepoint access) if len(id) < 32: + # Resolve partial job ids (first few characters) into a full job id job_ids = self._find_job_ids() matches = [_id for _id in job_ids if _id.startswith(id)] if len(matches) == 1: id = matches[0] elif len(matches) > 1: raise LookupError(id) - return self.Job(project=self, statepoint=self._get_statepoint(id), _id=id) + else: + # By elimination, len(matches) == 0 + raise KeyError(id) + elif not self._contains_job_id(id): + # id does not exist in the project data space + raise KeyError(id) + return self.Job(project=self, _id=id) def _job_dirs(self): """Generate ids of jobs in the workspace. @@ -702,6 +712,22 @@ def num_jobs(self): __len__ = num_jobs + def _contains_job_id(self, job_id): + """Determine whether a job id is in the project's data space. + + Parameters + ---------- + job_id : str + The job id to test for initialization. + + Returns + ------- + bool + True if the job id is initialized for this project. + + """ + return os.path.exists(os.path.join(self._wd, job_id)) + def __contains__(self, job): """Determine whether job is in the project's data space. 
@@ -716,7 +742,7 @@ def __contains__(self, job): True if the job is initialized for this project. """ - return os.path.exists(os.path.join(self._wd, job.id)) + return self._contains_job_id(job.id) @deprecated(deprecated_in="1.3", removed_in="2.0", current_version=__version__) def build_job_search_index(self, index, _trust=False): @@ -1232,31 +1258,41 @@ def _get_statepoint(self, job_id, fn=None): """ if not self._sp_cache: + # Triggers when the in-memory state point cache is empty; attempt to + # populate it from the persistent cache on disk. self._read_cache() try: - if job_id in self._sp_cache: - return self._sp_cache[job_id] - else: - self._sp_cache_misses += 1 - if ( - not self._sp_cache_warned - and self._sp_cache_misses > self._sp_cache_miss_warning_threshold - ): - logger.debug( - "High number of state point cache misses. Consider " - "to update cache with the Project.update_cache() method." - ) - self._sp_cache_warned = True - statepoint = self._get_statepoint_from_workspace(job_id) - except KeyError as error: + # State point cache hit + return self._sp_cache[job_id] + except KeyError: + # State point cache missed + self._sp_cache_misses += 1 + if ( + not self._sp_cache_warned + and self._sp_cache_misses > self._sp_cache_miss_warning_threshold + ): + logger.debug( + "High number of state point cache misses. Consider " + "to update cache with the Project.update_cache() method." + ) + self._sp_cache_warned = True try: - statepoint = self.read_statepoints(fn=fn)[job_id] - except OSError as io_error: - if io_error.errno != errno.ENOENT: - raise io_error - else: - raise error - self._sp_cache[job_id] = statepoint + statepoint = self._get_statepoint_from_workspace(job_id) + # Update the project's state point cache from this cache miss + self._sp_cache[job_id] = statepoint + except KeyError as error: + # Fall back to a file containing all state points because the state + # point could not be read from the job workspace. 
+ try: + statepoints = self.read_statepoints(fn=fn) + # Update the project's state point cache + self._sp_cache.update(statepoints) + statepoint = statepoints[job_id] + except OSError as io_error: + if io_error.errno != errno.ENOENT: + raise io_error + else: + raise error return statepoint @deprecated( @@ -1737,6 +1773,7 @@ def repair(self, fn_statepoints=None, index=None, job_ids=None): # Load internal cache from all available external sources. self._read_cache() try: + # Updates the state point cache from the provided file self._sp_cache.update(self.read_statepoints(fn=fn_statepoints)) except OSError as error: if error.errno != errno.ENOENT or fn_statepoints is not None: @@ -1894,8 +1931,9 @@ def update_cache(self): logger.info("Update cache...") start = time.time() cache = self._read_cache() + cached_ids = set(self._sp_cache) self._update_in_memory_cache() - if cache is None or set(cache) != set(self._sp_cache): + if cache is None or set(cache) != cached_ids: fn_cache = self.fn(self.FN_CACHE) fn_cache_tmp = fn_cache + "~" try: diff --git a/tests/test_project.py b/tests/test_project.py index ccc07baf4..5c8ef79ac 100644 --- a/tests/test_project.py +++ b/tests/test_project.py @@ -417,6 +417,10 @@ def test_open_job_by_id(self): finally: logging.disable(logging.NOTSET) + def test_open_job_no_id_or_statepoint(self): + with pytest.raises(ValueError): + self.project.open_job() + def test_open_job_by_abbreviated_id(self): statepoints = [{"a": i} for i in range(5)] [self.project.open_job(sp).init() for sp in statepoints] @@ -458,7 +462,9 @@ def test_corrupted_statepoint_file(self): try: logging.disable(logging.CRITICAL) with pytest.raises(JobsCorruptedError): - self.project.open_job(id=job.id) + # Accessing the job state point triggers validation of the + # state point manifest file + self.project.open_job(id=job.id).statepoint finally: logging.disable(logging.NOTSET) @@ -533,7 +539,9 @@ def test_repair_corrupted_workspace(self): # Iterating through the jobs should now 
result in an error. with pytest.raises(JobsCorruptedError): for job in self.project: - pass + # Accessing the job state point triggers validation of the + # state point manifest file + job.statepoint with pytest.raises(JobsCorruptedError): self.project.repair()