Skip to content

Commit

Permalink
Merge pull request #141 from flyingcircusio/ts-nixos-module-support
Browse files Browse the repository at this point in the history
Nixos module support
  • Loading branch information
zagy authored Dec 8, 2023
2 parents 804f4f6 + d7f4772 commit ec5556e
Show file tree
Hide file tree
Showing 4 changed files with 387 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGES.d/20231208_113313_cz_ts_nixos_module_support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Add `nixos.NixOSModule` to inject component attributes into .nix files.
289 changes: 288 additions & 1 deletion src/batou_ext/nix.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import collections
import hashlib
import inspect
import json
import os
import os.path
import shlex
import subprocess
import time
from pathlib import Path

import batou
import batou.component
Expand All @@ -16,6 +19,24 @@
import batou.lib.supervisor
import batou.utils
import pkg_resources
from batou import (
IPAddressConfigurationError,
ReportingException,
UpdateNeeded,
output,
)
from batou.component import Component, RootComponent
from batou.environment import Environment
from batou.host import Host
from batou.lib.file import (
File,
Group,
ManagedContentBase,
Mode,
Owner,
Presence,
)
from batou.utils import Address, NetLoc


class Package(batou.component.Component):
Expand Down Expand Up @@ -67,7 +88,6 @@ def namevar_for_breadcrumb(self):


class PurgePackage(batou.component.Component):

namevar = "package"

def verify(self):
Expand Down Expand Up @@ -480,3 +500,270 @@ def update(self):
# Start up once to load all dependencies here and not upon the first
# use:
self.cmd("./{} -c True".format(self.python))


def nix_dict_to_nix(dct):
"""Converts a dict with values that are already nixified to Nix code."""
content = " ".join(f"{n} = {v};" for n, v in dct.items())
return "{ " + content + " }"


def seq_to_nix(seq):
content = " ".join(value_to_nix(v) for v in seq)
return "[ " + content + " ]"


def mapping_to_nix(obj):
# XXX: only str keys for now

converted = {}
for k, v in obj.items():
conv = value_to_nix(v)
if conv is not None:
converted[k] = conv
return nix_dict_to_nix(converted)


def str_to_nix(value):
# https://nixos.org/manual/nix/stable/language/values.html#type-string
value = (
value.replace("\\", "\\\\").replace("${", "\\${").replace('"', '\\"')
)
return f'"{value}"'


def environment_to_nix_dict(env: Environment):
dct = {
"base_dir": str_to_nix(env.base_dir),
"connect_method": str_to_nix(env.connect_method),
"deployment_base": str_to_nix(env.deployment_base),
"name": str_to_nix(env.name),
"target_directory": str_to_nix(env.target_directory),
"workdir_base": str_to_nix(env.workdir_base),
}

if env.host_domain is not None:
dct["host_domain"] = str_to_nix(env.host_domain)
if env.platform is not None:
dct["platform"] = str_to_nix(env.platform)
if env.service_user is not None:
dct["service_user"] = str_to_nix(env.service_user)

return dct


def netloc_to_nix_dict(netloc: NetLoc):
return {
"__toString": f'_: "{netloc}"',
"host": str_to_nix(netloc.host),
"port": str(netloc.port),
}


def address_to_nix_dict(addr: Address):
dct = {
"__toString": f"_: {str_to_nix(str(addr))}",
"connect": nix_dict_to_nix(netloc_to_nix_dict(addr.connect)),
}
try:
dct["listen"] = nix_dict_to_nix(netloc_to_nix_dict(addr.listen))
except IPAddressConfigurationError:
pass
try:
dct["listen_v6"] = nix_dict_to_nix(netloc_to_nix_dict(addr.listen_v6))
except IPAddressConfigurationError:
pass

return dct


def host_to_nix_dict(host: Host):
return {"fqdn": str_to_nix(host.fqdn), "name": str_to_nix(host.name)}


def value_to_nix(value):
if isinstance(value, str):
return str_to_nix(value)
elif isinstance(value, bool):
return str(value).lower()
elif value is None:
return None
elif isinstance(value, int):
return str(value)
elif isinstance(value, Path):
return str(value)
elif isinstance(value, dict):
return mapping_to_nix(value)
elif isinstance(value, list):
return seq_to_nix(value)
elif isinstance(value, tuple):
return seq_to_nix(value)
elif isinstance(value, Component):
return component_to_nix(value)
elif isinstance(value, Address):
return nix_dict_to_nix(address_to_nix_dict(value))
elif isinstance(value, Host):
return nix_dict_to_nix(host_to_nix_dict(value))
elif isinstance(value, Environment):
return nix_dict_to_nix(environment_to_nix_dict(value))
else:
raise TypeError(f"unsupported type '{type(value)}'")


