Skip to content

Commit

Permalink
Refactor target collection state checks.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Dec 14, 2024
1 parent b5be40c commit bca2d55
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 66 deletions.
2 changes: 0 additions & 2 deletions law/contrib/gfal/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import os
import sys
import gc
import contextlib
import stat as _stat

Expand Down Expand Up @@ -108,7 +107,6 @@ def context(self):
finally:
if self.atomic_contexts and pid in self._contexts:
del self._contexts[pid]
gc.collect()

@contextlib.contextmanager
def transfer_parameters(self, ctx):
Expand Down
162 changes: 98 additions & 64 deletions law/target/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@

import types
import random
from abc import abstractmethod
from functools import partial
from contextlib import contextmanager
from collections import defaultdict, deque

import six

Expand Down Expand Up @@ -85,22 +86,30 @@ def _iter_flat(self):
for key, targets in gen:
yield (key, targets)

def _iter_state(self, existing=True, optional_existing=None, keys=False, unpack=True):
def _iter_state(
self,
existing=True,
optional_existing=None,
keys=False,
unpack=True,
exists_func=None,
):
existing = bool(existing)
if optional_existing is not None:
optional_existing = bool(optional_existing)

# helper to check for existence
def exists(t):
if optional_existing is not None and t.optional:
return optional_existing
if isinstance(t, TargetCollection):
return t.exists(optional_existing=optional_existing)
return t.exists()
if exists_func is None:
def exists_func(t):
if optional_existing is not None and t.optional:
return optional_existing
if isinstance(t, TargetCollection):
return t.exists(optional_existing=optional_existing)
return t.exists()

# loop and yield
for key, targets in self._iter_flat():
state = all(exists(t) for t in targets)
state = all(map(exists_func, targets))
if state is existing:
if unpack:
targets = self.targets[key]
Expand Down Expand Up @@ -147,7 +156,7 @@ def complete(self, **kwargs):
return self.optional or self.exists(**kwargs)

def _exists_fwd(self, **kwargs):
fwd = ["optional_existing"]
fwd = ["optional_existing", "exists_func"]
return self.exists(**{key: kwargs[key] for key in fwd if key in kwargs})

def exists(self, **kwargs):
Expand Down Expand Up @@ -178,7 +187,7 @@ def count(self, **kwargs):
target_keys = [key for key, _ in self._iter_state(**kwargs)]

n = len(target_keys)
return n if not keys else (n, target_keys)
return (n, target_keys) if keys else n

def random_target(self):
if isinstance(self.targets, (list, tuple)):
Expand Down Expand Up @@ -268,15 +277,41 @@ class SiblingFileCollectionBase(FileCollection):
Base class for file collections whose elements are located in the same directory (siblings).
"""

@classmethod
def _exists_in_basenames(cls, target, basenames, optional_existing, target_dirs):
if optional_existing is not None and target.optional:
return optional_existing
if isinstance(target, SiblingFileCollectionBase):
return target._exists_fwd(
basenames=basenames,
optional_existing=optional_existing,
)
if isinstance(target, TargetCollection):
return target.exists(exists_func=partial(
cls._exists_in_basenames,
basenames=basenames,
optional_existing=optional_existing,
target_dirs=target_dirs,
))
if isinstance(basenames, dict):
if target_dirs and target in target_dirs:
basenames = basenames[target_dirs[target]]
else:
# need to find find the collection manually, that could possibly contain the target,
# then use its basenames
for col, _basenames in basenames.items():
if col._exists_in_dir(target):
basenames = _basenames
break
else:
return False
return target.basename in basenames

def remove(self, silent=True):
for targets in self.iter_existing(unpack=False):
for t in targets:
t.remove(silent=silent)

@abstractmethod
def _exists_fwd(self, **kwargs):
return


class SiblingFileCollection(SiblingFileCollectionBase):
"""
Expand Down Expand Up @@ -344,42 +379,39 @@ def _iter_state(
basenames=None,
keys=False,
unpack=True,
exists_func=None,
):
existing = bool(existing)
if optional_existing is not None:
optional_existing = bool(optional_existing)

# the directory must exist
if not self.dir.exists():
return

# get the basenames of all elements of the directory
existing = bool(existing)
if optional_existing is not None:
optional_existing = bool(optional_existing)

