Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Lazy statepoint loading #239

Merged
merged 37 commits into from
Dec 30, 2020
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
8b14b15
Refactor uses of job._statepoint to job.statepoint.
bdice Oct 31, 2019
24d3b8c
Implement lazy loading of statepoints.
bdice Oct 31, 2019
6c57f11
Use cheaper condition for determining if the job id exists.
bdice Oct 31, 2019
f4a31e8
Fix flake8.
bdice Oct 31, 2019
737170a
Validate that the state point manifest hash matches the job id when l…
bdice Nov 1, 2019
82683b7
Merge branch 'master' into feature/lazy-statepoint-loading
bdice Nov 16, 2019
a40c22a
Added changelog line.
bdice Nov 18, 2019
51b6c26
Add explicit benchmark for iterating and loading state point.
csadorf Nov 18, 2019
9bcceaf
Merge branch 'master' into feature/lazy-statepoint-loading
bdice Nov 19, 2019
c51b413
Use _sp_cache to cache list of known valid job ids.
bdice Nov 19, 2019
9824ec7
Merge branch 'feature/lazy-statepoint-loading' of https://github.com/…
bdice Nov 19, 2019
67ba8da
Deregister jobs from the statepoint cache when they are removed from …
bdice Nov 19, 2019
be044f0
Fix issues with registration/deregistration.
bdice Nov 19, 2019
88c5369
Merge remote-tracking branch 'origin/master' into feature/lazy-statep…
bdice Dec 22, 2020
5c4e284
Update comment.
bdice Dec 28, 2020
bf9108e
Merge remote-tracking branch 'origin/master' into feature/lazy-statep…
bdice Dec 28, 2020
52643e0
Remove deregistration logic.
bdice Dec 28, 2020
6c5dcd2
Add comment explaining behavior of Project._sp_cache.
bdice Dec 28, 2020
0e7d6c3
Update docstrings and deprecated methods.
bdice Dec 28, 2020
10926f9
Update changelog.
bdice Dec 28, 2020
ce5ce6e
Revise implementation details and docstrings.
bdice Dec 28, 2020
0b65502
Update docstrings to use state point.
bdice Dec 28, 2020
206928c
Update comment.
bdice Dec 28, 2020
80acc1b
Update _register method.
bdice Dec 28, 2020
3f93f14
Use job.id instead of job._id and job_id instead of jobid (excluding …
bdice Dec 28, 2020
5304fff
Use descriptive variable names and comments.
bdice Dec 28, 2020
ebad980
Use descriptive variable names.
bdice Dec 29, 2020
eed20a4
Improve cache registration behavior and comments.
bdice Dec 29, 2020
76b5933
Remove extra blank lines.
bdice Dec 29, 2020
baf36e5
Merge branch 'master' into feature/lazy-statepoint-loading
bdice Dec 29, 2020
b029e6c
Improve cache filling behavior.
bdice Dec 29, 2020
9a12405
Revert project._sp_cache so it is not pre-populated with None values.…
bdice Dec 29, 2020
6494150
Add test of error when opening without a state point or job id.
bdice Dec 29, 2020
347b99e
Merge branch 'master' into feature/lazy-statepoint-loading
bdice Dec 29, 2020
7ca5d97
improve wording of docstring of FN_MANIFEST
cbkerr Dec 30, 2020
a8815d2
Fix documentation issues.
bdice Dec 30, 2020
ecdc7dc
Update docstrings and comments.
bdice Dec 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ def run(key, timer, repeat=3, number=10):

run("iterate_single_pass", Timer("list(project)", setup), number=1)

run("iterate_load_sp", Timer("[job.sp() for job in project]", setup), 3, 10)

run(
"search_lean_filter",
Timer(
Expand Down
1 change: 1 addition & 0 deletions changelog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Changed
+++++++

- Optimized job hash and equality checks (#442).
- State points are loaded lazily when ``Job`` is opened by id (#238, #239).


[1.5.1] -- 2020-12-19
Expand Down
102 changes: 84 additions & 18 deletions signac/contrib/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@


class _sp_save_hook:
"""Hook to handle job migration when statepoints are changed.
"""Hook to handle job migration when state points are changed.

When a job's state point is changed, in addition
to the contents of the file being modified this hook
Expand Down Expand Up @@ -74,8 +74,8 @@ class Job:
FN_MANIFEST = "signac_statepoint.json"
"""The job's manifest filename.

The job manifest, this means a human-readable dump of the job's
state point is stored in each workspace directory.
The job manifest is a human-readable file containing the job's state
point, and is stored in each job's workspace directory.
"""

FN_DOCUMENT = "signac_job_document.json"
Expand All @@ -84,12 +84,21 @@ class Job:
KEY_DATA = "signac_data"
"The job's datastore key."

def __init__(self, project, statepoint, _id=None):
def __init__(self, project, statepoint=None, _id=None):
self._project = project

# Set statepoint and id
self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self))
self._id = calc_id(self._statepoint()) if _id is None else _id
if statepoint is None and _id is None:
raise ValueError("Either statepoint or _id must be provided.")
bdice marked this conversation as resolved.
Show resolved Hide resolved

if statepoint is not None:
# Set state point if provided
self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self))
else:
# State point will be loaded lazily
self._statepoint = None

# Set id. The id is computed from the state point if not provided.
self._id = calc_id(self.statepoint()) if _id is None else _id

# Prepare job working directory
self._wd = os.path.join(project.workspace(), self._id)
Expand All @@ -104,6 +113,10 @@ def __init__(self, project, statepoint, _id=None):
# Prepare current working directory for context management
self._cwd = []

if statepoint is not None:
# Update the project's state point cache immediately if opened by state point
self._project._register(self)
bdice marked this conversation as resolved.
Show resolved Hide resolved

@deprecated(
deprecated_in="1.3",
removed_in="2.0",
Expand Down Expand Up @@ -147,7 +160,7 @@ def __str__(self):

def __repr__(self):
return "{}(project={}, statepoint={})".format(
self.__class__.__name__, repr(self._project), self._statepoint
self.__class__.__name__, repr(self._project), self.statepoint
)

def workspace(self):
Expand Down Expand Up @@ -206,7 +219,7 @@ def reset_statepoint(self, new_statepoint):
else:
raise
# Update this instance
self._statepoint._data = dst._statepoint._data
self.statepoint._data = dst.statepoint._data
self._id = dst._id
self._wd = dst._wd
self._fn_doc = dst._fn_doc
Expand Down Expand Up @@ -265,6 +278,34 @@ def update_statepoint(self, update, overwrite=False):
statepoint.update(update)
self.reset_statepoint(statepoint)

def _read_manifest(self):
    """Read and parse the state point manifest file, if it exists.

    Returns
    -------
    manifest : dict or None
        State point data, or None if the manifest file does not exist.

    Raises
    ------
    JobsCorruptedError
        If the manifest exists but cannot be parsed as valid JSON.
    OSError
        If an error other than "file not found" occurs while reading
        the state point manifest.

    """
    fn_manifest = os.path.join(self._wd, self.FN_MANIFEST)
    try:
        with open(fn_manifest, "rb") as file:
            manifest = json.loads(file.read().decode())
    except OSError as error:
        # A missing manifest is not an error here; the caller decides
        # how to handle an uninitialized job. Re-raise anything else.
        if error.errno != errno.ENOENT:
            raise error
    except ValueError:
        # The manifest exists but contains invalid JSON.
        raise JobsCorruptedError([self.id])
    else:
        return manifest

@property
def statepoint(self):
    """Get the job's state point.

    If the job was opened by id (no state point provided at
    construction), the state point manifest is loaded lazily on first
    access, validated against the job id, and registered with the
    project's state point cache.

    Returns
    -------
    dict
        Returns the job's state point.

    """
    if self._statepoint is None:
        # Load state point manifest lazily and validate it against the id
        statepoint = self._check_manifest()
        self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self))

        # Update the project's state point cache when loaded lazily
        self._project._register(self)

    return self._statepoint

