diff --git a/niworkflows/utils/spaces.py b/niworkflows/utils/spaces.py index 3cf8ec1dc72..b38f701f3fc 100644 --- a/niworkflows/utils/spaces.py +++ b/niworkflows/utils/spaces.py @@ -161,6 +161,8 @@ class Reference: """The dictionary of specs.""" standard = attr.ib(default=False, repr=False, type=bool) """Whether this space is standard or not.""" + cifti = attr.ib(default=False, repr=False, type=bool) + """Whether this space is a CIFTI space or not.""" dim = attr.ib(default=3, repr=False, type=int) """Dimensionality of the sampling manifold.""" @@ -184,6 +186,25 @@ def __attrs_post_init__(self): if self.space in self._standard_spaces: object.__setattr__(self, "standard", True) + if "volspace" in self.spec: + object.__setattr__(self, "cifti", True) + + if "volspace" in self.spec: + volspace = self.spec["volspace"] + if (self.space in self._standard_spaces) and (volspace not in self._standard_spaces): + raise ValueError( + f"Surface space ({self.space}) is a standard space, " + f"but volume space ({volspace}) is not. " + "Mixing standard and non-standard spaces is not currently allowed." + ) + elif (self.space not in self._standard_spaces) and (volspace in self._standard_spaces): + raise ValueError( + f"Surface space ({self.space}) is a non-standard space, " + f"but volume space ({volspace}) is a standard space. " + "Mixing standard and non-standard spaces is not currently allowed." + ) + + # Check that cohort is handled appropriately _cohorts = ["%s" % t for t in _tfapi.TF_LAYOUT.get_cohorts(template=self.space)] if "cohort" in self.spec: if not _cohorts: @@ -204,6 +225,30 @@ def __attrs_post_init__(self): "Set a valid cohort selector from: %s." % (self.space, _cohorts) ) + # Check that cohort is handled appropriately for the volume template if necessary + if "volspace" in self.spec: + _cohorts = [ + "%s" % t for t in _tfapi.TF_LAYOUT.get_cohorts(template=self.spec["volspace"]) + ] + if "volcohort" in self.spec: + if not _cohorts: + raise ValueError( + 'standard space "%s" does not accept a cohort ' + "specification." % self.spec["volspace"] + ) + + if str(self.spec["volcohort"]) not in _cohorts: + raise ValueError( + 'standard space "%s" does not contain any cohort ' + 'named "%s".' % (self.spec["volspace"], self.spec["volcohort"]) + ) + elif _cohorts: + _cohorts = ", ".join(['"cohort-%s"' % c for c in _cohorts]) + raise ValueError( + 'standard space "%s" is not fully defined.\n' + "Set a valid cohort selector from: %s." % (self.spec["volspace"], _cohorts) + ) + @property def fullname(self): """ @@ -218,9 +263,17 @@ def fullname(self): 'MNIPediatricAsym:cohort-1' """ - if "cohort" not in self.spec: - return self.space - return "%s:cohort-%s" % (self.space, self.spec["cohort"]) + name = self.space + + if "cohort" in self.spec: + name += f":cohort-{self.spec['cohort']}" + + if "volspace" in self.spec: + name += f"::{self.spec['volspace']}" + if "volcohort" in self.spec: + name += f":cohort-{self.spec['volcohort']}" + + return name @property def legacyname(self): @@ -343,13 +396,35 @@ def from_string(cls, value): Reference(space='MNIPediatricAsym', spec={'cohort': '6', 'res': '2'}), Reference(space='MNIPediatricAsym', spec={'cohort': '6', 'res': 'iso1.6mm'})] + >>> Reference.from_string( + ... "dhcpAsym:cohort-42:den-32k::dhcpVol:cohort-44:res-2" + ... ) # doctest: +NORMALIZE_WHITESPACE + [Reference(space='dhcpAsym', spec={'cohort': '42', 'den': '32k', 'volspace': 'dhcpVol', + 'volcohort': '44', 'volres': '2'})] + """ + volume_value = None + if "::" in value: + # CIFTI definition with both surface and volume spaces defined + value, volume_value = value.split("::") + # We treat the surface space definition as the "primary" space + _args = value.split(":") + _args = value.split(":") spec = defaultdict(list, {}) for modifier in _args[1:]: mitems = modifier.split("-", 1) spec[mitems[0]].append(len(mitems) == 1 or mitems[1]) + if volume_value: + # Tack on the volume space definition to the surface space definition + volume_args = volume_value.split(":") + # There are two special entities to prevent overloading: volspace and volcohort + spec["volspace"] = [volume_args[0]] + for modifier in volume_args[1:]: + mitems = modifier.split("-", 1) + spec[f"vol{mitems[0]}"].append(len(mitems) == 1 or mitems[1]) + allspecs = _expand_entities(spec) return [cls(_args[0], s) for s in allspecs] @@ -585,9 +660,8 @@ def insert(self, index, value, error=True): elif error is True: raise ValueError('space "%s" already in spaces.' % str(value)) - def get_spaces(self, standard=True, nonstandard=True, dim=(2, 3)): - """ - Return space names. + def get_spaces(self, standard=True, nonstandard=True, dim=(2, 3), cifti=(True, False)): + """Return space names. Parameters ---------- @@ -597,6 +671,8 @@ def get_spaces(self, standard=True, nonstandard=True, dim=(2, 3)): Return nonstandard spaces. dim : :obj:`tuple`, optional Desired dimensions of the standard spaces (default is ``(2, 3)``) + cifti : :obj:`tuple`, optional + Desired CIFTI status of the standard spaces (default is ``(True, False)``). Examples -------- @@ -631,13 +707,13 @@ def get_spaces(self, standard=True, nonstandard=True, dim=(2, 3)): s.fullname not in out and (s.standard is standard or s.standard is not nonstandard) and s.dim in dim + and s.cifti in cifti ): out.append(s.fullname) return out - def get_standard(self, full_spec=False, dim=(2, 3)): - """ - Return output spaces. + def get_standard(self, full_spec=False, dim=(2, 3), cifti=(True, False)): + """Return standard output spaces. Parameters ---------- @@ -646,30 +722,39 @@ def get_standard(self, full_spec=False, dim=(2, 3)): have density or resolution set). dim : :obj:`tuple`, optional Desired dimensions of the standard spaces (default is ``(2, 3)``) - + cifti : :obj:`tuple`, optional + Desired CIFTI status of the standard spaces (default is ``(True, False)``). """ + out = [s for s in self.references if s.standard] + out = [s for s in out if s.dim in dim] + out = [s for s in out if s.cifti in cifti] if not full_spec: - return [s for s in self.references if s.standard and s.dim in dim] + return out - return [ - s - for s in self.references - if s.standard - and s.dim in dim - and (hasspec("res", s.spec) or hasspec("den", s.spec)) - ] + out = [s for s in out if hasspec("res", s.spec) or hasspec("den", s.spec)] + return out - def get_nonstandard(self, full_spec=False, dim=(2, 3)): - """Return nonstandard spaces.""" + def get_nonstandard(self, full_spec=False, dim=(2, 3), cifti=(True, False)): + """Return nonstandard output spaces. + + Parameters + ---------- + full_spec : :obj:`bool` + Return only fully-specified standard references (i.e., they must either + have density or resolution set). + dim : :obj:`tuple`, optional + Desired dimensions of the standard spaces (default is ``(2, 3)``) + cifti : :obj:`tuple`, optional + Desired CIFTI status of the standard spaces (default is ``(True, False)``). + """ + out = [s for s in self.references if not s.standard] + out = [s for s in out if s.dim in dim] + out = [s for s in out if s.cifti in cifti] if not full_spec: - return [s.space for s in self.references if not s.standard and s.dim in dim] - return [ - s.space - for s in self.references - if not s.standard - and s.dim in dim - and (hasspec("res", s.spec) or hasspec("den", s.spec)) - ] + return out + + out = [s for s in out if hasspec("res", s.spec) or hasspec("den", s.spec)] + return out def get_fs_spaces(self): """ diff --git a/niworkflows/utils/tests/test_spaces.py b/niworkflows/utils/tests/test_spaces.py index 2c72378f987..7a10651f221 100644 --- a/niworkflows/utils/tests/test_spaces.py +++ b/niworkflows/utils/tests/test_spaces.py @@ -82,6 +82,10 @@ def parser(): ), ), (("MNI152NLin6Asym", "func"), ("MNI152NLin6Asym:res-native", "func")), + ( + ("dhcpAsym:cohort-42:den-32k::dhcpVol:cohort-44:res-2",), + ("dhcpAsym:cohort-42:den-32k:volcohort-44:volres-2:volspace-dhcpVol",), + ), ], ) def test_space_action(parser, spaces, expected):