From bca2d55757fafb7c416fe45de9b47a8bb34b7ec0 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Sat, 14 Dec 2024 09:28:10 +0100 Subject: [PATCH] Refactor target collection state checks. --- law/contrib/gfal/target.py | 2 - law/target/collection.py | 162 ++++++++++++++++++++++--------------- 2 files changed, 98 insertions(+), 66 deletions(-) diff --git a/law/contrib/gfal/target.py b/law/contrib/gfal/target.py index 70fb1423..30aeb03f 100644 --- a/law/contrib/gfal/target.py +++ b/law/contrib/gfal/target.py @@ -9,7 +9,6 @@ import os import sys -import gc import contextlib import stat as _stat @@ -108,7 +107,6 @@ def context(self): finally: if self.atomic_contexts and pid in self._contexts: del self._contexts[pid] - gc.collect() @contextlib.contextmanager def transfer_parameters(self, ctx): diff --git a/law/target/collection.py b/law/target/collection.py index cb12955b..28f9f5c6 100644 --- a/law/target/collection.py +++ b/law/target/collection.py @@ -11,8 +11,9 @@ import types import random -from abc import abstractmethod +from functools import partial from contextlib import contextmanager +from collections import defaultdict, deque import six @@ -85,22 +86,30 @@ def _iter_flat(self): for key, targets in gen: yield (key, targets) - def _iter_state(self, existing=True, optional_existing=None, keys=False, unpack=True): + def _iter_state( + self, + existing=True, + optional_existing=None, + keys=False, + unpack=True, + exists_func=None, + ): existing = bool(existing) if optional_existing is not None: optional_existing = bool(optional_existing) # helper to check for existence - def exists(t): - if optional_existing is not None and t.optional: - return optional_existing - if isinstance(t, TargetCollection): - return t.exists(optional_existing=optional_existing) - return t.exists() + if exists_func is None: + def exists_func(t): + if optional_existing is not None and t.optional: + return optional_existing + if isinstance(t, TargetCollection): + return t.exists(optional_existing=optional_existing) + return t.exists() # loop and yield for key, targets in self._iter_flat(): - state = all(exists(t) for t in targets) + state = all(map(exists_func, targets)) if state is existing: if unpack: targets = self.targets[key] @@ -147,7 +156,7 @@ def complete(self, **kwargs): return self.optional or self.exists(**kwargs) def _exists_fwd(self, **kwargs): - fwd = ["optional_existing"] + fwd = ["optional_existing", "exists_func"] return self.exists(**{key: kwargs[key] for key in fwd if key in kwargs}) def exists(self, **kwargs): @@ -178,7 +187,7 @@ def count(self, **kwargs): target_keys = [key for key, _ in self._iter_state(**kwargs)] n = len(target_keys) - return n if not keys else (n, target_keys) + return (n, target_keys) if keys else n def random_target(self): if isinstance(self.targets, (list, tuple)): @@ -268,15 +277,41 @@ class SiblingFileCollectionBase(FileCollection): Base class for file collections whose elements are located in the same directory (siblings). """ + @classmethod + def _exists_in_basenames(cls, target, basenames, optional_existing, target_dirs): + if optional_existing is not None and target.optional: + return optional_existing + if isinstance(target, SiblingFileCollectionBase): + return target._exists_fwd( + basenames=basenames, + optional_existing=optional_existing, + ) + if isinstance(target, TargetCollection): + return target.exists(exists_func=partial( + cls._exists_in_basenames, + basenames=basenames, + optional_existing=optional_existing, + target_dirs=target_dirs, + )) + if isinstance(basenames, dict): + if target_dirs and target in target_dirs: + basenames = basenames[target_dirs[target]] + else: + # need to find find the collection manually, that could possibly contain the target, + # then use its basenames + for col, _basenames in basenames.items(): + if col._exists_in_dir(target): + basenames = _basenames + break + else: + return False + return target.basename in basenames + def remove(self, silent=True): for targets in self.iter_existing(unpack=False): for t in targets: t.remove(silent=silent) - @abstractmethod - def _exists_fwd(self, **kwargs): - return - class SiblingFileCollection(SiblingFileCollectionBase): """ @@ -344,42 +379,39 @@ def _iter_state( basenames=None, keys=False, unpack=True, + exists_func=None, ): - existing = bool(existing) - if optional_existing is not None: - optional_existing = bool(optional_existing) - # the directory must exist if not self.dir.exists(): return - # get the basenames of all elements of the directory + existing = bool(existing) + if optional_existing is not None: + optional_existing = bool(optional_existing) + + # get all basenames if basenames is None: - basenames = self.dir.listdir() + basenames = self.dir.listdir() if self.dir.exists() else [] # helper to check for existence - def exists(t): - if optional_existing is not None and t.optional: - return optional_existing - if isinstance(t, SiblingFileCollectionBase): - return t._exists_fwd( - basenames=basenames, - optional_existing=optional_existing, - ) - if isinstance(t, TargetCollection): - return all(exists(_t) for _t in flatten_collections(t)) - return t.basename in basenames + if exists_func is None: + exists_func = partial( + self._exists_in_basenames, + basenames=basenames, + optional_existing=optional_existing, + target_dirs=None, + ) # loop and yield for key, targets in self._iter_flat(): - state = all(exists(t) for t in targets) + state = all(map(exists_func, targets)) if state is existing: if unpack: targets = self.targets[key] yield (key, targets) if keys else targets def _exists_fwd(self, **kwargs): - fwd = ["basenames", "optional_existing"] + fwd = ["optional_existing", "basenames", "exists_func"] return self.exists(**{key: kwargs[key] for key in fwd if key in kwargs}) @@ -402,27 +434,21 @@ def __init__(self, *args, **kwargs): # _flat_target_list attributes, but store them again in sibling file collections to speed up # some methods by grouping them into targets in the same physical directory self.collections = [] - self._flat_target_collections = {} - grouped_targets = {} + self._flat_target_dirs = {} + grouped_targets = defaultdict(list) for t in flatten_collections(self._flat_target_list): - grouped_targets.setdefault(t.parent.uri(), []).append(t) + grouped_targets[t.parent.uri()].append(t) for targets in grouped_targets.values(): # create and store the collection collection = SiblingFileCollection(targets) self.collections.append(collection) - # remember the collection per target + # remember the absolute collection dir per target for faster loopups later for t in targets: - self._flat_target_collections[t] = collection + self._flat_target_dirs[t] = collection.dir.abspath def _repr_pairs(self): return SiblingFileCollectionBase._repr_pairs(self) + [("collections", len(self.collections))] - def _get_basenames(self): - return { - collection: (collection.dir.listdir() if collection.dir.exists() else []) - for collection in self.collections - } - def _iter_state( self, existing=True, @@ -430,41 +456,49 @@ def _iter_state( basenames=None, keys=False, unpack=True, + exists_func=None, ): existing = bool(existing) if optional_existing is not None: optional_existing = bool(optional_existing) - # get the dict of all basenames + # get all basenames if basenames is None: - basenames = self._get_basenames() - - # reuse state iteration of wrapped collections - for coll in self.collections: - iter_kwargs = { - "existing": existing, - "optional_existing": optional_existing, - "keys": keys, - "unpack": unpack, + basenames = { + col.dir.abspath: (col.dir.listdir() if col.dir.exists() else []) + for col in self.collections } - if isinstance(coll, SiblingFileCollectionBase) and coll in basenames: - iter_kwargs["basenames"] = basenames[coll] - for obj in coll._iter_state(**iter_kwargs): - yield obj + + # helper to check for existence + if exists_func is None: + exists_func = partial( + self._exists_in_basenames, + basenames=basenames, + optional_existing=optional_existing, + target_dirs=self._flat_target_dirs, + ) + + # loop and yield + for key, targets in self._iter_flat(): + state = all(map(exists_func, targets)) + if state is existing: + if unpack: + targets = self.targets[key] + yield (key, targets) if keys else targets def _exists_fwd(self, **kwargs): - fwd = [("basenames", "basenames_dict"), ("optional_existing", "optional_existing")] - return self.exists(**{dst: kwargs[src] for dst, src in fwd if src in kwargs}) + fwd = ["optional_existing", "basenames", "exists_func"] + return self.exists(**{key: kwargs[key] for key in fwd if key in kwargs}) def flatten_collections(*targets): - lookup = flatten(targets) + lookup = deque(flatten(targets)) targets = [] while lookup: - t = lookup.pop(0) + t = lookup.popleft() if isinstance(t, TargetCollection): - lookup[:0] = t._flat_target_list + lookup.extendleft(t._flat_target_list) else: targets.append(t)