Skip to content

Commit

Permalink
Lazy statepoint loading (#239)
Browse files Browse the repository at this point in the history
* Refactor uses of job._statepoint to job.statepoint.

* Implement lazy loading of statepoints.

* Use cheaper condition for determining if the job id exists.

* Fix flake8.

* Validate that the state point manifest hash matches the job id when loading the state point lazily.

* Added changelog line.

* Add explicit benchmark for iterating and loading state point.

Since the new code path does not cover that path anymore.

* Use _sp_cache to cache list of known valid job ids.

* Deregister jobs from the statepoint cache when they are removed from the project.

* Fix issues with registration/deregistration.

* Update comment.

* Remove deregistration logic.

* Add comment explaining behavior of Project._sp_cache.

* Update docstrings and deprecated methods.

* Update changelog.

* Revise implementation details and docstrings.

* Update docstrings to use state point.

* Update comment.

* Update _register method.

* Use job.id instead of job._id and job_id instead of jobid (excluding cases that would change public APIs).

* Use descriptive variable names and comments.

* Use descriptive variable names.

* Improve cache registration behavior and comments.

* Remove extra blank lines.

* Improve cache filling behavior.

* Revert project._sp_cache so it is not pre-populated with None values. It doesn't provide a benefit to do so.

* Add test of error when opening without a state point or job id.

* improve wording of docstring of FN_MANIFEST

* Fix documentation issues.

* Update docstrings and comments.

Co-authored-by: Carl Simon Adorf <[email protected]>
Co-authored-by: Corwin Kerr <[email protected]>
  • Loading branch information
3 people authored Dec 30, 2020
1 parent c1cf74e commit fd14f48
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 63 deletions.
2 changes: 2 additions & 0 deletions benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ def run(key, timer, repeat=3, number=10):

run("iterate_single_pass", Timer("list(project)", setup), number=1)

run("iterate_load_sp", Timer("[job.sp() for job in project]", setup), 3, 10)

run(
"search_lean_filter",
Timer(
Expand Down
1 change: 1 addition & 0 deletions changelog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Changed
+++++++

- Optimized job hash and equality checks (#442).
- State points are loaded lazily when ``Job`` is opened by id (#238, #239).


[1.5.1] -- 2020-12-19
Expand Down
117 changes: 92 additions & 25 deletions signac/contrib/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@


class _sp_save_hook:
"""Hook to handle job migration when statepoints are changed.
"""Hook to handle job migration when state points are changed.
When a job's state point is changed, in addition
to the contents of the file being modified this hook
Expand Down Expand Up @@ -54,28 +54,29 @@ def save(self):
class Job:
"""The job instance is a handle to the data of a unique state point.
Application developers should usually not need to directly
instantiate this class, but use :meth:`~signac.Project.open_job`
instead.
Application developers should not directly instantiate this class, but
use :meth:`~signac.Project.open_job` instead.
Jobs can be opened by ``statepoint`` or ``_id``. If both values are
provided, it is the user's responsibility to ensure that the values
correspond.
Parameters
----------
project : :class:`~signac.Project`
Project handle.
statepoint : dict
State point for the job.
State point for the job. (Default value = None)
_id : str
A file-like object to write to.
The job identifier. (Default value = None)
"""

FN_MANIFEST = "signac_statepoint.json"
"""The job's manifest filename.
The job manifest, this means a human-readable dump of the job's
state point is stored in each workspace directory.
The job manifest is a human-readable file containing the job's state
point that is stored in each job's workspace directory.
"""

FN_DOCUMENT = "signac_job_document.json"
Expand All @@ -84,12 +85,21 @@ class Job:
KEY_DATA = "signac_data"
"The job's datastore key."

def __init__(self, project, statepoint, _id=None):
def __init__(self, project, statepoint=None, _id=None):
self._project = project

# Set statepoint and id
self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self))
self._id = calc_id(self._statepoint()) if _id is None else _id
if statepoint is None and _id is None:
raise ValueError("Either statepoint or _id must be provided.")

if statepoint is not None:
# Set state point if provided
self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self))
else:
# State point will be loaded lazily
self._statepoint = None

# Set id. The id is computed from the state point if not provided.
self._id = calc_id(self.statepoint()) if _id is None else _id

# Prepare job working directory
self._wd = os.path.join(project.workspace(), self._id)
Expand All @@ -104,6 +114,10 @@ def __init__(self, project, statepoint, _id=None):
# Prepare current working directory for context management
self._cwd = []

if statepoint is not None:
# Update the project's state point cache immediately if opened by state point
self._project._register(self)

@deprecated(
deprecated_in="1.3",
removed_in="2.0",
Expand Down Expand Up @@ -147,7 +161,7 @@ def __str__(self):

def __repr__(self):
return "{}(project={}, statepoint={})".format(
self.__class__.__name__, repr(self._project), self._statepoint
self.__class__.__name__, repr(self._project), self.statepoint
)

def workspace(self):
Expand Down Expand Up @@ -206,7 +220,7 @@ def reset_statepoint(self, new_statepoint):
else:
raise
# Update this instance
self._statepoint._data = dst._statepoint._data
self.statepoint._data = dst.statepoint._data
self._id = dst._id
self._wd = dst._wd
self._fn_doc = dst._fn_doc
Expand Down Expand Up @@ -265,6 +279,35 @@ def update_statepoint(self, update, overwrite=False):
statepoint.update(update)
self.reset_statepoint(statepoint)

def _read_manifest(self):
"""Read and parse the manifest file, if it exists.
Returns
-------
manifest : dict
State point data.
Raises
------
JobsCorruptedError
If an error occurs while parsing the state point manifest.
OSError
If an error occurs while reading the state point manifest.
"""
fn_manifest = os.path.join(self._wd, self.FN_MANIFEST)
try:
with open(fn_manifest, "rb") as file:
manifest = json.loads(file.read().decode())
except OSError as error:
if error.errno != errno.ENOENT:
raise error
except ValueError:
# This catches JSONDecodeError, a subclass of ValueError
raise JobsCorruptedError([self.id])
else:
return manifest

@property
def statepoint(self):
"""Get the job's state point.
Expand All @@ -287,6 +330,14 @@ def statepoint(self):
Returns the job's state point.
"""
if self._statepoint is None:
# Load state point manifest lazily
statepoint = self._check_manifest()
self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self))