def component_to_nix(component: Component):
from batou_ext.nixos import NixOSModuleContext

attrs = {}

for name, value in inspect.getmembers(component):
if name.startswith("_"):
pass
elif value is component:
pass
elif inspect.ismethod(value) or inspect.isgenerator(value):
pass
elif name in ("sub_components", "changed"):
pass
elif isinstance(value, NixOSModuleContext):
pass
elif isinstance(value, RootComponent):
if (
value.component is not component
and component.parent is not value.component
):
attrs[name] = component_to_nix(value.component)
elif isinstance(value, Component):
if value is not component.parent:
attrs[name] = component_to_nix(value)
elif isinstance(value, NixOSModuleContext):
pass
else:
try:
converted_value = value_to_nix(value)
if converted_value is not None:
attrs[name] = converted_value
except TypeError as e:
component.log(f"Cannot convert {name}: {e.args[0]}")

return nix_dict_to_nix(attrs)


class NixSyntaxCheckFailed(ReportingException):
def __init__(self, error_msg, path=None):
self.error_msg = error_msg.strip().removeprefix("error: ")
self.path = path

def __str__(self):
return f"Nix syntax check failed: {self.error_msg} in {self.path}"

def report(self):
output.error(f"Nix check {self.error_msg}")


class NixContent(ManagedContentBase):
format_nix_code = False
check_nix_syntax = True

def render(self):
pass

def verify(self, predicting=False):
update_needed = False

if self.format_nix_code:
try:
proc = subprocess.run(
["nixfmt"],
input=self.content,
check=True,
capture_output=True,
)
self.content = proc.stdout
except FileNotFoundError:
self.log("Cannot format Nix file, nixfmt not found.")
except subprocess.CalledProcessError as e:
self.log(f"nixfmt failed: {e.stderr}")

try:
super().verify(predicting)
except UpdateNeeded:
update_needed = True

if self.check_nix_syntax:
try:
subprocess.run(
["nix-instantiate", "--parse", "-"],
input=self.content,
check=True,
capture_output=True,
)
except FileNotFoundError:
self.log(
"Cannot syntax-check Nix file, nix-instantiate not found."
)
except subprocess.CalledProcessError as e:
raise NixSyntaxCheckFailed(
e.stderr.decode("utf8"), path=self.path
)

if update_needed:
raise UpdateNeeded()


class NixFile(File):
format_nix_code = False

def configure(self):
self._unmapped_path = self.path
self.path = self.map(self.path)
self += Presence(self.path, leading=self.leading)

# variation: content or source explicitly given

# The mode needs to be set early to allow batou to get out of
# accidental "permission denied" situations.
if self.mode:
self += Mode(self.path, mode=self.mode)

# no content or source given but file with same name
# exists
if self.content is None and not self.source:
guess_source = self.root.defdir + "/" + os.path.basename(self.path)
if os.path.isfile(guess_source):
self.source = guess_source
else:
# Avoid the edge case where we want to support a very simple
# case: specify File('asdf') and have an identical named file
# in the component definition directory that will be templated
# to the work directory.
#
# However, if you mis-spell the file, then you might
# accidentally end up with an empty file in the work directory.
# If you really want an empty File then you can either use
# Presence(), or (recommended) use File('asdf', content='') to
# make this explicit. We don't want to accidentally confuse the
# convenience case (template a simple file) and an edge case
# (have an empty file)
raise ValueError(
"Missing implicit template file {}. Or did you want "
"to create an empty file? Then use File('{}', content='').".format(
guess_source, self._unmapped_path
)
)
if self.content or self.source:
content = NixContent(
self.path,
source=self.source,
encoding=self.encoding,
content=self.content,
sensitive_data=self.sensitive_data,
format_nix_code=self.format_nix_code,
)
self += content
self.content = content.content

if self.owner:
self += Owner(self.path, owner=self.owner)

if self.group:
self += Group(self.path, group=self.group)
Loading

0 comments on commit ec5556e

Please sign in to comment.