@statepoint.setter
Expand Down Expand Up @@ -438,7 +487,7 @@ def _init(self, force=False):

This method is called by :meth:`~.init` and is responsible
for actually creating the job workspace directory and
writing out the statepoint file.
writing out the state point manifest file.

Parameters
----------
Expand Down Expand Up @@ -469,7 +518,7 @@ def _init(self, force=False):

try:
# Prepare the data before file creation and writing
blob = json.dumps(self._statepoint, indent=2)
blob = json.dumps(self.statepoint, indent=2)

try:
# Open the file for writing only if it does not exist yet.
Expand All @@ -488,14 +537,28 @@ def _init(self, force=False):
else:
self._check_manifest()

# Update the project's state point cache if the manifest is valid
self._project._register(self)

def _check_manifest(self):
    """Check whether the manifest file exists and is correct.

    Returns
    -------
    manifest : dict
        State point data.

    Raises
    ------
    JobsCorruptedError
        If the manifest hash is not equal to the job id, or if an error
        occurs while reading/parsing the state point manifest.

    """
    manifest = self._read_manifest()
    # The state point must hash back to this job's id; a mismatch means
    # the workspace directory is corrupted (or was tampered with).
    if calc_id(manifest) != self.id:
        raise JobsCorruptedError([self.id])
    return manifest

