From e480e342f48ba6044b9c0e4a6a769d9640721253 Mon Sep 17 00:00:00 2001 From: Lukas Juhrich Date: Sat, 28 Jan 2023 13:59:43 +0100 Subject: [PATCH] [model] Add `UnixTombstone` class MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This encompasses - A generated column `User.login_hash` - An FKey User→UnixTombstone - An FKey UnixAccount→UnixTombstone - Partial indices on the UnixTombstone table to ensure a composite nullable key with optional components, but with equality - A trigger updating the user's tombstone if a UnixAccount is inserted --- pycroft/model/user.py | 105 +++++++++++++++++++++++++++++ tests/model/test_unix_tombstone.py | 101 +++++++++++++++++++++++++++ 2 files changed, 206 insertions(+) create mode 100644 tests/model/test_unix_tombstone.py diff --git a/pycroft/model/user.py b/pycroft/model/user.py index acaa98f0f..231941222 100644 --- a/pycroft/model/user.py +++ b/pycroft/model/user.py @@ -32,6 +32,8 @@ Index, text, event, + CheckConstraint, + Computed, ) from sqlalchemy.dialects.postgresql import ExcludeConstraint from sqlalchemy.ext.associationproxy import association_proxy @@ -96,6 +98,7 @@ class BaseUser(IntegerIdModel): __abstract__ = True login: Mapped[str40] = mapped_column(unique=True) + login_hash: Mapped[str] = mapped_column(Computed("digest(login, 'sha512')")) name: Mapped[str255] registered_at: Mapped[utc.DateTimeTz] passwd_hash: Mapped[str_deferred | None] @@ -461,6 +464,11 @@ def email_internal(self): __table_args__ = (UniqueConstraint('swdd_person_id'),) +@event.listens_for(User.__table__, "before_create") +def create_pgcrypto(target, connection, **kw): + connection.execute(text("create extension if not exists pgcrypto")) + + manager.add_function( User.__table__, ddl.Function( @@ -632,13 +640,110 @@ class Property(IntegerIdModel): class UnixAccount(IntegerIdModel): uid: Mapped[int] = mapped_column( + ForeignKey("unix_tombstone.uid"), unique=True, server_default=unix_account_uid_seq.next_value() ) + tombstone: Mapped[UnixTombstone] = relationship(viewonly=True) gid: Mapped[int] = mapped_column(default=100) login_shell: Mapped[str] = mapped_column(default="/bin/bash") home_directory: Mapped[str] = mapped_column(unique=True) +class UnixTombstone(ModelBase): + # mapped_column does not work yet for reference in `__mapper_args__`, unfortunately. + from sqlalchemy import Column, Integer, String + + uid: Mapped[int] = Column(Integer, unique=True) + login_hash: Mapped[str] = Column(String, unique=True) + + # backrefs + unix_account: Mapped[UnixAccount] = relationship(viewonly=True, uselist=False) + # /backrefs + + __table_args__ = ( + UniqueConstraint("uid", "login_hash"), + Index( + "uid_only_unique", login_hash, unique=True, postgresql_where=uid.is_(None) + ), + Index( + "login_hash_only_unique", + uid, + unique=True, + postgresql_where=login_hash.is_(None), + ), + CheckConstraint("uid is not null or login_hash is not null"), + ) + __mapper_args__ = {"primary_key": (uid, login_hash)} # fake PKey for mapper + + +# unix account creation +manager.add_function( + User.__table__, + ddl.Function( + "unix_account_ensure_tombstone", + [], + "trigger", + # IF unix_account has a corresponding user + # THEN use that tombstone. + # However, in the scenario where the user's tombstone exists and points to a different uid, + # we throw an error instead. + """ + DECLARE + v_user "user"; + v_login_ts unix_tombstone; + v_ua_ts unix_tombstone; + BEGIN + select * into v_user from "user" u where u.unix_account_id = NEW.id; + select * into v_ua_ts from unix_tombstone ts where ts.uid = NEW.uid; + + select ts.* into v_login_ts from "user" u + join unix_tombstone ts on u.login_hash = ts.login_hash + where u.unix_account_id = NEW.id; + + -- 1) no user, no tombstone + -- 2) no user, tombstone + -- 3) user, no tombstone -> create + -- 4a) user, tombstone with different login hash + -- 4b) user, tombstone with matching login hash + + IF v_user IS NULL THEN + IF v_ua_ts IS NULL THEN + insert into unix_tombstone (uid) values (NEW.uid); + END IF; + RETURN NEW; + END IF; + -- else: user not null + IF v_ua_ts IS NULL THEN + insert into unix_tombstone (uid, login_hash) values (NEW.uid, v_user.login_hash); + ELSE + IF v_ua_ts.login_hash <> v_user.login_hash THEN + RAISE EXCEPTION 'unix_account %%: tombstone login hash (%%) differs from user login hash (%%)', + NEW.id, v_ua_ts.login_hash, v_user.login_hash + USING ERRCODE = 'integrity_constraint_violation'; + END IF; + END IF; + + RETURN NEW; + END; + """, + volatility="volatile", + strict=True, + language="plpgsql", + ), +) + +manager.add_trigger( + User.__table__, + ddl.Trigger( + "unix_account_ensure_tombstone_trigger", + UnixAccount.__table__, + ("INSERT",), + "unix_account_ensure_tombstone()", + when="BEFORE", + ), +) + + class RoomHistoryEntry(IntegerIdModel): active_during: Mapped[Interval[utc.DateTimeTz]] = mapped_column(TsTzRange) diff --git a/tests/model/test_unix_tombstone.py b/tests/model/test_unix_tombstone.py new file mode 100644 index 000000000..c514102ad --- /dev/null +++ b/tests/model/test_unix_tombstone.py @@ -0,0 +1,101 @@ +# Copyright (c) 2023. The Pycroft Authors. See the AUTHORS file. +# This file is part of the Pycroft project and licensed under the terms of +# the Apache License, Version 2.0. See the LICENSE file for details +from hashlib import sha512 + +import pytest +from sqlalchemy import inspect +from sqlalchemy.exc import IntegrityError +from sqlalchemy.orm import Session + +from pycroft.model.user import UnixTombstone +from tests import factories as f + + +L_HASH = sha512(b"login").hexdigest() + + +class TestTombstoneConstraints: + @staticmethod + def test_tombstone_needs_login_hash_or_uid(session): + session.add(UnixTombstone()) + with pytest.raises(IntegrityError, match="CheckViolation"): + session.flush() + + @staticmethod + @pytest.mark.parametrize( + "tombstone_args", + ( + [(None, 10000)] * 2, + [(L_HASH, None)] * 2, + [(None, 10000), (L_HASH, 10000)], + [(L_HASH, 10000), (L_HASH, 10001)], + [(L_HASH, 10000), (L_HASH, None)], + [(None, None)], + ), + ) + def test_tombstone_uniqueness_violations( + session: Session, tombstone_args: list[tuple[str, int]] + ): + session.add_all([UnixTombstone(login_hash=h, uid=u) for h, u in tombstone_args]) + with pytest.raises(IntegrityError): + session.flush() + + @staticmethod + def test_valid_tombstone_combinations(session: Session): + session.add_all( + UnixTombstone(login_hash=h, uid=uid) + for h, uid in ( + (None, 10000), + (L_HASH, None), + (sha512(b"login2").hexdigest(), 10001), + (None, 20000), + ) + ) + try: + session.flush() + except IntegrityError: + pytest.fail("raised IntegrityError") + + +class TestUnixAccountUidFKey: + @staticmethod + @pytest.fixture(scope="class") + def unix_account(class_session): + account = f.UnixAccountFactory() + class_session.flush() + return account + + @staticmethod + def test_unix_account_has_tombstone(unix_account): + assert unix_account.tombstone + + @staticmethod + def test_unix_account_deletion_keeps_tombstone(session, unix_account): + tombstone = unix_account.tombstone + session.delete(unix_account) + session.flush() + session.refresh(tombstone) + assert inspect(tombstone).persistent + + @staticmethod + def test_unix_account_uid_change_does_not_change_tombstone(session, unix_account): + unix_account.uid += 1000 + session.add(unix_account) + with pytest.raises(IntegrityError, match="ForeignKeyViolation"): + session.flush() + + +class TestUserLoginHashFKey: + # TODO test user add, modify, delete + pass + + +class TestUserUnixAccountTombstoneConsistency: + # TODO test that modifications on user/unix_account + # (e.g. creation, attr modification) + # throw an error if both entities point to different tombstones + + # TODO test: adding a unix account pointing to user w/ tombstone w/ different uid + # throws an error + pass