# get all basenames
if basenames is None:
basenames = self.dir.listdir()
basenames = self.dir.listdir() if self.dir.exists() else []

# helper to check for existence
def exists(t):
if optional_existing is not None and t.optional:
return optional_existing
if isinstance(t, SiblingFileCollectionBase):
return t._exists_fwd(
basenames=basenames,
optional_existing=optional_existing,
)
if isinstance(t, TargetCollection):
return all(exists(_t) for _t in flatten_collections(t))
return t.basename in basenames
if exists_func is None:
exists_func = partial(
self._exists_in_basenames,
basenames=basenames,
optional_existing=optional_existing,
target_dirs=None,
)

# loop and yield
for key, targets in self._iter_flat():
state = all(exists(t) for t in targets)
state = all(map(exists_func, targets))
if state is existing:
if unpack:
targets = self.targets[key]
yield (key, targets) if keys else targets

def _exists_fwd(self, **kwargs):
fwd = ["basenames", "optional_existing"]
fwd = ["optional_existing", "basenames", "exists_func"]
return self.exists(**{key: kwargs[key] for key in fwd if key in kwargs})


Expand All @@ -402,69 +434,71 @@ def __init__(self, *args, **kwargs):
# _flat_target_list attributes, but store them again in sibling file collections to speed up
# some methods by grouping them into targets in the same physical directory
self.collections = []
self._flat_target_collections = {}
grouped_targets = {}
self._flat_target_dirs = {}
grouped_targets = defaultdict(list)
for t in flatten_collections(self._flat_target_list):
grouped_targets.setdefault(t.parent.uri(), []).append(t)
grouped_targets[t.parent.uri()].append(t)
for targets in grouped_targets.values():
# create and store the collection
collection = SiblingFileCollection(targets)
self.collections.append(collection)
# remember the collection per target
# remember the absolute collection dir per target for faster loopups later
for t in targets:
self._flat_target_collections[t] = collection
self._flat_target_dirs[t] = collection.dir.abspath

def _repr_pairs(self):
return SiblingFileCollectionBase._repr_pairs(self) + [("collections", len(self.collections))]

def _get_basenames(self):
return {
collection: (collection.dir.listdir() if collection.dir.exists() else [])
for collection in self.collections
}

def _iter_state(
self,
existing=True,
optional_existing=None,
basenames=None,
keys=False,
unpack=True,
exists_func=None,
):
existing = bool(existing)
if optional_existing is not None:
optional_existing = bool(optional_existing)

# get the dict of all basenames
# get all basenames
if basenames is None:
basenames = self._get_basenames()

# reuse state iteration of wrapped collections
for coll in self.collections:
iter_kwargs = {
"existing": existing,
"optional_existing": optional_existing,
"keys": keys,
"unpack": unpack,
basenames = {
col.dir.abspath: (col.dir.listdir() if col.dir.exists() else [])
for col in self.collections
}
if isinstance(coll, SiblingFileCollectionBase) and coll in basenames:
iter_kwargs["basenames"] = basenames[coll]
for obj in coll._iter_state(**iter_kwargs):
yield obj

# helper to check for existence
if exists_func is None:
exists_func = partial(
self._exists_in_basenames,
basenames=basenames,
optional_existing=optional_existing,
target_dirs=self._flat_target_dirs,
)

# loop and yield
for key, targets in self._iter_flat():
state = all(map(exists_func, targets))
if state is existing:
if unpack:
targets = self.targets[key]
yield (key, targets) if keys else targets

def _exists_fwd(self, **kwargs):
fwd = [("basenames", "basenames_dict"), ("optional_existing", "optional_existing")]
return self.exists(**{dst: kwargs[src] for dst, src in fwd if src in kwargs})
fwd = ["optional_existing", "basenames", "exists_func"]
return self.exists(**{key: kwargs[key] for key in fwd if key in kwargs})


def flatten_collections(*targets):
lookup = flatten(targets)
lookup = deque(flatten(targets))
targets = []

while lookup:
t = lookup.pop(0)
t = lookup.popleft()
if isinstance(t, TargetCollection):
lookup[:0] = t._flat_target_list
lookup.extendleft(t._flat_target_list)
else:
targets.append(t)

Expand Down

0 comments on commit bca2d55

Please sign in to comment.