def init(self, force=False):
"""Initialize the job's workspace directory.
Expand Down Expand Up @@ -618,6 +681,9 @@ def move(self, project):
raise error
self.__dict__.update(dst.__dict__)

# Update the destination project's state point cache
project._register(self)

def sync(self, other, strategy=None, exclude=None, doc_sync=None, **kwargs):
r"""Perform a one-way synchronization of this job with the other job.

Expand Down Expand Up @@ -740,7 +806,7 @@ def __exit__(self, err_type, err_value, tb):

def __setstate__(self, state):
self.__dict__.update(state)
self._statepoint._parent.jobs.append(self)
self.statepoint._parent.jobs.append(self)

def __deepcopy__(self, memo):
cls = self.__class__
Expand Down
110 changes: 74 additions & 36 deletions signac/contrib/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@ def __init__(self, config=None, _ignore_schema_version=False):

# Internal caches
self._index_cache = {}
# Note that the state point cache is a superset of the jobs in the
# project, and its contents cannot be invalidated. The cached mapping
# of "id: statepoint" is valid even after a job has been removed, and
# can be used to re-open a job by id as long as that id remains in the
# cache.
self._sp_cache = {}
self._sp_cache_misses = 0
self._sp_cache_warned = False
Expand Down Expand Up @@ -632,27 +637,32 @@ def open_job(self, statepoint=None, id=None):
than one match.

"""
if (id is None) == (statepoint is None):
raise ValueError("You need to either provide the state point or the id.")
if (statepoint is None) == (id is None):
raise ValueError("Either statepoint or id must be provided, but not both.")
if id is None:
# second best case
job = self.Job(project=self, statepoint=statepoint)
if job.id not in self._sp_cache:
self._sp_cache[job.id] = job.statepoint()
return job
elif id in self._sp_cache:
# optimal case
# Second best case (Job will update self._sp_cache on init)
return self.Job(project=self, statepoint=statepoint)
try:
# Optimal case (id is in the state point cache)
return self.Job(project=self, statepoint=self._sp_cache[id], _id=id)
else:
# worst case (no state point and cache miss)
except KeyError:
# Worst case (no statepoint and cache miss, Job will register
# itself in self._sp_cache on statepoint access)
if len(id) < 32:
# Resolve partial job ids (first few characters) into a full job id
job_ids = self._find_job_ids()
matches = [_id for _id in job_ids if _id.startswith(id)]
if len(matches) == 1:
id = matches[0]
elif len(matches) > 1:
raise LookupError(id)
return self.Job(project=self, statepoint=self._get_statepoint(id), _id=id)
else:
# By elimination, len(matches) == 0
raise KeyError(id)
elif not self._contains_job_id(id):
# id does not exist in the project data space
raise KeyError(id)
return self.Job(project=self, _id=id)

def _job_dirs(self):
"""Generate ids of jobs in the workspace.
Expand Down Expand Up @@ -702,6 +712,22 @@ def num_jobs(self):

