From 294b2da102451a268f58da0783da93eb9012b6d9 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 13 Jan 2021 12:45:10 -0800 Subject: [PATCH 01/42] Replace old JSONDict with new BufferedJSONDict. --- signac/__init__.py | 4 ++-- signac/contrib/job.py | 13 ++++++---- signac/contrib/project.py | 8 +++---- .../synced_collections/buffered_collection.py | 24 ++++++++++++++++++- tests/test_buffered_mode.py | 12 ++++++++-- 5 files changed, 47 insertions(+), 14 deletions(-) diff --git a/signac/__init__.py b/signac/__init__.py index 6bf487e2f..72bf15c87 100644 --- a/signac/__init__.py +++ b/signac/__init__.py @@ -28,10 +28,10 @@ from .contrib import get_job, get_project, index, index_files, init_project from .core.h5store import H5Store, H5StoreManager from .core.jsondict import JSONDict -from .core.jsondict import buffer_reads_writes as buffered from .core.jsondict import flush_all as flush from .core.jsondict import get_buffer_load, get_buffer_size -from .core.jsondict import in_buffered_mode as is_buffered +from .core.synced_collections.buffered_collection import buffer_all as buffered +from .core.synced_collections.buffered_collection import is_buffered from .db import get_database from .diff import diff_jobs from .version import __version__ diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 264563fa2..c7277473b 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -14,7 +14,7 @@ from ..core import json from ..core.attrdict import SyncedAttrDict from ..core.h5store import H5StoreManager -from ..core.jsondict import JSONDict +from ..core.synced_collections.collection_json import BufferedJSONDict from ..sync import sync_jobs from ..version import __version__ from .errors import DestinationExistsError, JobsCorruptedError @@ -376,7 +376,8 @@ def document(self): If you need a deep copy that will not modify the underlying persistent JSON file, use :attr:`~Job.document` instead of :attr:`~Job.doc`. 
- For more information, see :attr:`~Job.statepoint` or :class:`~signac.JSONDict`. + For more information, see + :class:`~signac.core.synced_collections.collection_json.BufferedJSONDict`. See :ref:`signac document ` for the command line equivalent. @@ -386,10 +387,11 @@ def document(self): The job document handle. """ + # TODO: Fix this docstring explaining how to get a deep copy. if self._document is None: self.init() fn_doc = os.path.join(self.workspace(), self.FN_DOCUMENT) - self._document = JSONDict(filename=fn_doc, write_concern=True) + self._document = BufferedJSONDict(filename=fn_doc, write_concern=True) return self._document @document.setter @@ -398,7 +400,7 @@ def document(self, new_doc): Parameters ---------- - new_doc : :class:`~signac.JSONDict` + new_doc : :class:`~signac.core.synced_collections.collection_json.BufferedJSONDict` The job document handle. """ @@ -412,7 +414,8 @@ def doc(self): If you need a deep copy that will not modify the underlying persistent JSON file, use :attr:`~Job.document` instead of :attr:`~Job.doc`. - For more information, see :attr:`~Job.statepoint` or :class:`~signac.JSONDict`. + For more information, see + :class:`~signac.core.synced_collections.collection_json.BufferedJSONDict`. """ return self.document diff --git a/signac/contrib/project.py b/signac/contrib/project.py index 3a2487e29..dc3bd589f 100644 --- a/signac/contrib/project.py +++ b/signac/contrib/project.py @@ -26,7 +26,7 @@ from ..common.config import Config, get_config, load_config from ..core import json from ..core.h5store import H5StoreManager -from ..core.jsondict import JSONDict +from ..core.synced_collections.collection_json import BufferedJSONDict from ..sync import sync_projects from ..version import SCHEMA_VERSION, __version__ from .collection import Collection @@ -496,13 +496,13 @@ def document(self): Returns ------- - :class:`~signac.JSONDict` + :class:`~signac.core.synced_collections.collection_json.BufferedJSONDict` The project document. 
""" if self._document is None: fn_doc = os.path.join(self.root_directory(), self.FN_DOCUMENT) - self._document = JSONDict(filename=fn_doc, write_concern=True) + self._document = BufferedJSONDict(filename=fn_doc, write_concern=True) return self._document @document.setter @@ -525,7 +525,7 @@ def doc(self): Returns ------- - :class:`~signac.JSONDict` + :class:`~signac.core.synced_collections.collection_json.BufferedJSONDict` The project document. """ diff --git a/signac/core/synced_collections/buffered_collection.py b/signac/core/synced_collections/buffered_collection.py index 1ea446ad3..5d6afe791 100644 --- a/signac/core/synced_collections/buffered_collection.py +++ b/signac/core/synced_collections/buffered_collection.py @@ -36,6 +36,7 @@ """ import logging +import warnings from inspect import isabstract from typing import Any, List @@ -207,7 +208,7 @@ def _flush_buffer(self): # This function provides a more familiar module-scope, function-based interface # for enabling buffering rather than calling the class's static method. -def buffer_all(): +def buffer_all(force_write=None, buffer_size=None): """Return a global buffer context for all BufferedCollection instances. All future operations use the buffer whenever possible. Write operations @@ -217,4 +218,25 @@ def buffer_all(): manager represents a promise to buffer whenever possible, but does not guarantee that no writes will occur under all circumstances. """ + if force_write is not None: + warnings.warn( + DeprecationWarning( + "The force_write parameter is deprecated and will be removed in " + "signac 2.0. This functionality is no longer supported." + ) + ) + if buffer_size is not None: + warnings.warn( + DeprecationWarning( + "The buffer_size parameter is deprecated and will be removed in " + "signac 2.0. The buffer size should be set using the " + "set_buffer_capacity method of FileBufferedCollection or any of its " + "subclasses." 
+ ) + ) return _BUFFER_ALL_CONTEXT + + +def is_buffered(): + """Check the global buffered mode setting.""" + return bool(_BUFFER_ALL_CONTEXT) diff --git a/tests/test_buffered_mode.py b/tests/test_buffered_mode.py index 7a7d5f586..3f27ca03f 100644 --- a/tests/test_buffered_mode.py +++ b/tests/test_buffered_mode.py @@ -72,6 +72,9 @@ def test_basic_and_nested(self): assert job.doc.a == 2 assert job.doc.a == 2 + @pytest.mark.xfail( + reason="The new SyncedCollection does not implement force_write." + ) def test_buffered_mode_force_write(self): with signac.buffered(force_write=False): with signac.buffered(force_write=False): @@ -88,6 +91,9 @@ def test_buffered_mode_force_write(self): pass assert not signac.is_buffered() + @pytest.mark.xfail( + reason="The new SyncedCollection does not implement force_write." + ) def test_buffered_mode_force_write_with_file_modification(self): job = self.project.open_job(dict(a=0)) job.init() @@ -114,8 +120,8 @@ def test_buffered_mode_force_write_with_file_modification(self): file.write(json.dumps({"a": x}).encode()) assert job.doc.a == (not x) - @pytest.mark.skipif( - not ABLE_TO_PREVENT_WRITE, reason="unable to trigger permission error" + @pytest.mark.xfail( + reason="The new SyncedCollection does not implement force_write." 
) def test_force_write_mode_with_permission_error(self): job = self.project.open_job(dict(a=0)) @@ -138,6 +144,7 @@ def test_force_write_mode_with_permission_error(self): os.chmod(path, mode) assert job.doc.a == x + @pytest.mark.xfail(reason="This API for setting the buffer size is deprecated.") def test_buffered_mode_change_buffer_size(self): assert not signac.is_buffered() with signac.buffered(buffer_size=12): @@ -165,6 +172,7 @@ def test_buffered_mode_change_buffer_size(self): with signac.buffered(buffer_size=14): pass + @pytest.mark.xfail(reason="This test uses various deprecated APIs.") def test_integration(self): def routine(): for i in range(1, 4): From f9acc3817eea1b14cd0ba465e07af0333e1bb71e Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 13 Jan 2021 12:49:08 -0800 Subject: [PATCH 02/42] Verify that replacing BufferedJSONDict with MemoryBufferedJSONDict works. --- signac/contrib/job.py | 4 +++- signac/contrib/project.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index c7277473b..7fe15e27a 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -14,7 +14,9 @@ from ..core import json from ..core.attrdict import SyncedAttrDict from ..core.h5store import H5StoreManager -from ..core.synced_collections.collection_json import BufferedJSONDict +from ..core.synced_collections.collection_json import ( + MemoryBufferedJSONDict as BufferedJSONDict, +) from ..sync import sync_jobs from ..version import __version__ from .errors import DestinationExistsError, JobsCorruptedError diff --git a/signac/contrib/project.py b/signac/contrib/project.py index dc3bd589f..a4f20430b 100644 --- a/signac/contrib/project.py +++ b/signac/contrib/project.py @@ -26,7 +26,9 @@ from ..common.config import Config, get_config, load_config from ..core import json from ..core.h5store import H5StoreManager -from ..core.synced_collections.collection_json import BufferedJSONDict +from
..core.synced_collections.collection_json import ( + MemoryBufferedJSONDict as BufferedJSONDict, +) from ..sync import sync_projects from ..version import SCHEMA_VERSION, __version__ from .collection import Collection From b0deb4ea65d73d104f177c610450f210e65a03b0 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 13 Jan 2021 15:32:22 -0800 Subject: [PATCH 03/42] Remove largely redundant _reset_sp method. --- signac/contrib/job.py | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 7fe15e27a..c6d3a9ec2 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -31,7 +31,7 @@ class _sp_save_hook: When a job's state point is changed, in addition to the contents of the file being modified this hook - calls :meth:`~Job._reset_sp` to rehash the state + calls :meth:`~Job.reset_statepoint` to rehash the state point, compute a new job id, and move the folder. Parameters @@ -50,7 +50,7 @@ def load(self): def save(self): """Reset the state point for all the jobs.""" for job in self.jobs: - job._reset_sp() + job.reset_statepoint(job.statepoint()) class Job: @@ -237,19 +237,6 @@ def reset_statepoint(self, new_statepoint): self._cwd = [] logger.info(f"Moved '{self}' -> '{dst}'.") - def _reset_sp(self, new_statepoint=None): - """Check for new state point requested to assign this job. - - Parameters - ---------- - new_statepoint : dict - The job's new state point (Default value = None). - - """ - if new_statepoint is None: - new_statepoint = self.statepoint() - self.reset_statepoint(new_statepoint) - def update_statepoint(self, update, overwrite=False): """Update the state point of this job. @@ -358,7 +345,7 @@ def statepoint(self, new_statepoint): The new state point to be assigned. 
""" - self._reset_sp(new_statepoint) + self.reset_statepoint(new_statepoint) @property def sp(self): From 700624e4b2598ca88165fa05d42258bfef9cf437 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 13 Jan 2021 16:14:24 -0800 Subject: [PATCH 04/42] Remove single-use internal functions in Job to reduce surface area for SyncedCollection integration. --- signac/contrib/job.py | 38 ++++++++------------------------------ 1 file changed, 8 insertions(+), 30 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index c6d3a9ec2..261ad3543 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -14,6 +14,7 @@ from ..core import json from ..core.attrdict import SyncedAttrDict from ..core.h5store import H5StoreManager +from ..core.synced_collections.collection_json import JSONDict from ..core.synced_collections.collection_json import ( MemoryBufferedJSONDict as BufferedJSONDict, ) @@ -274,35 +275,6 @@ def update_statepoint(self, update, overwrite=False): statepoint.update(update) self.reset_statepoint(statepoint) - def _read_manifest(self): - """Read and parse the manifest file, if it exists. - - Returns - ------- - manifest : dict - State point data. - - Raises - ------ - JobsCorruptedError - If an error occurs while parsing the state point manifest. - OSError - If an error occurs while reading the state point manifest. - - """ - fn_manifest = os.path.join(self.workspace(), self.FN_MANIFEST) - try: - with open(fn_manifest, "rb") as file: - manifest = json.loads(file.read().decode()) - except OSError as error: - if error.errno != errno.ENOENT: - raise error - except ValueError: - # This catches JSONDecodeError, a subclass of ValueError - raise JobsCorruptedError([self.id]) - else: - return manifest - @property def statepoint(self): """Get the job's state point. @@ -558,7 +530,13 @@ def _check_manifest(self): If the manifest hash is not equal to the job id. 
""" - manifest = self._read_manifest() + fn_manifest = os.path.join(self.workspace(), self.FN_MANIFEST) + try: + manifest = JSONDict(fn_manifest)() + except ValueError: + # This catches JSONDecodeError, a subclass of ValueError + raise JobsCorruptedError([self.id]) + if calc_id(manifest) != self.id: raise JobsCorruptedError([self.id]) return manifest From 013291ca42b1c1deb0f2b7d520b741a3855a80ae Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 13 Jan 2021 17:08:51 -0800 Subject: [PATCH 05/42] Move logic from _init into init. --- signac/contrib/import_export.py | 2 +- signac/contrib/job.py | 100 +++++++++++++------------------- 2 files changed, 42 insertions(+), 60 deletions(-) diff --git a/signac/contrib/import_export.py b/signac/contrib/import_export.py index 5b497dfef..bd605567c 100644 --- a/signac/contrib/import_export.py +++ b/signac/contrib/import_export.py @@ -766,7 +766,7 @@ def _copy_to_job_workspace(src, job, copytree): raise DestinationExistsError(job) raise else: - job._init() + job.init() return dst diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 261ad3543..a112f2455 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -458,64 +458,6 @@ def data(self, new_data): """ self.stores[self.KEY_DATA] = new_data - def _init(self, force=False): - """Contains all logic for job initialization. - - This method is called by :meth:`~.init` and is responsible - for actually creating the job workspace directory and - writing out the state point manifest file. - - Parameters - ---------- - force : bool - If ``True``, write the job manifest even if it - already exists. If ``False``, this method will - raise an Exception if the manifest exists - (Default value = False). - - """ - fn_manifest = os.path.join(self.workspace(), self.FN_MANIFEST) - - # Attempt early exit if the manifest exists and is valid - try: - statepoint = self._check_manifest() - except Exception: - # Any exception means this method cannot exit early. 
- - # Create the workspace directory if it did not exist yet. - try: - _mkdir_p(self.workspace()) - except OSError: - logger.error( - "Error occurred while trying to create " - "workspace directory for job '{}'.".format(self) - ) - raise - - try: - # Prepare the data before file creation and writing - blob = json.dumps(self.statepoint, indent=2) - - try: - # Open the file for writing only if it does not exist yet. - with open(fn_manifest, "w" if force else "x") as file: - file.write(blob) - except OSError as error: - if error.errno not in (errno.EEXIST, errno.EACCES): - raise - except Exception as error: - # Attempt to delete the file on error, to prevent corruption. - try: - os.remove(fn_manifest) - except Exception: # ignore all errors here - pass - raise error - else: - statepoint = self._check_manifest() - - # Update the project's state point cache if the manifest is valid - self._project._register(self.id, statepoint) - def _check_manifest(self): """Check whether the manifest file exists and is correct. @@ -564,7 +506,47 @@ def init(self, force=False): """ try: - self._init(force=force) + fn_manifest = os.path.join(self.workspace(), self.FN_MANIFEST) + + # Attempt early exit if the manifest exists and is valid + try: + statepoint = self._check_manifest() + except Exception: + # Any exception means this method cannot exit early. + + # Create the workspace directory if it did not exist yet. + try: + _mkdir_p(self.workspace()) + except OSError: + logger.error( + "Error occurred while trying to create " + "workspace directory for job '{}'.".format(self) + ) + raise + + try: + # Prepare the data before file creation and writing + blob = json.dumps(self.statepoint, indent=2) + + try: + # Open the file for writing only if it does not exist yet. 
+ with open(fn_manifest, "w" if force else "x") as file: + file.write(blob) + except OSError as error: + if error.errno not in (errno.EEXIST, errno.EACCES): + raise + except Exception as error: + # Attempt to delete the file on error, to prevent corruption. + try: + os.remove(fn_manifest) + except Exception: # ignore all errors here + pass + raise error + else: + statepoint = self._check_manifest() + + # Update the project's state point cache if the manifest is valid + self._project._register(self.id, statepoint) except Exception: logger.error( f"State point manifest file of job '{self.id}' appears to be corrupted." From 3b74e4810d2054a77fcb80b03ceb9bd0b8e67509 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 18 Jan 2021 16:56:53 -0800 Subject: [PATCH 06/42] Working implementation of statepoint using new SyncedCollection. --- signac/contrib/job.py | 172 ++++++++++++------ .../synced_collections/collection_json.py | 7 +- 2 files changed, 121 insertions(+), 58 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index a112f2455..e9ca5057e 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -11,13 +11,12 @@ from deprecation import deprecated -from ..core import json -from ..core.attrdict import SyncedAttrDict from ..core.h5store import H5StoreManager from ..core.synced_collections.collection_json import JSONDict from ..core.synced_collections.collection_json import ( MemoryBufferedJSONDict as BufferedJSONDict, ) +from ..errors import KeyTypeError from ..sync import sync_jobs from ..version import __version__ from .errors import DestinationExistsError, JobsCorruptedError @@ -27,31 +26,79 @@ logger = logging.getLogger(__name__) -class _sp_save_hook: - """Hook to handle job migration when state points are changed. +class _LoadAndSaveSingleThread: + """A context manager for :class:`SyncedCollection` to wrap saving and loading. 
- When a job's state point is changed, in addition - to the contents of the file being modified this hook - calls :meth:`~Job.reset_statepoint` to rehash the state - point, compute a new job id, and move the folder. + Unclear how to mesh thread-safety with the fact that I'm introducing locks on a per-file level, + but the save operation changes the filename within the context _and_ multiple jobs could + point to the same statepoint. When the filename is changed by reset_statepoint, I need some + way to consistently also repoint the locks. + """ - Parameters - ---------- - jobs : iterable of `Jobs` - List of jobs(instance of `Job`). + def __init__(self, collection): + self._collection = collection + + def __enter__(self): + self._collection._load() + def __exit__(self, exc_type, exc_val, exc_tb): + self._collection._save() + + +class _StatepointDict(JSONDict): + """A JSON-backed dictionary for storing job statepoints. + + There are two principal reasons for extending the base JSONDict: + 1. Saving needs to trigger a job directory migration, and + 2. Statepoints are assumed to not have to support external modification, + so they never need to load from disk _except_ the very first time a + job is opened by id and they're not present in the cache. 
""" - def __init__(self, *jobs): - self.jobs = list(jobs) + _PROTECTED_KEYS = ("_jobs", "_requires_init") + _LoadSaveType = _LoadAndSaveSingleThread # type: ignore + + def __init__( + self, + jobs=None, + filename=None, + write_concern=False, + data=None, + parent=None, + *args, + **kwargs, + ): + self._jobs = list(jobs) + self._requires_init = data is None + super().__init__( + filename=filename, + write_concern=write_concern, + data=data, + parent=parent, + *args, + **kwargs, + ) + + def _load(self): + """Don't attempt a load unless no data was initially provided.""" + if self._requires_init: + super()._load() + self._requires_init = False - def load(self): - pass + def _save(self): + """Don't save to disk by default.""" + for job in self._jobs: + job.reset_statepoint(self._data) - def save(self): - """Reset the state point for all the jobs.""" - for job in self.jobs: - job.reset_statepoint(job.statepoint()) + def save(self, force): + """Need a way to force a save to disk.""" + if force or not os.path.isfile(self._filename): + super()._save() + + def move(self, new_filename): + """Move to new filename.""" + os.replace(self.filename, new_filename) + self._filename = new_filename class Job: @@ -91,30 +138,26 @@ class Job: def __init__(self, project, statepoint=None, _id=None): self._project = project + # Prepare wd in advance so that the attribute exists in checks below. + self._wd = None + if statepoint is None and _id is None: raise ValueError("Either statepoint or _id must be provided.") elif statepoint is not None: - # A state point was provided. - self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self)) - # If the id is provided, assume the job is already registered in - # the project cache and that the id is valid for the state point. - if _id is None: - # Validate the state point and recursively convert to supported types. - statepoint = self.statepoint() - # Compute the id from the state point if not provided. 
- self._id = calc_id(statepoint) - # Update the project's state point cache immediately if opened by state point - self._project._register(self.id, statepoint) - else: - self._id = _id + self._statepoint = _StatepointDict(jobs=[self], data=statepoint) + try: + self._id = calc_id(self._statepoint._to_base()) if _id is None else _id + except TypeError: + raise KeyTypeError + self._statepoint._filename = self._statepoint_filename + + # Update the project's state point cache immediately if opened by state point + self._project._register(self.id, statepoint) else: # Only an id was provided. State point will be loaded lazily. self._statepoint = None self._id = _id - # Prepare job working directory - self._wd = None - # Prepare job document self._document = None @@ -187,6 +230,11 @@ def workspace(self): self._wd = os.path.join(self._project.workspace(), self.id) return self._wd + @property + def _statepoint_filename(self): + """Get the path of the state point file for this job.""" + return os.path.join(self.workspace(), self.FN_MANIFEST) + @property def ws(self): """Alias for :meth:`~Job.workspace`.""" @@ -210,14 +258,13 @@ def reset_statepoint(self, new_statepoint): dst = self._project.open_job(new_statepoint) if dst == self: return - fn_manifest = os.path.join(self.workspace(), self.FN_MANIFEST) - fn_manifest_backup = fn_manifest + "~" + try: - os.replace(fn_manifest, fn_manifest_backup) + self.statepoint.move(self.statepoint.filename + "~") try: os.replace(self.workspace(), dst.workspace()) except OSError as error: - os.replace(fn_manifest_backup, fn_manifest) # rollback + self._statepoint.move(self._statepoint.filename[:-1]) # rollback if error.errno in (errno.EEXIST, errno.ENOTEMPTY, errno.EACCES): raise DestinationExistsError(dst) else: @@ -226,11 +273,13 @@ def reset_statepoint(self, new_statepoint): dst.init() except OSError as error: if error.errno == errno.ENOENT: - pass # job is not initialized + pass # File is not initialized. 
else: raise + # Update this instance self.statepoint._data = dst.statepoint._data + self.statepoint._filename = dst.statepoint._filename self._id = dst._id self._wd = None self._document = None @@ -299,8 +348,18 @@ def statepoint(self): """ if self._statepoint is None: # Load state point manifest lazily - statepoint = self._check_manifest() - self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self)) + self._statepoint = _StatepointDict( + jobs=[self], filename=self._statepoint_filename + ) + + try: + statepoint = self._statepoint() + except ValueError: + # This catches JSONDecodeError, a subclass of ValueError + raise JobsCorruptedError([self.id]) + + if calc_id(statepoint) != self.id: + raise JobsCorruptedError([self.id]) # Update the project's state point cache when loaded lazily self._project._register(self.id, statepoint) @@ -506,11 +565,11 @@ def init(self, force=False): """ try: - fn_manifest = os.path.join(self.workspace(), self.FN_MANIFEST) - # Attempt early exit if the manifest exists and is valid try: - statepoint = self._check_manifest() + statepoint = self.statepoint._load_from_resource() + if calc_id(statepoint) != self.id: + raise JobsCorruptedError([self.id]) except Exception: # Any exception means this method cannot exit early. @@ -525,28 +584,31 @@ def init(self, force=False): raise try: - # Prepare the data before file creation and writing - blob = json.dumps(self.statepoint, indent=2) - try: # Open the file for writing only if it does not exist yet. - with open(fn_manifest, "w" if force else "x") as file: - file.write(blob) + self.statepoint.save(force=force) except OSError as error: if error.errno not in (errno.EEXIST, errno.EACCES): raise except Exception as error: # Attempt to delete the file on error, to prevent corruption. 
try: - os.remove(fn_manifest) + os.remove(self._statepoint_filename) except Exception: # ignore all errors here pass raise error else: - statepoint = self._check_manifest() + try: + statepoint = self.statepoint._load_from_resource() + except ValueError: + # This catches JSONDecodeError, a subclass of ValueError + raise JobsCorruptedError([self.id]) + + if calc_id(statepoint) != self.id: + raise JobsCorruptedError([self.id]) # Update the project's state point cache if the manifest is valid - self._project._register(self.id, statepoint) + self._project._register(self.id, self.statepoint()) except Exception: logger.error( f"State point manifest file of job '{self.id}' appears to be corrupted." @@ -770,7 +832,7 @@ def __exit__(self, err_type, err_value, tb): def __setstate__(self, state): self.__dict__.update(state) - self.statepoint._parent.jobs.append(self) + self.statepoint._jobs.append(self) def __deepcopy__(self, memo): cls = self.__class__ diff --git a/signac/core/synced_collections/collection_json.py b/signac/core/synced_collections/collection_json.py index 504d3da40..711186d62 100644 --- a/signac/core/synced_collections/collection_json.py +++ b/signac/core/synced_collections/collection_json.py @@ -8,6 +8,7 @@ import os import uuid import warnings +from typing import Tuple from .memory_buffered_collection import SharedMemoryFileBufferedCollection from .serialized_file_buffered_collection import SerializedFileBufferedCollection @@ -215,7 +216,7 @@ class JSONDict(JSONCollection, SyncedAttrDict): """ - _PROTECTED_KEYS = ("_filename",) + _PROTECTED_KEYS: Tuple[str, ...] = ("_filename",) def __init__( self, @@ -294,7 +295,7 @@ def __init__( class BufferedJSONDict(BufferedJSONCollection, SyncedAttrDict): """A buffered :class:`JSONDict`.""" - _PROTECTED_KEYS = ( + _PROTECTED_KEYS: Tuple[str, ...] 
= ( "_filename", "_buffered", "_is_buffered", @@ -344,7 +345,7 @@ def __init__( class MemoryBufferedJSONDict(MemoryBufferedJSONCollection, SyncedAttrDict): """A buffered :class:`JSONDict`.""" - _PROTECTED_KEYS = SyncedAttrDict._PROTECTED_KEYS + ( + _PROTECTED_KEYS: Tuple[str, ...] = SyncedAttrDict._PROTECTED_KEYS + ( "_filename", "_buffered", "_is_buffered", From f93f79b21ea618ed11fe101d7d15e197927950c8 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 18 Jan 2021 16:58:21 -0800 Subject: [PATCH 07/42] Remove _check_manifest. --- signac/contrib/job.py | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index e9ca5057e..285cc7ffc 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -517,31 +517,6 @@ def data(self, new_data): """ self.stores[self.KEY_DATA] = new_data - def _check_manifest(self): - """Check whether the manifest file exists and is correct. - - Returns - ------- - manifest : dict - State point data. - - Raises - ------ - JobsCorruptedError - If the manifest hash is not equal to the job id. - - """ - fn_manifest = os.path.join(self.workspace(), self.FN_MANIFEST) - try: - manifest = JSONDict(fn_manifest)() - except ValueError: - # This catches JSONDecodeError, a subclass of ValueError - raise JobsCorruptedError([self.id]) - - if calc_id(manifest) != self.id: - raise JobsCorruptedError([self.id]) - return manifest - def init(self, force=False): """Initialize the job's workspace directory. From fc0ba1d6ed298bcbc57ab93f80702681991882d9 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 18 Jan 2021 17:06:07 -0800 Subject: [PATCH 08/42] Expose loading explicitly to remove the need for internal laziness in the StatepointDict. 
--- signac/contrib/job.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 285cc7ffc..2c27d1348 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -55,7 +55,7 @@ class _StatepointDict(JSONDict): job is opened by id and they're not present in the cache. """ - _PROTECTED_KEYS = ("_jobs", "_requires_init") + _PROTECTED_KEYS = ("_jobs",) _LoadSaveType = _LoadAndSaveSingleThread # type: ignore def __init__( @@ -69,7 +69,6 @@ def __init__( **kwargs, ): self._jobs = list(jobs) - self._requires_init = data is None super().__init__( filename=filename, write_concern=write_concern, @@ -81,9 +80,7 @@ def __init__( def _load(self): """Don't attempt a load unless no data was initially provided.""" - if self._requires_init: - super()._load() - self._requires_init = False + pass def _save(self): """Don't save to disk by default.""" @@ -95,6 +92,10 @@ def save(self, force): if force or not os.path.isfile(self._filename): super()._save() + def load(self): + """Need a way to force a save to disk.""" + super()._load() + def move(self, new_filename): """Move to new filename.""" os.replace(self.filename, new_filename) @@ -353,6 +354,7 @@ def statepoint(self): ) try: + self._statepoint.load() statepoint = self._statepoint() except ValueError: # This catches JSONDecodeError, a subclass of ValueError From 12bbdf91812729940fb5f99d74f31316cc7f6e00 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 18 Jan 2021 17:35:24 -0800 Subject: [PATCH 09/42] Simplify the code as much as possible by inlining move method and catching the correct error. 
--- signac/contrib/job.py | 38 +++++++++++++------------------------- 1 file changed, 13 insertions(+), 25 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 2c27d1348..28dd4a512 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -8,14 +8,12 @@ import os import shutil from copy import deepcopy +from json import JSONDecodeError from deprecation import deprecated from ..core.h5store import H5StoreManager -from ..core.synced_collections.collection_json import JSONDict -from ..core.synced_collections.collection_json import ( - MemoryBufferedJSONDict as BufferedJSONDict, -) +from ..core.synced_collections.collection_json import BufferedJSONDict, JSONDict from ..errors import KeyTypeError from ..sync import sync_jobs from ..version import __version__ @@ -79,28 +77,23 @@ def __init__( ) def _load(self): - """Don't attempt a load unless no data was initially provided.""" + """Never automatically load from disk.""" pass def _save(self): - """Don't save to disk by default.""" + """Trigger job migrations on save by default.""" for job in self._jobs: job.reset_statepoint(self._data) def save(self, force): - """Need a way to force a save to disk.""" + """Force a save to disk.""" if force or not os.path.isfile(self._filename): super()._save() def load(self): - """Need a way to force a save to disk.""" + """Force a load from disk.""" super()._load() - def move(self, new_filename): - """Move to new filename.""" - os.replace(self.filename, new_filename) - self._filename = new_filename - class Job: """The job instance is a handle to the data of a unique state point. 
@@ -260,12 +253,13 @@ def reset_statepoint(self, new_statepoint): if dst == self: return + tmp_statepoint_file = self.statepoint.filename + "~" try: - self.statepoint.move(self.statepoint.filename + "~") + os.replace(self.statepoint.filename, tmp_statepoint_file) try: os.replace(self.workspace(), dst.workspace()) except OSError as error: - self._statepoint.move(self._statepoint.filename[:-1]) # rollback + os.replace(tmp_statepoint_file, self.statepoint.filename) # rollback if error.errno in (errno.EEXIST, errno.ENOTEMPTY, errno.EACCES): raise DestinationExistsError(dst) else: @@ -356,11 +350,8 @@ def statepoint(self): try: self._statepoint.load() statepoint = self._statepoint() - except ValueError: - # This catches JSONDecodeError, a subclass of ValueError - raise JobsCorruptedError([self.id]) - - if calc_id(statepoint) != self.id: + assert calc_id(statepoint) == self.id + except (JSONDecodeError, AssertionError): raise JobsCorruptedError([self.id]) # Update the project's state point cache when loaded lazily @@ -577,11 +568,8 @@ def init(self, force=False): else: try: statepoint = self.statepoint._load_from_resource() - except ValueError: - # This catches JSONDecodeError, a subclass of ValueError - raise JobsCorruptedError([self.id]) - - if calc_id(statepoint) != self.id: + assert calc_id(statepoint) == self.id + except (JSONDecodeError, AssertionError): raise JobsCorruptedError([self.id]) # Update the project's state point cache if the manifest is valid From b961308eeefa6324187a091b0adfbfee0321759e Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 18 Jan 2021 17:43:03 -0800 Subject: [PATCH 10/42] Improve documentation of context manager for statepoint loading. 
--- signac/contrib/job.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 28dd4a512..81508ac71 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -27,10 +27,12 @@ class _LoadAndSaveSingleThread: """A context manager for :class:`SyncedCollection` to wrap saving and loading. - Unclear how to mesh thread-safety with the fact that I'm introducing locks on a per-file level, - but the save operation changes the filename within the context _and_ multiple jobs could - point to the same statepoint. When the filename is changed by reset_statepoint, I need some - way to consistently also repoint the locks. + It's also not obvious how to achieve thread-safety for statepoint + modifications within the current framework because when multiple copies of + a job (shallow copies owning the same statepoint) exist and one of them is + modified, the calls to reset_statepoint will invalidate the per-file locks + because the folders are moved. Since statepoint accesses do not need to be + thread safe, this context manager simply removes that functionality. """ def __init__(self, collection): From fd8045d24cf62030e6ccefffed829445f4c3166a Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 18 Jan 2021 17:44:36 -0800 Subject: [PATCH 11/42] Replace MemoryBufferedJSONDict in Project for now. 
--- signac/contrib/project.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/signac/contrib/project.py b/signac/contrib/project.py index a4f20430b..dc3bd589f 100644 --- a/signac/contrib/project.py +++ b/signac/contrib/project.py @@ -26,9 +26,7 @@ from ..common.config import Config, get_config, load_config from ..core import json from ..core.h5store import H5StoreManager -from ..core.synced_collections.collection_json import ( - MemoryBufferedJSONDict as BufferedJSONDict, -) +from ..core.synced_collections.collection_json import BufferedJSONDict from ..sync import sync_projects from ..version import SCHEMA_VERSION, __version__ from .collection import Collection From 82c6aa982e0b8737dacb878fdd3985db833e4e70 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 18 Jan 2021 17:53:42 -0800 Subject: [PATCH 12/42] Add documentation of why jobs must be stored as a list in the statepoint. --- signac/contrib/job.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 81508ac71..bda9c3088 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -68,6 +68,10 @@ def __init__( *args, **kwargs, ): + # A job-statepoint mapping need not be unique because multiple Python + # Job objects can point to the same data on disk. We need to store + # these jobs in a shared list here so that shallow copies can point to + # the same place and trigger each other to update. self._jobs = list(jobs) super().__init__( filename=filename, @@ -799,6 +803,8 @@ def __exit__(self, err_type, err_value, tb): def __setstate__(self, state): self.__dict__.update(state) + # Note that we append to a list of jobs rather than replacing to + # support transparent id updates between shallow copies of a job. 
self.statepoint._jobs.append(self) def __deepcopy__(self, memo): From 474fceb0400900c927bd4592f7d4c6fd61e69b51 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 19 Jan 2021 16:03:57 -0800 Subject: [PATCH 13/42] Address PR comments. --- signac/contrib/job.py | 116 +++++++++++++++++++++++++----------------- 1 file changed, 70 insertions(+), 46 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index bda9c3088..b7ad4c84d 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -24,39 +24,31 @@ logger = logging.getLogger(__name__) -class _LoadAndSaveSingleThread: - """A context manager for :class:`SyncedCollection` to wrap saving and loading. - - It's also not obvious how to achieve thread-safety for statepoint - modifications within the current framework because when multiple copies of - a job (shallow copies owning the same statepoint) exist and one of them is - modified, the calls to reset_statepoint will invalidate the per-file locks - because the folders are moved. Since statepoint accesses do not need to be - thread safe, this context manager simply removes that functionality. - """ - - def __init__(self, collection): - self._collection = collection - - def __enter__(self): - self._collection._load() - - def __exit__(self, exc_type, exc_val, exc_tb): - self._collection._save() - - class _StatepointDict(JSONDict): """A JSON-backed dictionary for storing job statepoints. - There are two principal reasons for extending the base JSONDict: + There are three principal reasons for extending the base JSONDict: 1. Saving needs to trigger a job directory migration, and - 2. Statepoints are assumed to not have to support external modification, - so they never need to load from disk _except_ the very first time a - job is opened by id and they're not present in the cache. + 2. 
Statepoints are assumed to not support external modification, so + they never need to load from disk _except_ the very first time a job + is opened by id and they're not present in the cache. + 3. It must be possible to load and/or save on demand during tasks like + Job directory migrations. """ _PROTECTED_KEYS = ("_jobs",) - _LoadSaveType = _LoadAndSaveSingleThread # type: ignore + # State points are rarely modified and are not designed for efficient + # modification, so they do not support multithreaded execution. + # Implementing thread safe modifications would also be quite difficult + # because state point modification triggers a migration that moves the + # file. Moreover, since shallow copies of jobs share state points to + # trigger id updates, and since Job.reset_statepoint is called within + # _StatepointDict._save, the filename will actually change within the + # context. Since this linkage between the Job and the _StatepointDict + # allows the _StatepointDict to be in invalid intermediate states during + # the process, making the threading work would require somewhat complex and + # highly specialized handling. + _supports_threading = False def __init__( self, @@ -83,21 +75,43 @@ def __init__( ) def _load(self): - """Never automatically load from disk.""" + # State points never load from disk automatically. They are either + # initialized with provided data (e.g. from the state point cache), or + # they load from disk the first time state point data is requested for + # a Job opened by id (in which case the state point must first be + # validated manually). pass def _save(self): - """Trigger job migrations on save by default.""" + # State point modification triggers job migration for all jobs sharing + # this state point (shallow copies of a single job). for job in self._jobs: job.reset_statepoint(self._data) def save(self, force): - """Force a save to disk.""" + """Force a save to disk. 
+ + Unlike normal JSONDict objects, this class requires the ability to save + on command. Moreover, this save must be conditional on whether or not a + file is present to allow the user to observe state points in corrupted + data spaces and attempt to recover. + + Parameters + ---------- + force : bool + If True, save even if the file is present on disk. + """ if force or not os.path.isfile(self._filename): super()._save() def load(self): - """Force a load from disk.""" + """Force a load from disk. + + Unlike normal JSONDict objects, this class requires the ability to load + on command. These loads typically occur when the state point must be + validated against the data on disk; at all other times, the in-memory + data is assumed to be accurate to avoid unnecessary I/O. + """ super()._load() @@ -123,9 +137,9 @@ class Job: """ FN_MANIFEST = "signac_statepoint.json" - """The job's manifest filename. + """The job's state point filename. - The job manifest is a human-readable file containing the job's state + The job state point is a human-readable file containing the job's state point that is stored in each job's workspace directory. """ @@ -348,7 +362,7 @@ def statepoint(self): """ if self._statepoint is None: - # Load state point manifest lazily + # Load the state point lazily. self._statepoint = _StatepointDict( jobs=[self], filename=self._statepoint_filename ) @@ -393,8 +407,11 @@ def document(self): .. warning:: + Even deep copies of :attr:`~Job.document` will modify the same file, + so changes will still effectively be persisted between deep copies. If you need a deep copy that will not modify the underlying - persistent JSON file, use :attr:`~Job.document` instead of :attr:`~Job.doc`. + persistent JSON file, use the call operator to get an (otherwise + equivalent) raw dictionary: ``job.document()``. For more information, see :class:`~signac.core.synced_collections.collection_json.BufferedJSONDict`. 
@@ -406,7 +423,6 @@ def document(self): The job document handle. """ - # TODO: Fix this docstring explaining how to get a deep copy. if self._document is None: self.init() fn_doc = os.path.join(self.workspace(), self.FN_DOCUMENT) @@ -431,10 +447,18 @@ def doc(self): .. warning:: + Even deep copies of :attr:`~Job.doc` will modify the same file, so + changes will still effectively be persisted between deep copies. If you need a deep copy that will not modify the underlying - persistent JSON file, use :attr:`~Job.document` instead of :attr:`~Job.doc`. - For more information, see - :class:`~signac.core.synced_collections.collection_json.BufferedJSONDict`. + persistent JSON file, use the call operator to get an (otherwise + equivalent) raw dictionary: ``job.doc()``. + + See :ref:`signac document ` for the command line equivalent. + + Returns + ------- + :class:`~signac.JSONDict` + The job document handle. """ return self.document @@ -519,8 +543,8 @@ def data(self, new_data): def init(self, force=False): """Initialize the job's workspace directory. - This function will do nothing if the directory and - the job manifest already exist. + This function will do nothing if the directory and the job state point + already exist. Returns the calling job. @@ -529,8 +553,8 @@ def init(self, force=False): Parameters ---------- force : bool - Overwrite any existing state point's manifest - files, e.g., to repair them if they got corrupted (Default value = False). + Overwrite any existing state point files, e.g., to repair them if + they got corrupted (Default value = False). Returns ------- @@ -539,7 +563,7 @@ def init(self, force=False): """ try: - # Attempt early exit if the manifest exists and is valid + # Attempt early exit if the state point file exists and is valid. 
try: statepoint = self.statepoint._load_from_resource() if calc_id(statepoint) != self.id: @@ -578,11 +602,11 @@ def init(self, force=False): except (JSONDecodeError, AssertionError): raise JobsCorruptedError([self.id]) - # Update the project's state point cache if the manifest is valid - self._project._register(self.id, self.statepoint()) + # Update the project's state point cache if the saved file is valid. + self._project._register(self.id, statepoint) except Exception: logger.error( - f"State point manifest file of job '{self.id}' appears to be corrupted." + f"State point file of job '{self.id}' appears to be corrupted." ) raise return self From 52db85e06f600752ea378eec66c50eb220977a66 Mon Sep 17 00:00:00 2001 From: Bradley Dice Date: Wed, 20 Jan 2021 17:01:25 -0600 Subject: [PATCH 14/42] Add back import. --- tests/test_job.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_job.py b/tests/test_job.py index e1be4ab4e..d2b2db898 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -537,6 +537,9 @@ def test_chained_init(self): assert os.path.exists(os.path.join(job.workspace(), job.FN_MANIFEST)) def test_construction(self): + from signac import Project # noqa: F401 + + # The eval statement needs to have Project available job = self.open_job(test_token) job2 = eval(repr(job)) assert job == job2 From 985443e304d4de695f132edd49ca411a18c6808a Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 20 Jan 2021 19:08:13 -0800 Subject: [PATCH 15/42] Ensure _StatepointDict is always initialized in constructor. --- signac/contrib/job.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index a2303767f..1a7ed1ec8 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -65,6 +65,7 @@ def __init__( # these jobs in a shared list here so that shallow copies can point to # the same place and trigger each other to update. 
self._jobs = list(jobs) + self._requires_init = data is None super().__init__( filename=filename, write_concern=write_concern, @@ -169,8 +170,10 @@ def __init__(self, project, statepoint=None, _id=None): self._project._register(self.id, statepoint) else: # Only an id was provided. State point will be loaded lazily. - self._statepoint = None self._id = _id + self._statepoint = _StatepointDict( + jobs=[self], filename=self._statepoint_filename + ) # Prepare job document self._document = None @@ -361,12 +364,8 @@ def statepoint(self): Returns the job's state point. """ - if self._statepoint is None: + if self._statepoint._requires_init: # Load the state point lazily. - self._statepoint = _StatepointDict( - jobs=[self], filename=self._statepoint_filename - ) - try: self._statepoint.load() statepoint = self._statepoint() @@ -376,6 +375,7 @@ def statepoint(self): # Update the project's state point cache when loaded lazily self._project._register(self.id, statepoint) + self._statepoint._requires_init = False return self._statepoint @@ -565,7 +565,7 @@ def init(self, force=False): try: # Attempt early exit if the state point file exists and is valid. try: - statepoint = self.statepoint._load_from_resource() + statepoint = self._statepoint._load_from_resource() if calc_id(statepoint) != self.id: raise JobsCorruptedError([self.id]) except Exception: @@ -584,7 +584,7 @@ def init(self, force=False): try: try: # Open the file for writing only if it does not exist yet. 
- self.statepoint.save(force=force) + self._statepoint.save(force=force) except OSError as error: if error.errno not in (errno.EEXIST, errno.EACCES): raise @@ -597,7 +597,7 @@ def init(self, force=False): raise error else: try: - statepoint = self.statepoint._load_from_resource() + statepoint = self._statepoint._load_from_resource() assert calc_id(statepoint) == self.id except (JSONDecodeError, AssertionError): raise JobsCorruptedError([self.id]) From a2f162757ff1b548a192ed58715b98e1d89ff5c0 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 20 Jan 2021 15:01:18 -0800 Subject: [PATCH 16/42] Change _StatepointDict to validate id on load. --- signac/contrib/job.py | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 1a7ed1ec8..bdee44fea 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -36,7 +36,7 @@ class _StatepointDict(JSONDict): Job directory migrations. """ - _PROTECTED_KEYS = ("_jobs",) + _PROTECTED_KEYS = ("_jobs", "_requires_init") # State points are rarely modified and are not designed for efficient # modification, so they do not support multithreaded execution. # Implementing thread safe modifications would also be quite difficult @@ -105,7 +105,7 @@ def save(self, force): if force or not os.path.isfile(self._filename): super()._save() - def load(self): + def load(self, job_id): """Force a load from disk. Unlike normal JSONDict objects, this class requires the ability to load @@ -113,7 +113,22 @@ def load(self): validated against the data on disk; at all other times, the in-memory data is assumed to be accurate to avoid unnecessary I/O. 
""" - super()._load() + if not self._suspend_sync: + if self._root is None: + try: + data = self._load_from_resource() + except JSONDecodeError: + raise JobsCorruptedError([job_id]) + + if calc_id(data) != job_id: + raise JobsCorruptedError([job_id]) + + with self._suspend_sync: + self._update(data) + + return data + else: + self._root._load() class Job: @@ -367,9 +382,7 @@ def statepoint(self): if self._statepoint._requires_init: # Load the state point lazily. try: - self._statepoint.load() - statepoint = self._statepoint() - assert calc_id(statepoint) == self.id + statepoint = self._statepoint.load(self.id) except (JSONDecodeError, AssertionError): raise JobsCorruptedError([self.id]) @@ -565,9 +578,7 @@ def init(self, force=False): try: # Attempt early exit if the state point file exists and is valid. try: - statepoint = self._statepoint._load_from_resource() - if calc_id(statepoint) != self.id: - raise JobsCorruptedError([self.id]) + statepoint = self._statepoint.load(self.id) except Exception: # Any exception means this method cannot exit early. @@ -597,8 +608,7 @@ def init(self, force=False): raise error else: try: - statepoint = self._statepoint._load_from_resource() - assert calc_id(statepoint) == self.id + statepoint = self._statepoint.load(self.id) except (JSONDecodeError, AssertionError): raise JobsCorruptedError([self.id]) From 58e52934a1a760c2d1fd75737b4d8ceb50114863 Mon Sep 17 00:00:00 2001 From: Bradley Dice Date: Wed, 20 Jan 2021 17:37:09 -0600 Subject: [PATCH 17/42] Refactor error handling into _StatepointDict class. --- signac/contrib/job.py | 62 +++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 34 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index bdee44fea..5653b07f3 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -102,8 +102,22 @@ def save(self, force): force : bool If True, save even if the file is present on disk. 
""" - if force or not os.path.isfile(self._filename): - super()._save() + try: + # Open the file for writing only if it does not exist yet. + if force or not os.path.isfile(self._filename): + super()._save() + except Exception as error: + if not isinstance(error, OSError) or error.errno not in ( + errno.EEXIST, + errno.EACCES, + ): + # Attempt to delete the file on error, to prevent corruption. + # OSErrors that are EEXIST or EACCES don't need to delete the file. + try: + os.remove(self._filename) + except Exception: # ignore all errors here + pass + raise error def load(self, job_id): """Force a load from disk. @@ -113,22 +127,18 @@ def load(self, job_id): validated against the data on disk; at all other times, the in-memory data is assumed to be accurate to avoid unnecessary I/O. """ - if not self._suspend_sync: - if self._root is None: - try: - data = self._load_from_resource() - except JSONDecodeError: - raise JobsCorruptedError([job_id]) + try: + data = self._load_from_resource() + except JSONDecodeError: + raise JobsCorruptedError([job_id]) - if calc_id(data) != job_id: - raise JobsCorruptedError([job_id]) + if calc_id(data) != job_id: + raise JobsCorruptedError([job_id]) - with self._suspend_sync: - self._update(data) + with self._suspend_sync: + self._update(data) - return data - else: - self._root._load() + return data class Job: @@ -592,25 +602,9 @@ def init(self, force=False): ) raise - try: - try: - # Open the file for writing only if it does not exist yet. - self._statepoint.save(force=force) - except OSError as error: - if error.errno not in (errno.EEXIST, errno.EACCES): - raise - except Exception as error: - # Attempt to delete the file on error, to prevent corruption. 
- try: - os.remove(self._statepoint_filename) - except Exception: # ignore all errors here - pass - raise error - else: - try: - statepoint = self._statepoint.load(self.id) - except (JSONDecodeError, AssertionError): - raise JobsCorruptedError([self.id]) + self._statepoint.save(force=force) + # Re-load from disk as a validation. + statepoint = self._statepoint.load(self.id) # Update the project's state point cache if the saved file is valid. self._project._register(self.id, statepoint) From af907c4c0ef030e7bcef2b158998568a59a16507 Mon Sep 17 00:00:00 2001 From: Bradley Dice Date: Wed, 20 Jan 2021 17:39:31 -0600 Subject: [PATCH 18/42] Update docstrings. --- signac/contrib/job.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 5653b07f3..31c9611fc 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -122,10 +122,27 @@ def save(self, force): def load(self, job_id): """Force a load from disk. - Unlike normal JSONDict objects, this class requires the ability to load - on command. These loads typically occur when the state point must be - validated against the data on disk; at all other times, the in-memory - data is assumed to be accurate to avoid unnecessary I/O. + Unlike normal JSONDict objects, this class requires the ability to + load on command. These loads typically occur when the state point + must be validated against the data on disk; at all other times, the + in-memory data is assumed to be accurate to avoid unnecessary I/O. + + Parameters + ---------- + job_id : str + Job id used to validate contents on disk. + + Returns + ------- + data : dict + Dictionary of state point data. + + Raises + ------ + JobsCorruptedError + If the data on disk is invalid or its hash does not match the job + id. 
+ """ try: data = self._load_from_resource() From d6e71dbbe9f86030725d46e1648f9ae1a6d09b00 Mon Sep 17 00:00:00 2001 From: Bradley Dice Date: Wed, 20 Jan 2021 17:43:24 -0600 Subject: [PATCH 19/42] Update comment. --- signac/contrib/job.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 31c9611fc..85d7deb28 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -620,7 +620,8 @@ def init(self, force=False): raise self._statepoint.save(force=force) - # Re-load from disk as a validation. + # Re-load as a validation (required to detect invalid data on + # disk). statepoint = self._statepoint.load(self.id) # Update the project's state point cache if the saved file is valid. From fba0fab54c49126063d29b1186545570e4e83552 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 20 Jan 2021 20:33:09 -0800 Subject: [PATCH 20/42] Fix some docstrings. --- signac/contrib/errors.py | 4 ++-- signac/core/synced_collections/synced_attr_dict.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/signac/contrib/errors.py b/signac/contrib/errors.py index 80c3da855..0391a2a96 100644 --- a/signac/contrib/errors.py +++ b/signac/contrib/errors.py @@ -28,8 +28,8 @@ class DestinationExistsError(Error, RuntimeError): Parameters ---------- - destination : - The destination object causing the error. + destination : str + The destination causing the error. """ diff --git a/signac/core/synced_collections/synced_attr_dict.py b/signac/core/synced_collections/synced_attr_dict.py index 01a1b477f..c1b61714f 100644 --- a/signac/core/synced_collections/synced_attr_dict.py +++ b/signac/core/synced_collections/synced_attr_dict.py @@ -185,7 +185,7 @@ def reset(self, data=None): Parameters ---------- - data: mapping + data : mapping Data to update the instance (Default value = None). 
Raises From aafd813caef66092315e6032d9c10a1d67ce9c4d Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 20 Jan 2021 20:44:06 -0800 Subject: [PATCH 21/42] Remove redundant JobsCorruptedError check. --- signac/contrib/job.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 85d7deb28..f837dfa9e 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -407,11 +407,7 @@ def statepoint(self): """ if self._statepoint._requires_init: - # Load the state point lazily. - try: - statepoint = self._statepoint.load(self.id) - except (JSONDecodeError, AssertionError): - raise JobsCorruptedError([self.id]) + statepoint = self._statepoint.load(self.id) # Update the project's state point cache when loaded lazily self._project._register(self.id, statepoint) From 44fca055bf9c494fe935234e1a4b765a333ad1b5 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Fri, 22 Jan 2021 13:44:49 -0800 Subject: [PATCH 22/42] Rewrite reset_statepoint to not depend on creating another job. --- signac/contrib/hashing.py | 8 +++++- signac/contrib/job.py | 58 +++++++++++++++++++++++++++------------ 2 files changed, 47 insertions(+), 19 deletions(-) diff --git a/signac/contrib/hashing.py b/signac/contrib/hashing.py index a2c5a58bd..9a1b391fe 100644 --- a/signac/contrib/hashing.py +++ b/signac/contrib/hashing.py @@ -6,6 +6,9 @@ import hashlib import json +from ..core.synced_collections.utils import SCJSONEncoder +from ..errors import KeyTypeError + # We must use the standard library json for exact consistency in formatting @@ -27,7 +30,10 @@ def calc_id(spec): Encoded hash in hexadecimal format. 
""" - blob = json.dumps(spec, sort_keys=True) + try: + blob = json.dumps(spec, cls=SCJSONEncoder, sort_keys=True) + except TypeError: + raise KeyTypeError m = hashlib.md5() m.update(blob.encode()) return m.hexdigest() diff --git a/signac/contrib/job.py b/signac/contrib/job.py index f837dfa9e..9738100e0 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -14,7 +14,6 @@ from ..core.h5store import H5StoreManager from ..core.synced_collections.collection_json import BufferedJSONDict, JSONDict -from ..errors import KeyTypeError from ..sync import sync_jobs from ..version import __version__ from .errors import DestinationExistsError, JobsCorruptedError @@ -87,10 +86,10 @@ def _save(self): # State point modification triggers job migration for all jobs sharing # this state point (shallow copies of a single job). for job in self._jobs: - job.reset_statepoint(self._data) + job.reset_statepoint(self) - def save(self, force): - """Force a save to disk. + def save(self, force=False): + """Trigger a save to disk. Unlike normal JSONDict objects, this class requires the ability to save on command. Moreover, this save must be conditional on whether or not a @@ -120,7 +119,7 @@ def save(self, force): raise error def load(self, job_id): - """Force a load from disk. + """Trigger a load from disk. Unlike normal JSONDict objects, this class requires the ability to load on command. 
These loads typically occur when the state point @@ -202,10 +201,7 @@ def __init__(self, project, statepoint=None, _id=None): raise ValueError("Either statepoint or _id must be provided.") elif statepoint is not None: self._statepoint = _StatepointDict(jobs=[self], data=statepoint) - try: - self._id = calc_id(self._statepoint._to_base()) if _id is None else _id - except TypeError: - raise KeyTypeError + self._id = calc_id(self._statepoint._to_base()) if _id is None else _id self._statepoint._filename = self._statepoint_filename # Update the project's state point cache immediately if opened by state point @@ -314,23 +310,46 @@ def reset_statepoint(self, new_statepoint): The job's new state point. """ - dst = self._project.open_job(new_statepoint) - if dst == self: + if isinstance(new_statepoint, JSONDict): + new_statepoint = new_statepoint() + + # This technically isn't 100% equivalent to the old logic, because this + # doesn't check workspace equality. However, since the old logic opened + # the new job using self._project it wouldn't actually be possible to + # have two different projects, so checking the id is sufficient. + new_id = calc_id(new_statepoint) + if self._id == new_id: return + # In the old version of the code the loading and saving was all handled + # by the job, but in the new code the _StatepointDict expects to be + # saved in order to perform in-memory modifications prior to any + # disk save operation, so we have to manually reset this here. + # However this also affects directly calling reset_statepoint, so + # we need a way to handle that too. The easiest way is to just make + # reset_statepoint call reset on the _StatepointDict here. + # The only issue with this is that normal modifications of the + # statepoint (e.g. __setitem__ calls) will result in changing the + # statepoint internally, then calling Job.reset_statepoint, which will + # then call this reset method, resulting in effectively changing this + # twice. 
Hopefully that's not a significant performance hit. + self._statepoint._update(new_statepoint) + tmp_statepoint_file = self.statepoint.filename + "~" + should_init = False try: os.replace(self.statepoint.filename, tmp_statepoint_file) + new_workspace = os.path.join(self._project.workspace(), new_id) try: - os.replace(self.workspace(), dst.workspace()) + os.replace(self.workspace(), new_workspace) except OSError as error: os.replace(tmp_statepoint_file, self.statepoint.filename) # rollback if error.errno in (errno.EEXIST, errno.ENOTEMPTY, errno.EACCES): - raise DestinationExistsError(dst) + raise DestinationExistsError(new_id) else: raise else: - dst.init() + should_init = True except OSError as error: if error.errno == errno.ENOENT: pass # File is not initialized. @@ -338,14 +357,17 @@ def reset_statepoint(self, new_statepoint): raise # Update this instance - self.statepoint._data = dst.statepoint._data - self.statepoint._filename = dst.statepoint._filename - self._id = dst._id + old_id = self._id + self._id = new_id self._wd = None self._document = None self._stores = None self._cwd = [] - logger.info(f"Moved '{self}' -> '{dst}'.") + self._data = None + self._statepoint._filename = self._statepoint_filename + if should_init: + self.init() + logger.info(f"Moved '{old_id}' -> '{new_id}'.") def update_statepoint(self, update, overwrite=False): """Update the state point of this job. From e6fac098837942bdde75d5d6ebecfd1fefa3e56d Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Fri, 22 Jan 2021 14:49:03 -0800 Subject: [PATCH 23/42] Reduce direct accesses of internal attributes and do some simplification of the code. --- signac/contrib/job.py | 52 ++++++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 9738100e0..d3ecd85e9 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -35,7 +35,7 @@ class _StatepointDict(JSONDict): Job directory migrations. 
""" - _PROTECTED_KEYS = ("_jobs", "_requires_init") + _PROTECTED_KEYS = ("_jobs", "filename") # State points are rarely modified and are not designed for efficient # modification, so they do not support multithreaded execution. # Implementing thread safe modifications would also be quite difficult @@ -64,7 +64,6 @@ def __init__( # these jobs in a shared list here so that shallow copies can point to # the same place and trigger each other to update. self._jobs = list(jobs) - self._requires_init = data is None super().__init__( filename=filename, write_concern=write_concern, @@ -144,6 +143,11 @@ def load(self, job_id): """ try: + # TODO: This method will return None if the file does not exist on + # disk. I think the only time that load should be called is when a + # job is opened by id. If a user opens a job by id that doesn't + # exist, it will result in a KeyError on project.open_job, so I + # don't think that this is a problem, but we should double check. data = self._load_from_resource() except JSONDecodeError: raise JobsCorruptedError([job_id]) @@ -156,6 +160,23 @@ def load(self, job_id): return data + @property + def filename(self): + """str: Get or set the name of the file this collection is synchronized with. + + This property is overridden in order to support setting. In the parent + class the semantics for setting a filename are unclear; since a JSONDict + is always synced to a file, setting a filename might be expected to move + the file if it exists. In this subclass, we can safely define exactly the + behavior required, which is that the filename changes but the file is not + moved. + """ + return self._filename + + @filename.setter + def filename(self, new_filename): + self._filename = new_filename + class Job: """The job instance is a handle to the data of a unique state point. 
@@ -200,9 +221,10 @@ def __init__(self, project, statepoint=None, _id=None): if statepoint is None and _id is None: raise ValueError("Either statepoint or _id must be provided.") elif statepoint is not None: + self._statepoint_requires_init = False self._statepoint = _StatepointDict(jobs=[self], data=statepoint) - self._id = calc_id(self._statepoint._to_base()) if _id is None else _id - self._statepoint._filename = self._statepoint_filename + self._id = calc_id(self._statepoint()) if _id is None else _id + self._statepoint.filename = self._statepoint_filename # Update the project's state point cache immediately if opened by state point self._project._register(self.id, statepoint) @@ -212,6 +234,7 @@ def __init__(self, project, statepoint=None, _id=None): self._statepoint = _StatepointDict( jobs=[self], filename=self._statepoint_filename ) + self._statepoint_requires_init = True # Prepare job document self._document = None @@ -351,9 +374,7 @@ def reset_statepoint(self, new_statepoint): else: should_init = True except OSError as error: - if error.errno == errno.ENOENT: - pass # File is not initialized. - else: + if error.errno != errno.ENOENT: # OK if file is not initialized. raise # Update this instance @@ -363,8 +384,7 @@ def reset_statepoint(self, new_statepoint): self._document = None self._stores = None self._cwd = [] - self._data = None - self._statepoint._filename = self._statepoint_filename + self._statepoint.filename = self._statepoint_filename if should_init: self.init() logger.info(f"Moved '{old_id}' -> '{new_id}'.") @@ -428,12 +448,12 @@ def statepoint(self): Returns the job's state point. 
""" - if self._statepoint._requires_init: + if self._statepoint_requires_init: statepoint = self._statepoint.load(self.id) # Update the project's state point cache when loaded lazily self._project._register(self.id, statepoint) - self._statepoint._requires_init = False + self._statepoint_requires_init = False return self._statepoint @@ -637,13 +657,15 @@ def init(self, force=False): ) raise + # The state point save will not overwrite an existing file on + # disk unless force is True, so the subsequent load will catch + # when a preexisting invalid file was present. self._statepoint.save(force=force) - # Re-load as a validation (required to detect invalid data on - # disk). + # TODO: Can we omit this entirely if force=False? statepoint = self._statepoint.load(self.id) - # Update the project's state point cache if the saved file is valid. - self._project._register(self.id, statepoint) + # Update the project's state point cache if the saved file is valid. + self._project._register(self.id, statepoint) except Exception: logger.error( f"State point file of job '{self.id}' appears to be corrupted." From c48c4e8e2a46990d13cc72707c736b0a2149b24a Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Fri, 22 Jan 2021 16:14:57 -0800 Subject: [PATCH 24/42] Reraise errors in JSONCollection. 
--- signac/core/synced_collections/collection_json.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/signac/core/synced_collections/collection_json.py b/signac/core/synced_collections/collection_json.py index 711186d62..11cd7b060 100644 --- a/signac/core/synced_collections/collection_json.py +++ b/signac/core/synced_collections/collection_json.py @@ -112,6 +112,8 @@ def _load_from_resource(self): except OSError as error: if error.errno == errno.ENOENT: return None + else: + raise error def _save_to_resource(self): """Write the data to JSON file.""" From c3a88ccd91d21b12d0a39e669bdc5da3c62bc592 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Fri, 22 Jan 2021 16:23:36 -0800 Subject: [PATCH 25/42] Change reset to require a non-None argument and to call _update internally. --- signac/contrib/job.py | 4 ---- .../synced_collections/synced_attr_dict.py | 15 ++++----------- signac/core/synced_collections/synced_list.py | 18 ++++++------------ .../synced_collection_test.py | 4 ---- 4 files changed, 10 insertions(+), 31 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index d3ecd85e9..a222ba9cb 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -336,10 +336,6 @@ def reset_statepoint(self, new_statepoint): if isinstance(new_statepoint, JSONDict): new_statepoint = new_statepoint() - # This technically isn't 100% equivalent to the old logic, because this - # doesn't check workspace equality. However, since the old logic opened - # the new job using self._project it wouldn't actually be possible to - # have two different projects, so checking the id is sufficient. 
new_id = calc_id(new_statepoint) if self._id == new_id: return diff --git a/signac/core/synced_collections/synced_attr_dict.py b/signac/core/synced_collections/synced_attr_dict.py index c1b61714f..5b2010f0a 100644 --- a/signac/core/synced_collections/synced_attr_dict.py +++ b/signac/core/synced_collections/synced_attr_dict.py @@ -180,29 +180,22 @@ def __setitem__(self, key, value): for key, value in data.items(): self._data[key] = self._from_base(value, parent=self) - def reset(self, data=None): + def reset(self, data): """Update the instance with new data. Parameters ---------- data : mapping - Data to update the instance (Default value = None). + Data to update the instance. Raises ------ ValueError - If the data is not instance of mapping + If the data is not a mapping. """ - if data is None: - data = {} if _mapping_resolver.get_type(data) == "MAPPING": - self._validate(data) - with self._suspend_sync: - self._data = { - key: self._from_base(data=value, parent=self) - for key, value in data.items() - } + self._update(data) with self._thread_lock: self._save() else: diff --git a/signac/core/synced_collections/synced_list.py b/signac/core/synced_collections/synced_list.py index d68a3336e..e7ef83e8b 100644 --- a/signac/core/synced_collections/synced_list.py +++ b/signac/core/synced_collections/synced_list.py @@ -150,30 +150,24 @@ def _update(self, data=None): ) ) - def reset(self, data=None): + def reset(self, data): """Update the instance with new data. Parameters ---------- - data: non-string Sequence, optional - Data to update the instance (Default value = None). + data: non-string Sequence + Data to update the instance. Raises ------ ValueError - If the data is not instance of non-string seqeuence + If the data is not a non-string sequence. 
""" - if data is None: - data = [] - elif NUMPY and isinstance(data, numpy.ndarray): + if NUMPY and isinstance(data, numpy.ndarray): data = data.tolist() - self._validate(data) if _sequence_resolver.get_type(data) == "SEQUENCE": - with self._suspend_sync: - self._data = [ - self._from_base(data=value, parent=self) for value in data - ] + self._update(data) with self._thread_lock: self._save() else: diff --git a/tests/test_synced_collections/synced_collection_test.py b/tests/test_synced_collections/synced_collection_test.py index 4e72d1da0..7d06b197b 100644 --- a/tests/test_synced_collections/synced_collection_test.py +++ b/tests/test_synced_collections/synced_collection_test.py @@ -208,8 +208,6 @@ def test_reset(self, synced_collection, testdata): synced_collection[key] = testdata assert len(synced_collection) == 1 assert synced_collection[key] == testdata - synced_collection.reset() - assert len(synced_collection) == 0 synced_collection.reset({"reset": "abc"}) assert len(synced_collection) == 1 assert synced_collection[key] == "abc" @@ -544,8 +542,6 @@ def test_reset(self, synced_collection): synced_collection.reset([1, 2, 3]) assert len(synced_collection) == 3 assert synced_collection == [1, 2, 3] - synced_collection.reset() - assert len(synced_collection) == 0 synced_collection.reset([3, 4]) assert len(synced_collection) == 2 assert synced_collection == [3, 4] From 97db9a0c0b68e548c4ebfcbe0206cadcb5b96a32 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Fri, 22 Jan 2021 16:35:56 -0800 Subject: [PATCH 26/42] Add reset_data method to provide clear access points of the _StatepointDict for the Job. 
--- signac/contrib/job.py | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index a222ba9cb..fca26c3d1 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -177,6 +177,22 @@ class the semantics for setting a filename are unclear; since a JSONDict def filename(self, new_filename): self._filename = new_filename + def reset_data(self, data): + """Reset the in-memory data without saving. + + When a statepoint is modified prior to the job being initialized, the + job's reset_statepoint method needs a way to modify the in-memory data + without calling save. This method provides that interface via a transparent + call-through to the internal _update method. + + Parameters + ---------- + data : dict + The new data. + + """ + self._update(data) + class Job: """The job instance is a handle to the data of a unique state point. @@ -340,19 +356,12 @@ def reset_statepoint(self, new_statepoint): if self._id == new_id: return - # In the old version of the code the loading and saving was all handled - # by the job, but in the new code the _StatepointDict expects to be - # saved in order to perform in-memory modifications prior to any - # disk save operation, so we have to manually reset this here. - # However this also affects directly calling reset_statepoint, so - # we need a way to handle that too. The easiest way is to just make - # reset_statepoint call reset on the _StatepointDict here. - # The only issue with this is that normal modifications of the - # statepoint (e.g. __setitem__ calls) will result in changing the - # statepoint internally, then calling Job.reset_statepoint, which will - # then call this reset method, resulting in effectively changing this - # twice. Hopefully that's not a significant performance hit. - self._statepoint._update(new_statepoint) + # Normal modifications of the statepoint (e.g. 
__setitem__ calls) will + # result in changing the statepoint internally, then calling + # Job.reset_statepoint, which will then call this reset method, + # resulting in two in-memory modifications of the data. Hopefully + # that's not a significant performance hit. + self._statepoint.reset_data(new_statepoint) tmp_statepoint_file = self.statepoint.filename + "~" should_init = False From c2a9fa42cdc3db4136637137bc8e674a2b3e3c7a Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sat, 23 Jan 2021 10:26:29 -0800 Subject: [PATCH 27/42] Create new internal method for handling resetting. --- signac/contrib/job.py | 30 +++++------------------------- 1 file changed, 5 insertions(+), 25 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index fca26c3d1..65920d30c 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -41,7 +41,7 @@ class _StatepointDict(JSONDict): # Implementing thread safe modifications would also be quite difficult # because state point modification triggers a migration that moves the # file. Moreover, since shallow copies of jobs share state points to - # trigger id updates, and since Job.reset_statepoint is called within + # trigger id updates, and since Job._reset_statepoint is called within # _StatepointDict._save, the filename will actually change withiin the # context. Since this linkage between the Job and the _StatepointDict # allows the _StatepointDict to be in invalid intermediate states during @@ -85,7 +85,7 @@ def _save(self): # State point modification triggers job migration for all jobs sharing # this state point (shallow copies of a single job). for job in self._jobs: - job.reset_statepoint(self) + job._reset_statepoint(self) def save(self, force=False): """Trigger a save to disk. 
@@ -177,22 +177,6 @@ class the semantics for setting a filename are unclear; since a JSONDict def filename(self, new_filename): self._filename = new_filename - def reset_data(self, data): - """Reset the in-memory data without saving. - - When a statepoint is modified prior to the job being initialized, the - job's reset_statepoint method needs a way to modify the in-memory data - without calling save. This method provides that interface via a transparent - call-through to the internal _update method. - - Parameters - ---------- - data : dict - The new data. - - """ - self._update(data) - class Job: """The job instance is a handle to the data of a unique state point. @@ -349,6 +333,9 @@ def reset_statepoint(self, new_statepoint): The job's new state point. """ + self._statepoint.reset(new_statepoint) + + def _reset_statepoint(self, new_statepoint): if isinstance(new_statepoint, JSONDict): new_statepoint = new_statepoint() @@ -356,13 +343,6 @@ def reset_statepoint(self, new_statepoint): if self._id == new_id: return - # Normal modifications of the statepoint (e.g. __setitem__ calls) will - # result in changing the statepoint internally, then calling - # Job.reset_statepoint, which will then call this reset method, - # resulting in two in-memory modifications of the data. Hopefully - # that's not a significant performance hit. - self._statepoint.reset_data(new_statepoint) - tmp_statepoint_file = self.statepoint.filename + "~" should_init = False try: From 61ef343251986f98abefafdfa3a902ae7176169c Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sat, 23 Jan 2021 10:44:21 -0800 Subject: [PATCH 28/42] Move statepoint resetting logic into the statepoint object itself. 
--- signac/contrib/job.py | 106 ++++++++++++++++++------------------------ 1 file changed, 44 insertions(+), 62 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 65920d30c..c101cbfdd 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -41,8 +41,7 @@ class _StatepointDict(JSONDict): # Implementing thread safe modifications would also be quite difficult # because state point modification triggers a migration that moves the # file. Moreover, since shallow copies of jobs share state points to - # trigger id updates, and since Job._reset_statepoint is called within - # _StatepointDict._save, the filename will actually change withiin the + # trigger id updates, so the filename will actually change within the # context. Since this linkage between the Job and the _StatepointDict # allows the _StatepointDict to be in invalid intermediate states during # the process, making the threading work would require somewhat complex and @@ -84,8 +83,47 @@ def _load(self): def _save(self): # State point modification triggers job migration for all jobs sharing # this state point (shallow copies of a single job). + new_id = calc_id(self) + + # All elements of the job list are shallow copies of each other, so any + # one of them is representative. + job = self._jobs[0] + if job._id == new_id: + return + + tmp_statepoint_file = self.filename + "~" + should_init = False + try: + os.replace(self.filename, tmp_statepoint_file) + new_workspace = os.path.join(job._project.workspace(), new_id) + try: + os.replace(job.workspace(), new_workspace) + except OSError as error: + os.replace(tmp_statepoint_file, self.filename) # rollback + if error.errno in (errno.EEXIST, errno.ENOTEMPTY, errno.EACCES): + raise DestinationExistsError(new_id) + else: + raise + else: + should_init = True + except OSError as error: + if error.errno != errno.ENOENT: # OK if file is not initialized. + raise + + # Update each job instance. 
for job in self._jobs: - job._reset_statepoint(self) + old_id = job._id + job._id = new_id + job._wd = None + job._document = None + job._stores = None + job._cwd = [] + self._filename = job._statepoint_filename + if should_init: + # All the jobs are equivalent, so just init the last one from the + # for loop. + job.init() + logger.info(f"Moved '{old_id}' -> '{new_id}'.") def save(self, force=False): """Trigger a save to disk. @@ -160,23 +198,6 @@ def load(self, job_id): return data - @property - def filename(self): - """str: Get or set the name of the file this collection is synchronized with. - - This property is overridden in order to support setting. In the parent - class the semantics for setting a filename are unclear; since a JSONDict - is always synced to a file, setting a filename might be expected to move - the file if it exists. In this subclass, we can safely define exactly the - behavior required, which is that the filename changes but the file is not - moved. - """ - return self._filename - - @filename.setter - def filename(self, new_filename): - self._filename = new_filename - class Job: """The job instance is a handle to the data of a unique state point. 
@@ -224,7 +245,7 @@ def __init__(self, project, statepoint=None, _id=None): self._statepoint_requires_init = False self._statepoint = _StatepointDict(jobs=[self], data=statepoint) self._id = calc_id(self._statepoint()) if _id is None else _id - self._statepoint.filename = self._statepoint_filename + self._statepoint._filename = self._statepoint_filename # Update the project's state point cache immediately if opened by state point self._project._register(self.id, statepoint) @@ -335,45 +356,6 @@ def reset_statepoint(self, new_statepoint): """ self._statepoint.reset(new_statepoint) - def _reset_statepoint(self, new_statepoint): - if isinstance(new_statepoint, JSONDict): - new_statepoint = new_statepoint() - - new_id = calc_id(new_statepoint) - if self._id == new_id: - return - - tmp_statepoint_file = self.statepoint.filename + "~" - should_init = False - try: - os.replace(self.statepoint.filename, tmp_statepoint_file) - new_workspace = os.path.join(self._project.workspace(), new_id) - try: - os.replace(self.workspace(), new_workspace) - except OSError as error: - os.replace(tmp_statepoint_file, self.statepoint.filename) # rollback - if error.errno in (errno.EEXIST, errno.ENOTEMPTY, errno.EACCES): - raise DestinationExistsError(new_id) - else: - raise - else: - should_init = True - except OSError as error: - if error.errno != errno.ENOENT: # OK if file is not initialized. - raise - - # Update this instance - old_id = self._id - self._id = new_id - self._wd = None - self._document = None - self._stores = None - self._cwd = [] - self._statepoint.filename = self._statepoint_filename - if should_init: - self.init() - logger.info(f"Moved '{old_id}' -> '{new_id}'.") - def update_statepoint(self, update, overwrite=False): """Update the state point of this job. 
@@ -409,7 +391,7 @@ def update_statepoint(self, update, overwrite=False): if statepoint.get(key, value) != value: raise KeyError(key) statepoint.update(update) - self.reset_statepoint(statepoint) + self._statepoint.reset(statepoint) @property def statepoint(self): @@ -452,7 +434,7 @@ def statepoint(self, new_statepoint): The new state point to be assigned. """ - self.reset_statepoint(new_statepoint) + self._statepoint.reset(new_statepoint) @property def sp(self): From 4078701dd8a38dad24e45f05d6e86fd3bd529966 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sat, 23 Jan 2021 10:46:41 -0800 Subject: [PATCH 29/42] Stop accessing internal statepoint filename attribute directly and rely on validation on construction. --- signac/contrib/job.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index c101cbfdd..72a1864ad 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -243,9 +243,10 @@ def __init__(self, project, statepoint=None, _id=None): raise ValueError("Either statepoint or _id must be provided.") elif statepoint is not None: self._statepoint_requires_init = False - self._statepoint = _StatepointDict(jobs=[self], data=statepoint) - self._id = calc_id(self._statepoint()) if _id is None else _id - self._statepoint._filename = self._statepoint_filename + self._id = calc_id(statepoint) if _id is None else _id + self._statepoint = _StatepointDict( + jobs=[self], filename=self._statepoint_filename, data=statepoint + ) # Update the project's state point cache immediately if opened by state point self._project._register(self.id, statepoint) From b2c0696ff8847ea7b0b178d28114cf9762377c45 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sat, 23 Jan 2021 11:03:34 -0800 Subject: [PATCH 30/42] Make statepoint thread safe. 
--- signac/contrib/job.py | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 72a1864ad..346b77c5c 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -36,17 +36,6 @@ class _StatepointDict(JSONDict): """ _PROTECTED_KEYS = ("_jobs", "filename") - # State points are rarely modified and are not designed for efficient - # modification, so they do not support multithreaded execution. - # Implementing thread safe modifications would also be quite difficult - # because state point modification triggers a migration that moves the - # file. Moreover, since shallow copies of jobs share state points to - # trigger id updates, so the filename will actually change within the - # context. Since this linkage between the Job and the _StatepointDict - # allows the _StatepointDict to be in invalid intermediate states during - # the process, making the threading work would require somewhat complex and - # highly specialized handling. - _supports_threading = False def __init__( self, @@ -118,11 +107,16 @@ def _save(self): job._document = None job._stores = None job._cwd = [] - self._filename = job._statepoint_filename + + # Since all the jobs are equivalent, just grab the filename from the + # last one and init it. Also migrate the lock. + old_lock_id = self._lock_id + self._filename = job._statepoint_filename + type(self)._locks[self._lock_id] = type(self)._locks.pop(old_lock_id) + if should_init: - # All the jobs are equivalent, so just init the last one from the - # for loop. job.init() + logger.info(f"Moved '{old_id}' -> '{new_id}'.") def save(self, force=False): @@ -857,8 +851,8 @@ def __exit__(self, err_type, err_value, tb): def __setstate__(self, state): self.__dict__.update(state) - # Note that we append to a list of jobs rather than replacing to - # support transparent id updates between shallow copies of a job. 
+ # We append to a list of jobs rather than replacing to support + # transparent id updates between shallow copies of a job. self.statepoint._jobs.append(self) def __deepcopy__(self, memo): From 98c19d505957d5851a7f89ebb3e778151ad53dad Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sat, 23 Jan 2021 11:16:46 -0800 Subject: [PATCH 31/42] Some minor cleanup. --- signac/contrib/job.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 346b77c5c..72885009f 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -83,9 +83,10 @@ def _save(self): tmp_statepoint_file = self.filename + "~" should_init = False try: + # Move the statepoint to an intermediate location as a backup. os.replace(self.filename, tmp_statepoint_file) - new_workspace = os.path.join(job._project.workspace(), new_id) try: + new_workspace = os.path.join(job._project.workspace(), new_id) os.replace(job.workspace(), new_workspace) except OSError as error: os.replace(tmp_statepoint_file, self.filename) # rollback @@ -96,7 +97,10 @@ def _save(self): else: should_init = True except OSError as error: - if error.errno != errno.ENOENT: # OK if file is not initialized. + # The most likely reason we got here is because the state point + # file move failed due to the job not being initialized so the file + # doesn't exist, which is OK. + if error.errno != errno.ENOENT: raise # Update each job instance. @@ -109,7 +113,7 @@ def _save(self): job._cwd = [] # Since all the jobs are equivalent, just grab the filename from the - # last one and init it. Also migrate the lock. + # last one and init it. Also migrate the lock for multithreaded support. old_lock_id = self._lock_id self._filename = job._statepoint_filename type(self)._locks[self._lock_id] = type(self)._locks.pop(old_lock_id) @@ -175,11 +179,6 @@ def load(self, job_id): """ try: - # TODO: This method will return None if the file does not exist on - # disk. 
I think the only time that load should be called is when a - # job is opened by id. If a user opens a job by id that doesn't - # exist, it will result in a KeyError on project.open_job, so I - # don't think that this is a problem, but we should double check. data = self._load_from_resource() except JSONDecodeError: raise JobsCorruptedError([job_id]) @@ -623,7 +622,6 @@ def init(self, force=False): # disk unless force is True, so the subsequent load will catch # when a preexisting invalid file was present. self._statepoint.save(force=force) - # TODO: Can we omit this entirely if force=False? statepoint = self._statepoint.load(self.id) # Update the project's state point cache if the saved file is valid. From f60a271855e5c98a09f07d0505c9c6ac30c164cc Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sat, 23 Jan 2021 11:25:08 -0800 Subject: [PATCH 32/42] Remove now unnecessary protection of the filename key. --- signac/contrib/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 72885009f..efe1a3dff 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -35,7 +35,7 @@ class _StatepointDict(JSONDict): Job directory migrations. """ - _PROTECTED_KEYS = ("_jobs", "filename") + _PROTECTED_KEYS = ("_jobs",) def __init__( self, From 24378ceca9f0bc23973db2dac241074d8aea211c Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sat, 23 Jan 2021 12:57:41 -0800 Subject: [PATCH 33/42] Explicitly document behavior of returning None from _load_from_resource. 
--- signac/core/synced_collections/collection_json.py | 5 +++-- signac/core/synced_collections/collection_mongodb.py | 5 +++-- signac/core/synced_collections/collection_redis.py | 5 +++-- signac/core/synced_collections/collection_zarr.py | 5 +++-- signac/core/synced_collections/synced_collection.py | 8 ++++++-- 5 files changed, 18 insertions(+), 10 deletions(-) diff --git a/signac/core/synced_collections/collection_json.py b/signac/core/synced_collections/collection_json.py index 11cd7b060..2e2f2e151 100644 --- a/signac/core/synced_collections/collection_json.py +++ b/signac/core/synced_collections/collection_json.py @@ -100,9 +100,10 @@ def _load_from_resource(self): Returns ------- - Collection + Collection or None An equivalent unsynced collection satisfying :meth:`is_base_type` that - contains the data in the JSON file. + contains the data in the JSON file. Will return None if the file does + not exist. """ try: diff --git a/signac/core/synced_collections/collection_mongodb.py b/signac/core/synced_collections/collection_mongodb.py index 28126440a..ad983a458 100644 --- a/signac/core/synced_collections/collection_mongodb.py +++ b/signac/core/synced_collections/collection_mongodb.py @@ -53,9 +53,10 @@ def _load_from_resource(self): Returns ------- - Collection + Collection or None An equivalent unsynced collection satisfying :meth:`is_base_type` that - contains the data in the MongoDB database. + contains the data in the MongoDB database. Will return None if no data + was found in the database. 
""" blob = self._collection.find_one(self._uid) diff --git a/signac/core/synced_collections/collection_redis.py b/signac/core/synced_collections/collection_redis.py index b53fc0554..21df97b9b 100644 --- a/signac/core/synced_collections/collection_redis.py +++ b/signac/core/synced_collections/collection_redis.py @@ -39,9 +39,10 @@ def _load_from_resource(self): Returns ------- - Collection + Collection or None An equivalent unsynced collection satisfying :meth:`is_base_type` that - contains the data in the Redis database. + contains the data in the Redis database. Will return None if no data + was found in the Redis database. """ blob = self._client.get(self._key) diff --git a/signac/core/synced_collections/collection_zarr.py b/signac/core/synced_collections/collection_zarr.py index 45783beb7..99fe02f37 100644 --- a/signac/core/synced_collections/collection_zarr.py +++ b/signac/core/synced_collections/collection_zarr.py @@ -48,9 +48,10 @@ def _load_from_resource(self): Returns ------- - Collection + Collection or None An equivalent unsynced collection satisfying :meth:`is_base_type` that - contains the data in the Zarr group. + contains the data in the Zarr group. Will return None if associated + data is not found in the Zarr group. """ try: diff --git a/signac/core/synced_collections/synced_collection.py b/signac/core/synced_collections/synced_collection.py index fb9151467..001f5e23d 100644 --- a/signac/core/synced_collections/synced_collection.py +++ b/signac/core/synced_collections/synced_collection.py @@ -340,11 +340,15 @@ def is_base_type(cls, data): def _load_from_resource(self): """Load data from underlying backend. - This method must be implemented for each backend. + This method must be implemented for each backend. Backends may choose + to return ``None``, signaling that no modification should be performed + on the data in memory. This mode is useful for backends where the underlying + resource (e.g. 
a file) may not initially exist, but can be transparently + created on save. Returns ------- - Collection + Collection or None An equivalent unsynced collection satisfying :meth:`is_base_type` that contains the data in the underlying resource (e.g. a file). From e70c2f021dace684b95a331f778240ba794b91e5 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sat, 23 Jan 2021 14:05:22 -0800 Subject: [PATCH 34/42] Apply suggestions from code review Co-authored-by: Bradley Dice --- signac/contrib/job.py | 13 +++++++------ signac/core/synced_collections/synced_list.py | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index efe1a3dff..691451a2a 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -32,7 +32,7 @@ class _StatepointDict(JSONDict): they never need to load from disk _except_ the very first time a job is opened by id and they're not present in the cache. 3. It must be possible to load and/or save on demand during tasks like - Job directory migrations. + job directory migrations. """ _PROTECTED_KEYS = ("_jobs",) @@ -173,7 +173,7 @@ def load(self, job_id): Raises ------ - JobsCorruptedError + :class:`~signac.errors.JobsCorruptedError` If the data on disk is invalid or its hash does not match the job id. @@ -410,6 +410,7 @@ def statepoint(self): """ if self._statepoint_requires_init: + # Load state point data lazily (on access). statepoint = self._statepoint.load(self.id) # Update the project's state point cache when loaded lazily @@ -449,8 +450,8 @@ def document(self): Even deep copies of :attr:`~Job.document` will modify the same file, so changes will still effectively be persisted between deep copies. If you need a deep copy that will not modify the underlying - persistent JSON file, use the call operator to get an (otherwise - equivalent) raw dictionary: ``job.document()``. + persistent JSON file, use the call operator to get an equivalent + plain dictionary: ``job.document()``. 
For more information, see :class:`~signac.core.synced_collections.collection_json.BufferedJSONDict`. @@ -489,8 +490,8 @@ def doc(self): Even deep copies of :attr:`~Job.doc` will modify the same file, so changes will still effectively be persisted between deep copies. If you need a deep copy that will not modify the underlying - persistent JSON file, use the call operator to get an (otherwise - equivalent) raw dictionary: ``job.doc()``. + persistent JSON file, use the call operator to get an equivalent + plain dictionary: ``job.doc()``. See :ref:`signac document ` for the command line equivalent. diff --git a/signac/core/synced_collections/synced_list.py b/signac/core/synced_collections/synced_list.py index e7ef83e8b..bd3235379 100644 --- a/signac/core/synced_collections/synced_list.py +++ b/signac/core/synced_collections/synced_list.py @@ -155,7 +155,7 @@ def reset(self, data): Parameters ---------- - data: non-string Sequence + data : non-string Sequence Data to update the instance. Raises From 5689aca87909ff236204547f8b2f47fd7d610bca Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sat, 23 Jan 2021 14:10:01 -0800 Subject: [PATCH 35/42] Rename SCJSONEncoder to SyncedCollectionJSONEncoder. 
--- signac/contrib/hashing.py | 4 ++-- .../core/synced_collections/collection_json.py | 4 ++-- .../serialized_file_buffered_collection.py | 4 ++-- signac/core/synced_collections/utils.py | 2 +- tests/test_synced_collections/test_utils.py | 16 ++++++++++------ 5 files changed, 17 insertions(+), 13 deletions(-) diff --git a/signac/contrib/hashing.py b/signac/contrib/hashing.py index 9a1b391fe..8d6fea032 100644 --- a/signac/contrib/hashing.py +++ b/signac/contrib/hashing.py @@ -6,7 +6,7 @@ import hashlib import json -from ..core.synced_collections.utils import SCJSONEncoder +from ..core.synced_collections.utils import SyncedCollectionJSONEncoder from ..errors import KeyTypeError # We must use the standard library json for exact consistency in formatting @@ -31,7 +31,7 @@ def calc_id(spec): """ try: - blob = json.dumps(spec, cls=SCJSONEncoder, sort_keys=True) + blob = json.dumps(spec, cls=SyncedCollectionJSONEncoder, sort_keys=True) except TypeError: raise KeyTypeError m = hashlib.md5() diff --git a/signac/core/synced_collections/collection_json.py b/signac/core/synced_collections/collection_json.py index 2e2f2e151..2a3b4b9b7 100644 --- a/signac/core/synced_collections/collection_json.py +++ b/signac/core/synced_collections/collection_json.py @@ -15,7 +15,7 @@ from .synced_attr_dict import SyncedAttrDict from .synced_collection import SyncedCollection from .synced_list import SyncedList -from .utils import SCJSONEncoder +from .utils import SyncedCollectionJSONEncoder from .validators import json_format_validator @@ -119,7 +119,7 @@ def _load_from_resource(self): def _save_to_resource(self): """Write the data to JSON file.""" # Serialize data - blob = json.dumps(self, cls=SCJSONEncoder).encode() + blob = json.dumps(self, cls=SyncedCollectionJSONEncoder).encode() # When write_concern flag is set, we write the data into dummy file and then # replace that file with original file. 
We also enable this mode # irrespective of the write_concern flag if we're running in diff --git a/signac/core/synced_collections/serialized_file_buffered_collection.py b/signac/core/synced_collections/serialized_file_buffered_collection.py index 43749870c..8601b54e9 100644 --- a/signac/core/synced_collections/serialized_file_buffered_collection.py +++ b/signac/core/synced_collections/serialized_file_buffered_collection.py @@ -13,7 +13,7 @@ from .errors import MetadataError from .file_buffered_collection import FileBufferedCollection -from .utils import SCJSONEncoder +from .utils import SyncedCollectionJSONEncoder class SerializedFileBufferedCollection(FileBufferedCollection): @@ -168,7 +168,7 @@ def _encode(data): The underlying encoded data. """ - return json.dumps(data, cls=SCJSONEncoder).encode() + return json.dumps(data, cls=SyncedCollectionJSONEncoder).encode() @staticmethod def _decode(blob): diff --git a/signac/core/synced_collections/utils.py b/signac/core/synced_collections/utils.py index 882171c69..ba677ef86 100644 --- a/signac/core/synced_collections/utils.py +++ b/signac/core/synced_collections/utils.py @@ -114,7 +114,7 @@ def default(o: Any) -> Dict[str, Any]: # noqa: D102 raise TypeError from e -class SCJSONEncoder(JSONEncoder): +class SyncedCollectionJSONEncoder(JSONEncoder): """A JSONEncoder capable of encoding SyncedCollections and other supported types. 
This encoder will attempt to obtain a JSON-serializable representation of diff --git a/tests/test_synced_collections/test_utils.py b/tests/test_synced_collections/test_utils.py index d0b398575..efa2e25bd 100644 --- a/tests/test_synced_collections/test_utils.py +++ b/tests/test_synced_collections/test_utils.py @@ -10,7 +10,10 @@ from signac.core.synced_collections.collection_json import JSONDict from signac.core.synced_collections.synced_list import SyncedList -from signac.core.synced_collections.utils import AbstractTypeResolver, SCJSONEncoder +from signac.core.synced_collections.utils import ( + AbstractTypeResolver, + SyncedCollectionJSONEncoder, +) try: import numpy @@ -52,7 +55,7 @@ def encode_flat_dict(d): # Raw dictionaries should be encoded transparently. data = {"foo": 1, "bar": 2, "baz": 3} assert json.dumps(data) == encode_flat_dict(data) - assert json.dumps(data, cls=SCJSONEncoder) == json.dumps(data) + assert json.dumps(data, cls=SyncedCollectionJSONEncoder) == json.dumps(data) with TemporaryDirectory() as tmp_dir: fn = os.path.join(tmp_dir, "test_json_encoding.json") @@ -60,14 +63,15 @@ def encode_flat_dict(d): synced_data.update(data) with pytest.raises(TypeError): json.dumps(synced_data) - assert json.dumps(synced_data, cls=SCJSONEncoder) == encode_flat_dict( - synced_data - ) + assert json.dumps( + synced_data, cls=SyncedCollectionJSONEncoder + ) == encode_flat_dict(synced_data) if NUMPY: array = numpy.random.rand(3) synced_data["foo"] = array assert isinstance(synced_data["foo"], SyncedList) assert ( - json.loads(json.dumps(synced_data, cls=SCJSONEncoder)) == synced_data() + json.loads(json.dumps(synced_data, cls=SyncedCollectionJSONEncoder)) + == synced_data() ) From 4692e7fe39ace0782c6fe394d503e51224f4c497 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sat, 23 Jan 2021 14:12:01 -0800 Subject: [PATCH 36/42] Only access old id once. 
--- signac/contrib/job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 691451a2a..6fa25445b 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -77,7 +77,8 @@ def _save(self): # All elements of the job list are shallow copies of each other, so any # one of them is representative. job = self._jobs[0] - if job._id == new_id: + old_id = job._id + if old_id == new_id: return tmp_statepoint_file = self.filename + "~" @@ -105,7 +106,6 @@ def _save(self): # Update each job instance. for job in self._jobs: - old_id = job._id job._id = new_id job._wd = None job._document = None From 3c40dc8943b2d71127bae34826f623b9c97c4483 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sat, 23 Jan 2021 14:29:37 -0800 Subject: [PATCH 37/42] Move lazy attribute initialization into one location. --- signac/contrib/job.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 6fa25445b..7dca34e01 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -107,10 +107,7 @@ def _save(self): # Update each job instance. for job in self._jobs: job._id = new_id - job._wd = None - job._document = None - job._stores = None - job._cwd = [] + job._initialize_lazy_properties() # Since all the jobs are equivalent, just grab the filename from the # last one and init it. Also migrate the lock for multithreaded support. @@ -228,9 +225,7 @@ class Job: def __init__(self, project, statepoint=None, _id=None): self._project = project - - # Prepare wd in advance so that the attribute exists in checks below. 
- self._wd = None + self._initialize_lazy_properties() if statepoint is None and _id is None: raise ValueError("Either statepoint or _id must be provided.") @@ -251,13 +246,11 @@ def __init__(self, project, statepoint=None, _id=None): ) self._statepoint_requires_init = True - # Prepare job document + def _initialize_lazy_properties(self): + """Initialize all properties that are designed to be loaded lazily.""" + self._wd = None self._document = None - - # Prepare job H5StoreManager self._stores = None - - # Prepare current working directory for context management self._cwd = [] @deprecated( From 0facff5af8e202c00828136d720466cbf1b870a1 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sun, 24 Jan 2021 14:16:52 -0800 Subject: [PATCH 38/42] Address PR requests that don't cause any issues. --- signac/__init__.py | 2 +- signac/contrib/hashing.py | 6 +-- signac/contrib/job.py | 52 ++++++++++++++----- .../synced_collections/collection_json.py | 2 +- tests/test_job.py | 7 ++- 5 files changed, 47 insertions(+), 22 deletions(-) diff --git a/signac/__init__.py b/signac/__init__.py index 72bf15c87..a274fc217 100644 --- a/signac/__init__.py +++ b/signac/__init__.py @@ -26,10 +26,10 @@ ) from .contrib import filesystems as fs from .contrib import get_job, get_project, index, index_files, init_project +from .contrib.job import get_buffer_load, get_buffer_size from .core.h5store import H5Store, H5StoreManager from .core.jsondict import JSONDict from .core.jsondict import flush_all as flush -from .core.jsondict import get_buffer_load, get_buffer_size from .core.synced_collections.buffered_collection import buffer_all as buffered from .core.synced_collections.buffered_collection import is_buffered from .db import get_database diff --git a/signac/contrib/hashing.py b/signac/contrib/hashing.py index 8d6fea032..36d9970d2 100644 --- a/signac/contrib/hashing.py +++ b/signac/contrib/hashing.py @@ -7,7 +7,6 @@ import json from ..core.synced_collections.utils import 
SyncedCollectionJSONEncoder -from ..errors import KeyTypeError # We must use the standard library json for exact consistency in formatting @@ -30,10 +29,7 @@ def calc_id(spec): Encoded hash in hexadecimal format. """ - try: - blob = json.dumps(spec, cls=SyncedCollectionJSONEncoder, sort_keys=True) - except TypeError: - raise KeyTypeError + blob = json.dumps(spec, cls=SyncedCollectionJSONEncoder, sort_keys=True) m = hashlib.md5() m.update(blob.encode()) return m.hexdigest() diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 7dca34e01..85dff52e8 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -14,6 +14,7 @@ from ..core.h5store import H5StoreManager from ..core.synced_collections.collection_json import BufferedJSONDict, JSONDict +from ..errors import KeyTypeError from ..sync import sync_jobs from ..version import __version__ from .errors import DestinationExistsError, JobsCorruptedError @@ -23,12 +24,12 @@ logger = logging.getLogger(__name__) -class _StatepointDict(JSONDict): - """A JSON-backed dictionary for storing job statepoints. +class _StatePointDict(JSONDict): + """A JSON-backed dictionary for storing job state points. There are three principal reasons for extending the base JSONDict: 1. Saving needs to trigger a job directory migration, and - 2. Statepoints are assumed to not support external modification, so + 2. State points are assumed to not support external modification, so they never need to load from disk _except_ the very first time a job is opened by id and they're not present in the cache. 3. It must be possible to load and/or save on demand during tasks like @@ -47,10 +48,13 @@ def __init__( *args, **kwargs, ): - # A job-statepoint mapping need not be unique because multiple Python - # Job objects can point to the same data on disk. We need to store - # these jobs in a shared list here so that shallow copies can point to - # the same place and trigger each other to update. 
+ # Multiple Python Job objects can share a single `_StatepointDict` + # instance because they are shallow copies referring to the same data + # on disk. We need to store these jobs in a shared list here so that + # shallow copies can point to the same place and trigger each other to + # update. This does not apply to independently created Job objects, + # even if they refer to the same disk data; this only applies to + # explicit shallow copies and unpickled objects within a session. self._jobs = list(jobs) super().__init__( filename=filename, @@ -76,7 +80,7 @@ def _save(self): # All elements of the job list are shallow copies of each other, so any # one of them is representative. - job = self._jobs[0] + job = next(iter(self._jobs)) old_id = job._id if old_id == new_id: return @@ -84,7 +88,7 @@ def _save(self): tmp_statepoint_file = self.filename + "~" should_init = False try: - # Move the statepoint to an intermediate location as a backup. + # Move the state point to an intermediate location as a backup. os.replace(self.filename, tmp_statepoint_file) try: new_workspace = os.path.join(job._project.workspace(), new_id) @@ -96,6 +100,7 @@ def _save(self): else: raise else: + # os.remove(self.filename + "~") should_init = True except OSError as error: # The most likely reason we got here is because the state point @@ -116,6 +121,14 @@ def _save(self): type(self)._locks[self._lock_id] = type(self)._locks.pop(old_lock_id) if should_init: + # Only initializing one job assumes that all changes in init are + # changes reflected in the underlying resource (the JSON file). + # This assumption is currently valid because all in-memory + # attributes are loaded lazily (and are handled by the call to + # _initialize_lazy_properties above), except for the key defining + # property of the job id (which is also updated above). If init + # ever changes to making modifications to the job object, we may + # need to call it for all jobs. 
job.init() logger.info(f"Moved '{old_id}' -> '{new_id}'.") @@ -148,7 +161,7 @@ def save(self, force=False): os.remove(self._filename) except Exception: # ignore all errors here pass - raise error + raise def load(self, job_id): """Trigger a load from disk. @@ -231,8 +244,11 @@ def __init__(self, project, statepoint=None, _id=None): raise ValueError("Either statepoint or _id must be provided.") elif statepoint is not None: self._statepoint_requires_init = False - self._id = calc_id(statepoint) if _id is None else _id - self._statepoint = _StatepointDict( + try: + self._id = calc_id(statepoint) if _id is None else _id + except TypeError: + raise KeyTypeError + self._statepoint = _StatePointDict( jobs=[self], filename=self._statepoint_filename, data=statepoint ) @@ -241,7 +257,7 @@ def __init__(self, project, statepoint=None, _id=None): else: # Only an id was provided. State point will be loaded lazily. self._id = _id - self._statepoint = _StatepointDict( + self._statepoint = _StatePointDict( jobs=[self], filename=self._statepoint_filename ) self._statepoint_requires_init = True @@ -854,3 +870,13 @@ def __deepcopy__(self, memo): for key, value in self.__dict__.items(): setattr(result, key, deepcopy(value, memo)) return result + + +def get_buffer_load(): + """Get the actual size of the buffer.""" + return BufferedJSONDict.get_buffer_size() + + +def get_buffer_size(): + """Get the maximum available capacity of the buffer.""" + return BufferedJSONDict.get_buffer_capacity() diff --git a/signac/core/synced_collections/collection_json.py b/signac/core/synced_collections/collection_json.py index 2a3b4b9b7..5812cd916 100644 --- a/signac/core/synced_collections/collection_json.py +++ b/signac/core/synced_collections/collection_json.py @@ -114,7 +114,7 @@ def _load_from_resource(self): if error.errno == errno.ENOENT: return None else: - raise error + raise def _save_to_resource(self): """Write the data to JSON file.""" diff --git a/tests/test_job.py b/tests/test_job.py index 
d2b2db898..e49a2ae4d 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -446,11 +446,14 @@ class A: assert str(key) in job.sp def test_invalid_sp_key_types(self): - job = self.open_job(dict(invalid_key=True)).init() - class A: pass + with pytest.raises(KeyTypeError): + self.open_job({A(): True}).init() + + job = self.open_job(dict(invalid_key=True)).init() + for key in (0.0, A(), (1, 2, 3)): with pytest.raises(KeyTypeError): job.sp[key] = "test" From 22b3ae8c4761eee4c7d9faaf6c2dfc3e31765b52 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sun, 24 Jan 2021 14:24:40 -0800 Subject: [PATCH 39/42] Remove the temporary state point file backup. --- signac/contrib/job.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 85dff52e8..330cb5bf2 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -100,7 +100,6 @@ def _save(self): else: raise else: - # os.remove(self.filename + "~") should_init = True except OSError as error: # The most likely reason we got here is because the state point @@ -114,6 +113,14 @@ def _save(self): job._id = new_id job._initialize_lazy_properties() + # Remove the temporary statepoint file if it was created. Have to do it + # here because we need to get the updated job statepoint filename. + try: + os.remove(job._statepoint_filename + "~") + except OSError as error: + if error.errno != errno.ENOENT: + raise + # Since all the jobs are equivalent, just grab the filename from the # last one and init it. Also migrate the lock for multithreaded support. old_lock_id = self._lock_id From ab11d7d2741531c21fe02ba746d374cce24ca770 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sun, 24 Jan 2021 14:48:06 -0800 Subject: [PATCH 40/42] Make as many old buffer tests as possible. 
--- signac/__init__.py | 3 ++- signac/contrib/job.py | 7 ++++++- tests/test_buffered_mode.py | 23 +++++++---------------- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/signac/__init__.py b/signac/__init__.py index a274fc217..69ea0e0a8 100644 --- a/signac/__init__.py +++ b/signac/__init__.py @@ -26,7 +26,7 @@ ) from .contrib import filesystems as fs from .contrib import get_job, get_project, index, index_files, init_project -from .contrib.job import get_buffer_load, get_buffer_size +from .contrib.job import get_buffer_load, get_buffer_size, set_buffer_size from .core.h5store import H5Store, H5StoreManager from .core.jsondict import JSONDict from .core.jsondict import flush_all as flush @@ -69,6 +69,7 @@ "flush", "get_buffer_size", "get_buffer_load", + "set_buffer_size", "JSONDict", "H5Store", "H5StoreManager", diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 330cb5bf2..5799021df 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -881,9 +881,14 @@ def __deepcopy__(self, memo): def get_buffer_load(): """Get the actual size of the buffer.""" - return BufferedJSONDict.get_buffer_size() + return BufferedJSONDict.get_current_buffer_size() def get_buffer_size(): """Get the maximum available capacity of the buffer.""" return BufferedJSONDict.get_buffer_capacity() + + +def set_buffer_size(new_size): + """Set the maximum available capacity of the buffer.""" + return BufferedJSONDict.set_buffer_capacity(new_size) diff --git a/tests/test_buffered_mode.py b/tests/test_buffered_mode.py index 3f27ca03f..575c18d00 100644 --- a/tests/test_buffered_mode.py +++ b/tests/test_buffered_mode.py @@ -13,7 +13,8 @@ from test_project import TestProjectBase import signac -from signac.errors import BufferedFileError, BufferException, Error +from signac.core.synced_collections.errors import BufferedError +from signac.errors import BufferedFileError, Error PYPY = "PyPy" in platform.python_implementation() @@ -144,35 +145,25 @@ def 
test_force_write_mode_with_permission_error(self): os.chmod(path, mode) assert job.doc.a == x - @pytest.mark.xfail(reason="This API for setting the buffer size is deprecated.") def test_buffered_mode_change_buffer_size(self): assert not signac.is_buffered() - with signac.buffered(buffer_size=12): + signac.set_buffer_size(12) + with signac.buffered(): assert signac.buffered() assert signac.get_buffer_size() == 12 assert not signac.is_buffered() - with pytest.raises(TypeError): - with signac.buffered(buffer_size=True): - pass assert not signac.is_buffered() - with signac.buffered(buffer_size=12): + with signac.buffered(): assert signac.buffered() assert signac.get_buffer_size() == 12 - with signac.buffered(buffer_size=12): + with signac.buffered(): assert signac.buffered() assert signac.get_buffer_size() == 12 assert not signac.is_buffered() - with pytest.raises(BufferException): - with signac.buffered(buffer_size=12): - assert signac.buffered() - assert signac.get_buffer_size() == 12 - with signac.buffered(buffer_size=14): - pass - @pytest.mark.xfail(reason="This test uses various deprecated APIs.") def test_integration(self): def routine(): for i in range(1, 4): @@ -248,7 +239,7 @@ def routine(): assert job2.doc.a == (not x) assert job.doc.a == (not x) - with pytest.raises(BufferedFileError) as cm: + with pytest.raises(BufferedError) as cm: with signac.buffered(): assert job.doc.a == (not x) job.doc.a = x From dfa7c4cffc50389d3f8504814509920e911683b8 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sun, 24 Jan 2021 14:57:35 -0800 Subject: [PATCH 41/42] Reset buffer size after test. 
--- tests/test_buffered_mode.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/tests/test_buffered_mode.py b/tests/test_buffered_mode.py index 575c18d00..c8cf71ec1 100644 --- a/tests/test_buffered_mode.py +++ b/tests/test_buffered_mode.py @@ -146,23 +146,27 @@ def test_force_write_mode_with_permission_error(self): assert job.doc.a == x def test_buffered_mode_change_buffer_size(self): - assert not signac.is_buffered() - signac.set_buffer_size(12) - with signac.buffered(): - assert signac.buffered() - assert signac.get_buffer_size() == 12 + original_buffer_size = signac.get_buffer_size() + try: + assert not signac.is_buffered() + signac.set_buffer_size(12) + with signac.buffered(): + assert signac.buffered() + assert signac.get_buffer_size() == 12 - assert not signac.is_buffered() + assert not signac.is_buffered() - assert not signac.is_buffered() - with signac.buffered(): - assert signac.buffered() - assert signac.get_buffer_size() == 12 + assert not signac.is_buffered() with signac.buffered(): assert signac.buffered() assert signac.get_buffer_size() == 12 + with signac.buffered(): + assert signac.buffered() + assert signac.get_buffer_size() == 12 - assert not signac.is_buffered() + assert not signac.is_buffered() + finally: + signac.set_buffer_size(original_buffer_size) def test_integration(self): def routine(): From d71a49cdec34b76d12bffc7907671199b3d388e2 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 25 Jan 2021 08:18:56 -0800 Subject: [PATCH 42/42] Last changes from PR review. 
--- signac/__init__.py | 2 +- signac/contrib/job.py | 6 +++--- tests/test_buffered_mode.py | 3 +++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/signac/__init__.py b/signac/__init__.py index 69ea0e0a8..64721eec8 100644 --- a/signac/__init__.py +++ b/signac/__init__.py @@ -28,10 +28,10 @@ from .contrib import get_job, get_project, index, index_files, init_project from .contrib.job import get_buffer_load, get_buffer_size, set_buffer_size from .core.h5store import H5Store, H5StoreManager -from .core.jsondict import JSONDict from .core.jsondict import flush_all as flush from .core.synced_collections.buffered_collection import buffer_all as buffered from .core.synced_collections.buffered_collection import is_buffered +from .core.synced_collections.collection_json import JSONDict from .db import get_database from .diff import diff_jobs from .version import __version__ diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 5799021df..fbf3d40a1 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -48,7 +48,7 @@ def __init__( *args, **kwargs, ): - # Multiple Python Job objects can share a single `_StatepointDict` + # Multiple Python Job objects can share a single `_StatePointDict` # instance because they are shallow copies referring to the same data # on disk. We need to store these jobs in a shared list here so that # shallow copies can point to the same place and trigger each other to @@ -113,8 +113,8 @@ def _save(self): job._id = new_id job._initialize_lazy_properties() - # Remove the temporary statepoint file if it was created. Have to do it - # here because we need to get the updated job statepoint filename. + # Remove the temporary state point file if it was created. Have to do it + # here because we need to get the updated job state point filename. 
try: os.remove(job._statepoint_filename + "~") except OSError as error: diff --git a/tests/test_buffered_mode.py b/tests/test_buffered_mode.py index c8cf71ec1..15d60a429 100644 --- a/tests/test_buffered_mode.py +++ b/tests/test_buffered_mode.py @@ -73,6 +73,7 @@ def test_basic_and_nested(self): assert job.doc.a == 2 assert job.doc.a == 2 + # Remove this test in signac 2.0. @pytest.mark.xfail( reason="The new SyncedCollection does not implement force_write." ) @@ -92,6 +93,7 @@ def test_buffered_mode_force_write(self): pass assert not signac.is_buffered() + # Remove this test in signac 2.0. @pytest.mark.xfail( reason="The new SyncedCollection does not implement force_write." ) @@ -121,6 +123,7 @@ def test_buffered_mode_force_write_with_file_modification(self): file.write(json.dumps({"a": x}).encode()) assert job.doc.a == (not x) + # Remove this test in signac 2.0. @pytest.mark.xfail( reason="The new SyncedCollection does not implement force_write." )