# Update the project's state point cache when loaded lazily
self._project._register(self)

return self._statepoint

@statepoint.setter
Expand Down Expand Up @@ -438,7 +489,7 @@ def _init(self, force=False):
This method is called by :meth:`~.init` and is responsible
for actually creating the job workspace directory and
writing out the statepoint file.
writing out the state point manifest file.
Parameters
----------
Expand Down Expand Up @@ -469,7 +520,7 @@ def _init(self, force=False):

try:
# Prepare the data before file creation and writing
blob = json.dumps(self._statepoint, indent=2)
blob = json.dumps(self.statepoint, indent=2)

try:
# Open the file for writing only if it does not exist yet.
Expand All @@ -488,14 +539,27 @@ def _init(self, force=False):
else:
self._check_manifest()

# Update the project's state point cache if the manifest is valid
self._project._register(self)

def _check_manifest(self):
"""Check whether the manifest file exists and is correct."""
fn_manifest = os.path.join(self._wd, self.FN_MANIFEST)
try:
with open(fn_manifest, "rb") as file:
assert calc_id(json.loads(file.read().decode())) == self.id
except (AssertionError, ValueError):
"""Check whether the manifest file exists and is correct.
Returns
-------
manifest : dict
State point data.
Raises
------
JobsCorruptedError
If the manifest hash is not equal to the job id.
"""
manifest = self._read_manifest()
if calc_id(manifest) != self.id:
raise JobsCorruptedError([self.id])
return manifest

def init(self, force=False):
"""Initialize the job's workspace directory.
Expand Down Expand Up @@ -618,6 +682,9 @@ def move(self, project):
raise error
self.__dict__.update(dst.__dict__)

# Update the destination project's state point cache
project._register(self)

def sync(self, other, strategy=None, exclude=None, doc_sync=None, **kwargs):
r"""Perform a one-way synchronization of this job with the other job.
Expand Down Expand Up @@ -740,7 +807,7 @@ def __exit__(self, err_type, err_value, tb):

def __setstate__(self, state):
self.__dict__.update(state)
self._statepoint._parent.jobs.append(self)
self.statepoint._parent.jobs.append(self)

def __deepcopy__(self, memo):
cls = self.__class__
Expand Down
Loading

0 comments on commit fd14f48

Please sign in to comment.