__len__ = num_jobs

def _contains_job_id(self, job_id):
    """Determine whether a job id is in the project's data space.

    Parameters
    ----------
    job_id : str
        The job id to test for initialization.

    Returns
    -------
    bool
        True if the job id is initialized for this project.

    """
    # A job id is considered initialized iff a directory with that id
    # exists in the project's workspace.
    job_path = os.path.join(self._wd, job_id)
    return os.path.exists(job_path)

def __contains__(self, job):
    """Determine whether job is in the project's data space.

    Parameters
    ----------
    job : :class:`~.Job`
        The job to test for initialization.

    Returns
    -------
    bool
        True if the job is initialized for this project.

    """
    # Delegate to the id-based check so both code paths stay consistent.
    return self._contains_job_id(job.id)

@deprecated(deprecated_in="1.3", removed_in="2.0", current_version=__version__)
def build_job_search_index(self, index, _trust=False):
Expand Down Expand Up @@ -1232,31 +1258,41 @@ def _get_statepoint(self, job_id, fn=None):

"""
if not self._sp_cache:
# Triggers if no state points have been added to the cache, and all
# the values are None.
self._read_cache()
try:
if job_id in self._sp_cache:
return self._sp_cache[job_id]
else:
self._sp_cache_misses += 1
if (
not self._sp_cache_warned
and self._sp_cache_misses > self._sp_cache_miss_warning_threshold
):
logger.debug(
"High number of state point cache misses. Consider "
"to update cache with the Project.update_cache() method."
)
self._sp_cache_warned = True
statepoint = self._get_statepoint_from_workspace(job_id)
except KeyError as error:
# State point cache hit
return self._sp_cache[job_id]
except KeyError:
# State point cache missed
self._sp_cache_misses += 1
if (
not self._sp_cache_warned
and self._sp_cache_misses > self._sp_cache_miss_warning_threshold
):
logger.debug(
klywang marked this conversation as resolved.
Show resolved Hide resolved
"High number of state point cache misses. Consider "
"to update cache with the Project.update_cache() method."
)
self._sp_cache_warned = True
try:
statepoint = self.read_statepoints(fn=fn)[job_id]
except OSError as io_error:
if io_error.errno != errno.ENOENT:
raise io_error
else:
raise error
self._sp_cache[job_id] = statepoint
statepoint = self._get_statepoint_from_workspace(job_id)
# Update the project's state point cache from this cache miss
self._sp_cache[job_id] = statepoint
except KeyError as error:
# Fall back to a file containing all state points because the state
# point could not be read from the job workspace.
try:
statepoints = self.read_statepoints(fn=fn)
# Update the project's state point cache
self._sp_cache.update(statepoints)
statepoint = statepoints[job_id]
except OSError as io_error:
if io_error.errno != errno.ENOENT:
raise io_error
else:
raise error
return statepoint

@deprecated(
Expand Down Expand Up @@ -1737,6 +1773,7 @@ def repair(self, fn_statepoints=None, index=None, job_ids=None):
# Load internal cache from all available external sources.
self._read_cache()
try:
# Updates the state point cache from the provided file
self._sp_cache.update(self.read_statepoints(fn=fn_statepoints))
except OSError as error:
if error.errno != errno.ENOENT or fn_statepoints is not None:
Expand Down Expand Up @@ -1894,8 +1931,9 @@ def update_cache(self):
logger.info("Update cache...")
start = time.time()
cache = self._read_cache()
cached_ids = set(self._sp_cache)
self._update_in_memory_cache()
if cache is None or set(cache) != set(self._sp_cache):
if cache is None or set(cache) != cached_ids:
fn_cache = self.fn(self.FN_CACHE)
fn_cache_tmp = fn_cache + "~"
try:
Expand Down
Loading