From 87d99ed1ea26d87a75d688cc837740e231405eee Mon Sep 17 00:00:00 2001 From: Lukas Juhrich Date: Fri, 5 Jul 2024 17:40:23 +0200 Subject: [PATCH] WIP --- pycroft/model/user.py | 71 +++++++++++++++++++++++++++++- tests/model/test_unix_tombstone.py | 40 ++++++++++++++++- 2 files changed, 108 insertions(+), 3 deletions(-) diff --git a/pycroft/model/user.py b/pycroft/model/user.py index 7a34d3584..d1c789263 100644 --- a/pycroft/model/user.py +++ b/pycroft/model/user.py @@ -29,6 +29,7 @@ Sequence, func, UniqueConstraint, + ForeignKeyConstraint, Index, text, event, @@ -212,6 +213,12 @@ class User(BaseUser, UserMixin): account_id: Mapped[int] = mapped_column(ForeignKey("account.id"), index=True) account: Mapped[Account] = relationship(back_populates="user") + # login_hash: Mapped[str] = mapped_column(Computed("digest(login, 'sha512')"), ForeignKey("UnixTombstone.login_hash")) + # ordering issue: need `UnixTombstone` to be part of the mapper already, because… …??? + tombstone: Mapped[UnixTombstone] = relationship( + viewonly=True, + primaryjoin="UnixTombstone.login_hash == User.login_hash" + ) unix_account_id: Mapped[int | None] = mapped_column( ForeignKey("unix_account.id"), unique=True ) @@ -462,7 +469,7 @@ def permission_level(self) -> int: def email_internal(self): return f"{self.login}@agdsn.me" - __table_args__ = (UniqueConstraint('swdd_person_id'),) + __table_args__ = (UniqueConstraint('swdd_person_id'), ForeignKeyConstraint(("login_hash",), ("unix_tombstone.login_hash",))) @event.listens_for(User.__table__, "before_create") @@ -717,6 +724,7 @@ class UnixTombstone(ModelBase): IF v_ua_ts IS NULL THEN insert into unix_tombstone (uid, login_hash) values (NEW.uid, v_user.login_hash); ELSE + -- TODO do this in a separate constraint! IF v_ua_ts.login_hash <> v_user.login_hash THEN RAISE EXCEPTION 'unix_account %%: tombstone login hash (%%) differs from user login hash (%%)', NEW.id, v_ua_ts.login_hash, v_user.login_hash @@ -743,7 +751,68 @@ class UnixTombstone(ModelBase): when="BEFORE", ), ) +# TODO add related constraint trigger for the consistency check + +# unix account creation +manager.add_function( + User.__table__, + ddl.Function( + "user_ensure_tombstone", + [], + "trigger", + """ + DECLARE + v_ua unix_account; + v_login_ts unix_tombstone; + v_ua_ts unix_tombstone; + v_u_login_hash character varying; + BEGIN + -- hash not generated! + select * into v_ua from unix_account ua where ua.id = NEW.unix_account_id; + select digest(NEW.login, 'sha512') into v_u_login_hash; + + select ts.* into v_login_ts from "user" u + join unix_tombstone ts on v_u_login_hash = ts.login_hash + where u.id = NEW.id; + + IF v_ua IS NULL THEN + IF v_login_ts IS NULL THEN + insert into unix_tombstone (uid, login_hash) values (null, v_u_login_hash) on conflict do nothing; + END IF; + -- ELSE: user tombstone exists, no need to do anything + ELSE + select * into v_ua_ts from unix_tombstone ts where ts.uid = v_ua.uid; + IF v_ua_ts.login_hash IS NULL THEN + update unix_tombstone ts set login_hash = v_u_login_hash where ts.uid = v_ua_ts.uid; + ELSE + -- TODO do this in a separat constraint! + IF v_ua_ts.login_hash <> v_u_login_hash THEN + RAISE EXCEPTION 'user %%: tombstone login hash (%%) differs from user login hash (%%)', + NEW.id, v_ua_ts.login_hash, v_u_login_hash + USING ERRCODE = 'integrity_constraint_violation'; + END IF; + END IF; + END IF; + + RETURN NEW; + END; + """, + volatility="volatile", + strict=True, + language="plpgsql", + ), +) +manager.add_trigger( + User.__table__, + ddl.Trigger( + "user_ensure_tombstone_trigger", + User.__table__, + ("INSERT", "UPDATE"), + "user_ensure_tombstone()", + when="BEFORE", + ), +) class RoomHistoryEntry(IntegerIdModel): active_during: Mapped[Interval[utc.DateTimeTz]] = mapped_column(TsTzRange) diff --git a/tests/model/test_unix_tombstone.py b/tests/model/test_unix_tombstone.py index c514102ad..e9051558f 100644 --- a/tests/model/test_unix_tombstone.py +++ b/tests/model/test_unix_tombstone.py @@ -4,11 +4,11 @@ from hashlib import sha512 import pytest -from sqlalchemy import inspect +from sqlalchemy import inspect, update, text from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session -from pycroft.model.user import UnixTombstone +from pycroft.model.user import UnixTombstone, User from tests import factories as f @@ -92,6 +92,33 @@ class TestUserLoginHashFKey: class TestUserUnixAccountTombstoneConsistency: + @pytest.fixture(scope="class") + def user(self, class_session) -> User: + user = f.UserFactory(with_unix_account=True) + class_session.flush() + return user + + def test_user_login_change_fails(self, session, user): + # changing `user.login` does not work due to custom validator + from sqlalchemy import update + + with pytest.raises(IntegrityError, match="user_login_hash_fkey"): + stmt = update(User).where(User.id == user.id).values(login=user.login + "_").returning(User.login) + _new_login = session.scalars(stmt) + + def test_user_login_change_works_when_changing_tombstone(self, session, user): + login_new = user.login + "_" + tombstone = user.tombstone + with session.begin_nested(): + session.execute(text("set constraints all deferred")) + session.execute(update(User).where(User.id == user.id).values(login=login_new).returning(User.login)) + tombstone.login_hash = user.login_hash + session.add(tombstone) + session.execute(text("set constraints all immediate")) + + def test_user_login_change_fails_when_creating_new_tombstone(self, session, user): + pytest.fail("TODO") + # TODO test that modifications on user/unix_account # (e.g. creation, attr modification) # throw an error if both entities point to different tombstones @@ -99,3 +126,12 @@ class TestUserUnixAccountTombstoneConsistency: # TODO test: adding a unix account pointing to user w/ tombstone w/ different uid # throws an error pass + +class TestTombstoneLifeCycle: + # TODO: FIXTURE: isolated unix tombstone + + def test_cannot_set_uid_null(self, session): + pytest.fail("TODO") + + def test_cannot_set_login_hash_null(self, session): + pytest.fail("TODO")