Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Command-Line interface #4 #10

Merged
merged 7 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions app/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@


@cache
def init(config_file='config.yml') -> tuple[Storage, IPTables, ConfigReader]:
def init(
config_file='config.yml',
with_ipt=True,
) -> tuple[Storage, IPTables | None, ConfigReader]:
"""
Singleton init function to be reused across the app.
Have the option to init without IPTables, since they
require extended privileges.
"""
cfg = ConfigReader(config_file)
active_db = init_db(cfg.database_path)
storage = Storage(active_db)
ipt = IPTables(cfg)
ipt = IPTables(cfg) if with_ipt else None
return storage, ipt, cfg
4 changes: 4 additions & 0 deletions app/backend/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ class Tokens(BaseModel):
expires = pw.DateTimeField(null=True)
reason = pw.CharField(null=True) # Can be an external_id

@property
def is_valid(self) -> bool:
return not self.expires or self.expires > datetime.utcnow()


class AccessRequests(BaseModel):
"""
Expand Down
22 changes: 14 additions & 8 deletions app/backend/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ def get_db(self) -> Database:
def _today(self) -> datetime:
return datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)

def add_token(self, expiry_days: int = None) -> tuple[UUID, datetime | None]:
def add_token(
self,
expiry_days: int = None,
reason: str = None,
) -> tuple[UUID, datetime | None]:
"""
Generate UUID4 tokens.
"""
Expand All @@ -35,17 +39,18 @@ def add_token(self, expiry_days: int = None) -> tuple[UUID, datetime | None]:
if expiry_days:
_expires = self._today() + timedelta(days=expiry_days)

Tokens.insert(value=_value, expires=_expires).execute()
Tokens.insert(value=_value, expires=_expires, reason=reason).execute()

return _value, _expires

def get_token(self, value: str) -> Tokens | None:
return Tokens.select().where(Tokens.value == value).get_or_none()

def verify_token(self, value: str) -> Tokens | None:
token = (
Tokens.select()
.where(Tokens.value == value)
.where((Tokens.expires.is_null()) | (Tokens.expires > datetime.utcnow()))
)
return token.first() # get_or_none doesn't work on SelectQuery
token = self.get_token(value)
if token and token.is_valid:
return token
return None

def expire_token(self, value: str) -> bool:
"""
Expand All @@ -54,6 +59,7 @@ def expire_token(self, value: str) -> bool:
token = (
Tokens.update({Tokens.expires: self._today()})
.where(Tokens.value == value)
.where((Tokens.expires.is_null()) | (Tokens.expires > datetime.utcnow()))
.execute()
)
return bool(token)
Expand Down
Empty file added app/cli/__init__.py
Empty file.
80 changes: 80 additions & 0 deletions app/cli/tokens.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import argparse
from uuid import UUID

from app.api import init
from app.backend.logging import get_logger

log = get_logger('CLI')
storage, _, _ = init(with_ipt=False)


if __name__ == '__main__':
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(
dest='maincmd',
)
# Main commands
add_tokens = subparsers.add_parser(
'add',
help='Generate a new token',
)
exp_tokens = subparsers.add_parser(
'exp',
help='Expire a token',
)
check_tokens = subparsers.add_parser(
'check',
help='Check if a token exists',
)
### Sub-commands ###
# Add sub-command
add_tokens.add_argument(
'--expires-days',
type=int,
help='Expires in number of days',
)
add_tokens.add_argument(
'--reason',
type=str,
help='Reason for granting access',
)
# Check sub-command
check_tokens.add_argument(
'uuid',
type=UUID,
help='Token UUID to check',
)
# Expire sub-command
exp_tokens.add_argument(
'uuid',
type=UUID,
help='Token UUID to expire',
)
args = parser.parse_args()
# log.info('Arguments: %s', args)
# ADD
if args.maincmd == 'add':
ret = storage.add_token(args.expires_days, args.reason)
log.info('Added token %s, expires=%s', *ret)
# CHECK
if args.maincmd == 'check':
tok = storage.get_token(args.uuid)
if tok and tok.is_valid:
log.info(
'Token Valid %s → %s, Reason: %s',
tok.created,
tok.expires or 'Unlimited',
tok.reason,
)
elif tok:
log.info(
'Token Expired %s, Reason: %s',
tok.expires,
tok.reason,
)
else:
log.info('Token NOT FOUND!')
# EXPIRE
if args.maincmd == 'exp':
ret = storage.expire_token(args.uuid)
log.info('Token is now expired=%s', ret)
2 changes: 1 addition & 1 deletion tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from app.api import init
from app.bootstrap.bootstrap import create_db

storage, _, _ = init()
storage, _, _ = init(with_ipt=False)
create_db(storage.get_db())


Expand Down
Loading