Lazy statepoint loading #239

Merged
merged 37 commits into from
Dec 30, 2020
8b14b15
Refactor uses of job._statepoint to job.statepoint.
bdice Oct 31, 2019
24d3b8c
Implement lazy loading of statepoints.
bdice Oct 31, 2019
6c57f11
Use cheaper condition for determining if the job id exists.
bdice Oct 31, 2019
f4a31e8
Fix flake8.
bdice Oct 31, 2019
737170a
Validate that the state point manifest hash matches the job id when l…
bdice Nov 1, 2019
82683b7
Merge branch 'master' into feature/lazy-statepoint-loading
bdice Nov 16, 2019
a40c22a
Added changelog line.
bdice Nov 18, 2019
51b6c26
Add explicit benchmark for iterating and loading state point.
csadorf Nov 18, 2019
9bcceaf
Merge branch 'master' into feature/lazy-statepoint-loading
bdice Nov 19, 2019
c51b413
Use _sp_cache to cache list of known valid job ids.
bdice Nov 19, 2019
9824ec7
Merge branch 'feature/lazy-statepoint-loading' of https://github.com/…
bdice Nov 19, 2019
67ba8da
Deregister jobs from the statepoint cache when they are removed from …
bdice Nov 19, 2019
be044f0
Fix issues with registration/deregistration.
bdice Nov 19, 2019
88c5369
Merge remote-tracking branch 'origin/master' into feature/lazy-statep…
bdice Dec 22, 2020
5c4e284
Update comment.
bdice Dec 28, 2020
bf9108e
Merge remote-tracking branch 'origin/master' into feature/lazy-statep…
bdice Dec 28, 2020
52643e0
Remove deregistration logic.
bdice Dec 28, 2020
6c5dcd2
Add comment explaining behavior of Project._sp_cache.
bdice Dec 28, 2020
0e7d6c3
Update docstrings and deprecated methods.
bdice Dec 28, 2020
10926f9
Update changelog.
bdice Dec 28, 2020
ce5ce6e
Revise implementation details and docstrings.
bdice Dec 28, 2020
0b65502
Update docstrings to use state point.
bdice Dec 28, 2020
206928c
Update comment.
bdice Dec 28, 2020
80acc1b
Update _register method.
bdice Dec 28, 2020
3f93f14
Use job.id instead of job._id and job_id instead of jobid (excluding …
bdice Dec 28, 2020
5304fff
Use descriptive variable names and comments.
bdice Dec 28, 2020
ebad980
Use descriptive variable names.
bdice Dec 29, 2020
eed20a4
Improve cache registration behavior and comments.
bdice Dec 29, 2020
76b5933
Remove extra blank lines.
bdice Dec 29, 2020
baf36e5
Merge branch 'master' into feature/lazy-statepoint-loading
bdice Dec 29, 2020
b029e6c
Improve cache filling behavior.
bdice Dec 29, 2020
9a12405
Revert project._sp_cache so it is not pre-populated with None values.…
bdice Dec 29, 2020
6494150
Add test of error when opening without a state point or job id.
bdice Dec 29, 2020
347b99e
Merge branch 'master' into feature/lazy-statepoint-loading
bdice Dec 29, 2020
7ca5d97
improve wording of docstring of FN_MANIFEST
cbkerr Dec 30, 2020
a8815d2
Fix documentation issues.
bdice Dec 30, 2020
ecdc7dc
Update docstrings and comments.
bdice Dec 30, 2020
2 changes: 2 additions & 0 deletions benchmark.py
@@ -212,6 +212,8 @@ def run(key, timer, repeat=3, number=10):

run("iterate_single_pass", Timer("list(project)", setup), number=1)

run("iterate_load_sp", Timer("[job.sp() for job in project]", setup), 3, 10)
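The new benchmark entry times a full pass that loads every state point. A minimal sketch of how such a `timeit.Timer` measurement behaves is shown here; the `setup` string and this simplified `run` helper are stand-ins chosen for illustration (assumptions), not the contents of `benchmark.py`:

```python
import timeit

# Hypothetical setup string; the real benchmark builds a signac project here.
setup = "data = [{'a': i} for i in range(1000)]"


def run(key, timer, repeat=3, number=10):
    """Simplified stand-in: report the best per-call time in seconds."""
    # repeat() returns one total time per repetition, each covering `number` calls.
    times = timer.repeat(repeat=repeat, number=number)
    best = min(times) / number
    print(f"{key}: {best:.6f} s per call")
    return best


best = run("iterate_load", timeit.Timer("[d['a'] for d in data]", setup), 3, 10)
```

Taking the minimum over repetitions is a common convention because it is the least affected by background noise on the machine.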

run(
"search_lean_filter",
Timer(
1 change: 1 addition & 0 deletions changelog.txt
@@ -14,6 +14,7 @@ Changed
+++++++

- Optimized job hash and equality checks (#442).
- State points are loaded lazily when ``Job`` is opened by id (#238, #239).


[1.5.1] -- 2020-12-19
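The lazy-loading behavior described in the changelog entry can be illustrated with a small, self-contained sketch. `LazyRecord`, its file layout, and the `loads` counter are hypothetical names used only for this illustration; they are not part of signac:

```python
import json
import os
import tempfile


class LazyRecord:
    """Toy handle whose data is read from disk only on first access."""

    def __init__(self, directory, data=None, record_id=None):
        if data is None and record_id is None:
            raise ValueError("Either data or record_id must be provided.")
        self._directory = directory
        self._data = data  # None means "not loaded yet"
        self._id = record_id
        self.loads = 0  # counts disk reads, for demonstration only

    @property
    def data(self):
        if self._data is None:
            # Deferred read: happens on first access, then the result is reused.
            path = os.path.join(self._directory, self._id + ".json")
            with open(path) as file:
                self._data = json.load(file)
            self.loads += 1
        return self._data


with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "abc.json"), "w") as file:
        json.dump({"a": 1}, file)
    record = LazyRecord(tmp, record_id="abc")
    loads_before = record.loads  # opening by id has not touched the file
    first = record.data
    second = record.data
    loads_after = record.loads
```

Opening by id costs nothing until the data is needed, which is the case that matters when iterating over many jobs without reading their state points.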
117 changes: 92 additions & 25 deletions signac/contrib/job.py
@@ -25,7 +25,7 @@


class _sp_save_hook:
"""Hook to handle job migration when statepoints are changed.
"""Hook to handle job migration when state points are changed.

When a job's state point is changed, in addition
to the contents of the file being modified this hook
@@ -54,28 +54,29 @@ def save(self):
class Job:
"""The job instance is a handle to the data of a unique state point.

Application developers should usually not need to directly
instantiate this class, but use :meth:`~signac.Project.open_job`
instead.
Application developers should not directly instantiate this class, but
use :meth:`~signac.Project.open_job` instead.

Jobs can be opened by ``statepoint`` or ``_id``. If both values are
provided, it is the user's responsibility to ensure that the values
correspond.

Parameters
----------
project : :class:`~signac.Project`
Project handle.

statepoint : dict
State point for the job.

State point for the job. (Default value = None)
_id : str
A file-like object to write to.
The job identifier. (Default value = None)

"""

FN_MANIFEST = "signac_statepoint.json"
"""The job's manifest filename.

The job manifest, this means a human-readable dump of the job's
state point is stored in each workspace directory.
The job manifest is a human-readable file containing the job's state
point that is stored in each job's workspace directory.
"""

FN_DOCUMENT = "signac_job_document.json"
@@ -84,12 +85,21 @@ class Job:
KEY_DATA = "signac_data"
"The job's datastore key."

def __init__(self, project, statepoint, _id=None):
def __init__(self, project, statepoint=None, _id=None):
self._project = project

# Set statepoint and id
self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self))
self._id = calc_id(self._statepoint()) if _id is None else _id
if statepoint is None and _id is None:
raise ValueError("Either statepoint or _id must be provided.")

if statepoint is not None:
# Set state point if provided
self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self))
else:
# State point will be loaded lazily
self._statepoint = None

# Set id. The id is computed from the state point if not provided.
self._id = calc_id(self.statepoint()) if _id is None else _id

# Prepare job working directory
self._wd = os.path.join(project.workspace(), self._id)
@@ -104,6 +114,10 @@ def __init__(self, project, statepoint, _id=None):
# Prepare current working directory for context management
self._cwd = []

if statepoint is not None:
# Update the project's state point cache immediately if opened by state point
self._project._register(self)

@deprecated(
deprecated_in="1.3",
removed_in="2.0",
@@ -147,7 +161,7 @@ def __str__(self):

def __repr__(self):
return "{}(project={}, statepoint={})".format(
self.__class__.__name__, repr(self._project), self._statepoint
self.__class__.__name__, repr(self._project), self.statepoint
)

def workspace(self):
@@ -206,7 +220,7 @@ def reset_statepoint(self, new_statepoint):
else:
raise
# Update this instance
self._statepoint._data = dst._statepoint._data
self.statepoint._data = dst.statepoint._data
self._id = dst._id
self._wd = dst._wd
self._fn_doc = dst._fn_doc
@@ -265,6 +279,35 @@ def update_statepoint(self, update, overwrite=False):
statepoint.update(update)
self.reset_statepoint(statepoint)

def _read_manifest(self):
"""Read and parse the manifest file, if it exists.

Returns
-------
manifest : dict
State point data.

Raises
------
JobsCorruptedError
If an error occurs while parsing the state point manifest.
OSError
If an error occurs while reading the state point manifest.

"""
fn_manifest = os.path.join(self._wd, self.FN_MANIFEST)
try:
with open(fn_manifest, "rb") as file:
manifest = json.loads(file.read().decode())
except OSError as error:
if error.errno != errno.ENOENT:
raise error
except ValueError:
# This catches JSONDecodeError, a subclass of ValueError
raise JobsCorruptedError([self.id])
else:
return manifest
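
The error handling in `_read_manifest` separates three cases: a missing file, an unreadable file, and a file that is not valid JSON. A standalone sketch of the same idea follows. `CorruptedError` and `read_manifest` are hypothetical stand-ins for illustration, and the sketch uses `FileNotFoundError` where the diff compares `errno` against `ENOENT`:

```python
import json
import os
import tempfile


class CorruptedError(Exception):
    """Hypothetical stand-in for signac's JobsCorruptedError."""


def read_manifest(path):
    """Return the parsed JSON content, or None if the file does not exist."""
    try:
        with open(path, "rb") as file:
            return json.loads(file.read().decode())
    except FileNotFoundError:
        # A missing file is not treated as an error at this level.
        return None
    except ValueError as error:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        raise CorruptedError(path) from error
    # Any other OSError (for example, a permission error) propagates unchanged.


with tempfile.TemporaryDirectory() as tmp:
    missing = read_manifest(os.path.join(tmp, "missing.json"))

    good_path = os.path.join(tmp, "good.json")
    with open(good_path, "w") as file:
        file.write('{"a": 1}')
    good = read_manifest(good_path)

    bad_path = os.path.join(tmp, "bad.json")
    with open(bad_path, "w") as file:
        file.write("{not json")
    try:
        read_manifest(bad_path)
        corrupted = False
    except CorruptedError:
        corrupted = True
```

Returning `None` for a missing file leaves it to the caller to decide whether that is an error, which mirrors how the diff's version falls through without a return value in the `ENOENT` case.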

@property
def statepoint(self):
"""Get the job's state point.
@@ -287,6 +330,14 @@ def statepoint(self):
Returns the job's state point.

"""
if self._statepoint is None:
# Load state point manifest lazily
statepoint = self._check_manifest()
self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self))

# Update the project's state point cache when loaded lazily
self._project._register(self)

return self._statepoint
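
The `_register` calls in this diff update a project-level state point cache, but `_register` itself is not shown here. As a rough illustration of the idea only, the toy class below keeps a dictionary from job id to state point and fills it when a state point is first loaded. `ToyProject`, `get_statepoint`, and `fake_load` are hypothetical and make no claim about how signac implements `Project._register` or `Project._sp_cache`:

```python
class ToyProject:
    """Toy project holding a cache that maps job ids to state points."""

    def __init__(self):
        # Starts empty; entries are added only for state points actually seen.
        self._sp_cache = {}

    def _register(self, job_id, statepoint):
        self._sp_cache[job_id] = statepoint

    def get_statepoint(self, job_id, load):
        # Consult the cache first and fall back to the (expensive) loader.
        if job_id not in self._sp_cache:
            self._register(job_id, load(job_id))
        return self._sp_cache[job_id]


calls = []


def fake_load(job_id):
    """Pretend to read a manifest from disk and record that it happened."""
    calls.append(job_id)
    return {"a": 1}


project = ToyProject()
first = project.get_statepoint("abc", fake_load)
second = project.get_statepoint("abc", fake_load)
```

In this toy version the loader runs once per id, and later lookups are served from the dictionary.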

@statepoint.setter
@@ -438,7 +489,7 @@ def _init(self, force=False):

This method is called by :meth:`~.init` and is responsible
for actually creating the job workspace directory and
writing out the statepoint file.
writing out the state point manifest file.

Parameters
----------
@@ -469,7 +520,7 @@ def _init(self, force=False):

try:
# Prepare the data before file creation and writing
blob = json.dumps(self._statepoint, indent=2)
blob = json.dumps(self.statepoint, indent=2)

try:
# Open the file for writing only if it does not exist yet.
@@ -488,14 +539,27 @@ def _init(self, force=False):
else:
self._check_manifest()

# Update the project's state point cache if the manifest is valid
self._project._register(self)

def _check_manifest(self):
"""Check whether the manifest file exists and is correct."""
fn_manifest = os.path.join(self._wd, self.FN_MANIFEST)
try:
with open(fn_manifest, "rb") as file:
assert calc_id(json.loads(file.read().decode())) == self.id
except (AssertionError, ValueError):
"""Check whether the manifest file exists and is correct.

Returns
-------
manifest : dict
State point data.

Raises
------
JobsCorruptedError
If the manifest hash is not equal to the job id.

"""
manifest = self._read_manifest()
if calc_id(manifest) != self.id:
raise JobsCorruptedError([self.id])
return manifest
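
The check above relies on the job id being a deterministic function of the state point. The sketch below assumes the id is an MD5 digest of the state point serialized as JSON with sorted keys; that is an assumption made for this example, and the real `calc_id` may differ in its details. `toy_calc_id` and `check_manifest` are hypothetical names:

```python
import hashlib
import json


def toy_calc_id(statepoint):
    """Hash a canonical JSON encoding so that key order does not matter."""
    blob = json.dumps(statepoint, sort_keys=True)
    return hashlib.md5(blob.encode()).hexdigest()


def check_manifest(manifest, job_id):
    """Return the manifest if its hash matches the id, otherwise raise."""
    if toy_calc_id(manifest) != job_id:
        # signac raises JobsCorruptedError here; ValueError is a stand-in.
        raise ValueError(f"Manifest does not match job id {job_id}.")
    return manifest


id_a = toy_calc_id({"a": 1, "b": 2})
id_b = toy_calc_id({"b": 2, "a": 1})
checked = check_manifest({"b": 2, "a": 1}, id_a)

try:
    check_manifest({"a": 2}, id_a)
    mismatch_raised = False
except ValueError:
    mismatch_raised = True
```

Because the hash is recomputed from the file contents, a manifest that was edited by hand or copied into the wrong directory is detected when the state point is loaded.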

def init(self, force=False):
"""Initialize the job's workspace directory.
@@ -618,6 +682,9 @@ def move(self, project):
raise error
self.__dict__.update(dst.__dict__)

# Update the destination project's state point cache
project._register(self)

def sync(self, other, strategy=None, exclude=None, doc_sync=None, **kwargs):
r"""Perform a one-way synchronization of this job with the other job.

@@ -740,7 +807,7 @@ def __exit__(self, err_type, err_value, tb):

def __setstate__(self, state):
self.__dict__.update(state)
self._statepoint._parent.jobs.append(self)
self.statepoint._parent.jobs.append(self)

def __deepcopy__(self, memo):
cls = self.__class__