Skip to content

Commit

Permalink
[model] Add UnixTombstone class
Browse files Browse the repository at this point in the history
This encompasses
- A generated column `User.login_hash`
- An FKey User→UnixTombstone
- An FKey UnixAccount→UnixTombstone
- Partial indices on the UnixTombstone table to ensure a composite
  nullable key with optional components, but with equality
- A trigger updating the user's tombstone if a UnixAccount is inserted
  • Loading branch information
lukasjuhrich committed Feb 4, 2023
1 parent b7a8ada commit ed3abb7
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 0 deletions.
106 changes: 106 additions & 0 deletions pycroft/model/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@

from flask_login import UserMixin
from sqlalchemy import (
Boolean,
Column,
ForeignKey,
Integer,
String,
and_,
exists,
join,
null,
select,
Sequence,
Date,
func,
UniqueConstraint,
Index,
text,
event,
CheckConstraint,
Computed,
)
from sqlalchemy.dialects.postgresql import ExcludeConstraint
from sqlalchemy.ext.associationproxy import association_proxy
Expand Down Expand Up @@ -96,6 +102,7 @@ class BaseUser(IntegerIdModel):
__abstract__ = True

login: Mapped[str40] = mapped_column(unique=True)
login_hash: Mapped[str] = mapped_column(Computed("digest(login, 'sha512')"))
name: Mapped[str255]
registered_at: Mapped[utc.DateTimeTz]
passwd_hash: Mapped[str_deferred | None]
Expand Down Expand Up @@ -461,6 +468,11 @@ def email_internal(self):
__table_args__ = (UniqueConstraint('swdd_person_id'),)


@event.listens_for(User.__table__, "before_create")
def create_pgcrypto(target, connection, **kw):
connection.execute(text("create extension if not exists pgcrypto"))


