diff --git a/src/vulnix/nix.py b/src/vulnix/nix.py index 8c0eafc..ae30c73 100644 --- a/src/vulnix/nix.py +++ b/src/vulnix/nix.py @@ -59,11 +59,13 @@ def _call_nix(self, args): 'nix-command flakes'] + args) return call(['nix'] + args) - def _find_deriver(self, path, qpi_deriver=None): + def _find_deriver(self, path, qpi_deriver="undef"): + if not path or not qpi_deriver: + return None if path.endswith('.drv'): return path # Deriver from QueryPathInfo - if qpi_deriver is None: + if qpi_deriver == "undef": qpi_deriver = call(['nix-store', '-qd', path]).strip() _log.debug('qpi_deriver: %s', qpi_deriver) if qpi_deriver and qpi_deriver != 'unknown-deriver' and p.exists( @@ -109,22 +111,24 @@ def add_path(self, path): if self.closure: for output in self._find_outputs(path): - for candidate in map( - # We cannot use the `deriver` field directly because - # like from `nix-store -qd` that path may not exist. - # However, we know that if it is not present - # the path has no deriver because it is a - # derivation input source so we can skip it. - lambda p: self._find_deriver( - p.get('path'), - qpi_deriver=p.get('deriver') - ) if p.get('deriver') is not None else None, - json.loads( - self._call_nix(['path-info', '-r', '--json', output]) - ) - ): - if candidate is not None: + data = json.loads(self._call_nix(['path-info', '-r', '--json', output])) + if not data: + continue + # 'nix path-info -r --json' can return two different json + # output format: https://github.com/NixOS/nix/pull/9242 + if isinstance(data, dict): + for outpath, info in data.items(): + drv = info.get('deriver') + candidate = self._find_deriver(outpath, qpi_deriver=drv) self.update(candidate) + elif isinstance(data, list): + for info in data: + outpath = info.get('path') + drv = info.get('deriver') + candidate = self._find_deriver(outpath, qpi_deriver=drv) + self.update(candidate) + else: + _log.warning("path-info for '%s' returned unexpected json", output) else: deriver = self._find_deriver(path) if self.requisites: @@ -136,7 +140,7 @@ def add_path(self, path): self.update(deriver) def update(self, drv_path): - if not drv_path.endswith('.drv'): + if not drv_path or not drv_path.endswith('.drv'): return try: drv_obj = load(drv_path)