manager.add_function(
User.__table__,
ddl.Function(
Expand Down Expand Up @@ -632,13 +644,107 @@ class Property(IntegerIdModel):

class UnixAccount(IntegerIdModel):
uid: Mapped[int] = mapped_column(
ForeignKey("unix_tombstone.uid"),
unique=True, server_default=unix_account_uid_seq.next_value()
)
tombstone: Mapped[UnixTombstone] = relationship(viewonly=True)
gid: Mapped[int] = mapped_column(default=100)
login_shell: Mapped[str] = mapped_column(default="/bin/bash")
home_directory: Mapped[str] = mapped_column(unique=True)


class UnixTombstone(ModelBase):
uid: Mapped[int] = mapped_column(unique=True)
login_hash: Mapped[str] = mapped_column(unique=True)

# backrefs
unix_account: Mapped[UnixAccount] = relationship(viewonly=True, uselist=False)
# /backrefs

__table_args__ = (
UniqueConstraint("uid", "login_hash"),
Index(
"uid_only_unique", login_hash, unique=True, postgresql_where=uid.is_(None)
),
Index(
"login_hash_only_unique",
uid,
unique=True,
postgresql_where=login_hash.is_(None),
),
CheckConstraint("uid is not null or login_hash is not null"),
)
__mapper_args__ = {"primary_key": (uid, login_hash)} # fake PKey for mapper


# unix account creation
manager.add_function(
User.__table__,
ddl.Function(
"unix_account_ensure_tombstone",
[],
"trigger",
# IF unix_account has a corresponding user
# THEN use that tombstone.
# However, in the scenario where the user's tombstone exists and points to a different uid,
# we throw an error instead.
"""
DECLARE
v_user "user";
v_login_ts unix_tombstone;
v_ua_ts unix_tombstone;
BEGIN
select * into v_user from "user" u where u.unix_account_id = NEW.id;
select * into v_ua_ts from unix_tombstone ts where ts.uid = NEW.uid;
select ts.* into v_login_ts from "user" u
join unix_tombstone ts on u.login_hash = ts.login_hash
where u.unix_account_id = NEW.id;
-- 1) no user, no tombstone
-- 2) no user, tombstone
-- 3) user, no tombstone -> create
-- 4a) user, tombstone with different login hash
-- 4b) user, tombstone with matching login hash
IF v_user IS NULL THEN
IF v_ua_ts IS NULL THEN
insert into unix_tombstone (uid) values (NEW.uid);
END IF;
RETURN NEW;
END IF;
-- else: user not null
IF v_ua_ts IS NULL THEN
insert into unix_tombstone (uid, login_hash) values (NEW.uid, v_user.login_hash);
ELSE
IF v_ua_ts.login_hash <> v_user.login_hash THEN
RAISE EXCEPTION 'unix_account %%: tombstone login hash (%%) differs from user login hash (%%)',
NEW.id, v_ua_ts.login_hash, v_user.login_hash
USING ERRCODE = 'integrity_constraint_violation';
END IF;
END IF;
RETURN NEW;
END;
""",
volatility="volatile",
strict=True,
language="plpgsql",
),
)

manager.add_trigger(
User.__table__,
ddl.Trigger(
"unix_account_ensure_tombstone_trigger",
UnixAccount.__table__,
("INSERT",),
"unix_account_ensure_tombstone()",
when="BEFORE",
),
)


class RoomHistoryEntry(IntegerIdModel):
active_during: Mapped[Interval[utc.DateTimeTz]] = mapped_column(TsTzRange)

Expand Down
101 changes: 101 additions & 0 deletions tests/model/test_unix_tombstone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Copyright (c) 2023. The Pycroft Authors. See the AUTHORS file.
# This file is part of the Pycroft project and licensed under the terms of
# the Apache License, Version 2.0. See the LICENSE file for details
from hashlib import sha512

import pytest
from sqlalchemy import inspect
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from pycroft.model.user import UnixTombstone
from tests import factories as f


L_HASH = sha512(b"login").hexdigest()


class TestTombstoneConstraints:
@staticmethod
def test_tombstone_needs_login_hash_or_uid(session):
session.add(UnixTombstone())
with pytest.raises(IntegrityError, match="CheckViolation"):
session.flush()

@staticmethod
@pytest.mark.parametrize(
"tombstone_args",
(
[(None, 10000)] * 2,
[(L_HASH, None)] * 2,
[(None, 10000), (L_HASH, 10000)],
[(L_HASH, 10000), (L_HASH, 10001)],
[(L_HASH, 10000), (L_HASH, None)],
[(None, None)],
),
)
def test_tombstone_uniqueness_violations(
session: Session, tombstone_args: list[tuple[str, int]]
):
session.add_all([UnixTombstone(login_hash=h, uid=u) for h, u in tombstone_args])
with pytest.raises(IntegrityError):
session.flush()

@staticmethod
def test_valid_tombstone_combinations(session: Session):
session.add_all(
UnixTombstone(login_hash=h, uid=uid)
for h, uid in (
(None, 10000),
(L_HASH, None),
(sha512(b"login2").hexdigest(), 10001),
(None, 20000),
)
)
try:
session.flush()
except IntegrityError:
pytest.fail("raised IntegrityError")


class TestUnixAccountUidFKey:
@staticmethod
@pytest.fixture(scope="class")
def unix_account(class_session):
account = f.UnixAccountFactory()
class_session.flush()
return account

@staticmethod
def test_unix_account_has_tombstone(unix_account):
assert unix_account.tombstone

@staticmethod
def test_unix_account_deletion_keeps_tombstone(session, unix_account):
tombstone = unix_account.tombstone
session.delete(unix_account)
session.flush()
session.refresh(tombstone)
assert inspect(tombstone).persistent

@staticmethod
def test_unix_account_uid_change_does_not_change_tombstone(session, unix_account):
unix_account.uid += 1000
session.add(unix_account)
with pytest.raises(IntegrityError, match="ForeignKeyViolation"):
session.flush()


class TestUserLoginHashFKey:
# TODO test user add, modify, delete
pass


class TestUserUnixAccountTombstoneConsistency:
# TODO test that modifications on user/unix_account
# (e.g. creation, attr modification)
# throw an error if both entities point to different tombstones

# TODO test: adding a unix account pointing to user w/ tombstone w/ different uid
# throws an error
pass

0 comments on commit ed3abb7

Please sign in to comment.