Skip to content

Commit

Permalink
add multi access key funcitionality
Browse files Browse the repository at this point in the history
  • Loading branch information
redvox committed Apr 2, 2024
1 parent a265f8e commit fe0472e
Show file tree
Hide file tree
Showing 14 changed files with 285 additions and 152 deletions.
47 changes: 27 additions & 20 deletions app/aws/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,29 +62,29 @@ def _get_client(profile_name: str, service: str, timeout: int = None, retries: i
return session.client(service)


def has_access_key() -> Result:
def has_access_key(access_key: str) -> Result:
logger.info('has access key')
result = Result()
credentials_file = _load_credentials_file()

if not credentials_file.has_section('access-key'):
error_text = 'could not find profile \'access-key\' in .aws/credentials'
if not credentials_file.has_section(access_key):
error_text = f'could not find access-key \'{access_key}\' in .aws/credentials'
result.error(error_text)
logger.warning(error_text)
return result
result.set_success()
return result


def check_access_key() -> Result:
def check_access_key(access_key: str) -> Result:
logger.info('check access key')
access_key_result = has_access_key()
access_key_result = has_access_key(access_key=access_key)
if not access_key_result.was_success:
return access_key_result

result = Result()
try:
client = _get_client('access-key', 'sts', timeout=2, retries=2)
client = _get_client(access_key, 'sts', timeout=2, retries=2)
client.get_caller_identity()
except ClientError:
error_text = 'access key is not valid'
Expand Down Expand Up @@ -123,14 +123,14 @@ def check_session() -> Result:
return result


def fetch_session_token(mfa_token: str) -> Result:
def fetch_session_token(access_key: str, mfa_token: str) -> Result:
result = Result()
credentials_file = _load_credentials_file()
logger.info('fetch session-token')
profile = 'session-token'

try:
secrets = _get_session_token(mfa_token)
secrets = _get_session_token(access_key=access_key, mfa_token=mfa_token)
except ClientError:
error_text = 'could not fetch session token'
result.error(error_text)
Expand Down Expand Up @@ -183,11 +183,10 @@ def fetch_role_credentials(user_name: str, profile_group: ProfileGroup) -> Resul

def _remove_unused_profiles(credentials_file, profile_group: ProfileGroup):
used_profiles = profile_group.list_profile_names()
used_profiles.append('access-key')
used_profiles.append('session-token')

for profile in credentials_file.sections():
if profile not in used_profiles:
if profile not in used_profiles and not profile.startswith('access-key'):
credentials_file.remove_section(profile)
return credentials_file

Expand Down Expand Up @@ -225,16 +224,24 @@ def _remove_unused_configs(config_file: configparser, profile_group: ProfileGrou
return config_file


def set_access_key(key_id: str, access_key: str) -> None:
def set_access_key(key_name: str, key_id: str, key_secret: str) -> None:
credentials_file = _load_credentials_file()
profile = 'access-key'
if not credentials_file.has_section(profile):
credentials_file.add_section(profile)
credentials_file.set(profile, 'aws_access_key_id', key_id)
credentials_file.set(profile, 'aws_secret_access_key', access_key)
if not credentials_file.has_section(key_name):
credentials_file.add_section(key_name)
credentials_file.set(key_name, 'aws_access_key_id', key_id)
credentials_file.set(key_name, 'aws_secret_access_key', key_secret)
_write_credentials_file(credentials_file)


def get_access_key_list() -> list:
credentials_file = _load_credentials_file()
access_key_list = []
for profile in credentials_file.sections():
if profile.startswith('access-key'):
access_key_list.append(profile)
return access_key_list


def get_access_key_id():
credentials_file = _load_credentials_file()
return credentials_file.get('access-key', 'aws_access_key_id')
Expand All @@ -256,8 +263,8 @@ def _add_profile_config(option_file: configparser, profile: str, region: str) ->
option_file.set(config_name, 'output', 'json')


def get_user_name() -> str:
client = _get_client('access-key', 'sts')
def get_user_name(access_key) -> str:
client = _get_client(access_key, 'sts')
identity = client.get_caller_identity()
return _extract_user_from_identity(identity)

Expand All @@ -266,8 +273,8 @@ def _extract_user_from_identity(identity):
return identity['Arn'].split('/')[-1]


def _get_session_token(mfa_token) -> dict:
client = _get_client('access-key', 'sts')
def _get_session_token(access_key: str, mfa_token: str) -> dict:
client = _get_client(access_key, 'sts')

identity = client.get_caller_identity()
duration = 43200 # 12 * 60 * 60
Expand Down
2 changes: 1 addition & 1 deletion app/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def rotate_access_key(self):
def set_access_key(self):
key_id = getpass(prompt='Key ID: ')
access_key = getpass(prompt='Secret Access Key: ')
self.core.set_access_key(key_id=key_id, access_key=access_key)
self.core.set_access_key(key_id=key_id, key_secret=access_key)
self._info('key was successfully rotated')

@staticmethod
Expand Down
40 changes: 36 additions & 4 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,54 @@

from app.core import files

_default_access_key = 'access-key'


class Config:
def __init__(self):
self.profile_groups: Dict[ProfileGroup] = {}
self.access_keys: List[str] = [] # TODO is this still needed?
self.valid = False
self.error = False

self.mfa_shell_command = None
self.default_access_key = None

def load_from_disk(self):
config = files.load_config()
self.mfa_shell_command = config.get('mfa_shell_command', None)
access_key = config.get('default_access_key', None)

accounts = files.load_accounts()
self.set_accounts(accounts)
self.set_accounts(accounts, access_key)

def save_to_disk(self):
files.save_accounts_file(self.to_dict())
files.save_config_file({
'mfa_shell_command': self.mfa_shell_command,
'default_access_key': self.default_access_key,
})

def set_accounts(self, accounts: dict):
def set_accounts(self, accounts: dict, access_key: str):
if not access_key:
self.default_access_key = _default_access_key
else:
self.default_access_key = access_key
self.access_keys.append(self.default_access_key)

for group_name, group_data in accounts.items():
self.profile_groups[group_name] = ProfileGroup(group_name, group_data)
profile_group = ProfileGroup(name=group_name,
group=group_data,
default_access_key=self.default_access_key)
self.profile_groups[group_name] = profile_group
if profile_group.access_key:
self.access_keys.append(profile_group.access_key)

self.validate()

def set_mfa_shell_command(self, mfa_shell_command: str):
self.mfa_shell_command = mfa_shell_command

def validate(self) -> None:
valid = False
error = ''
Expand Down Expand Up @@ -58,11 +79,13 @@ def to_dict(self):


class ProfileGroup:
def __init__(self, name, group: dict):
def __init__(self, name, group: dict, default_access_key: str):
self.name: str = name
self.team: str = group.get('team', None)
self.region: str = group.get('region', None)
self.color: str = group.get('color', None)
self.default_access_key = default_access_key
self.access_key: str = group.get('access_key', None)
self.profiles: List[Profile] = []
self.type: str = group.get("type", "aws") # only aws (default) & gcp as values are allowed

Expand All @@ -76,6 +99,8 @@ def validate(self) -> (bool, str):
return False, f'{self.name} has no region'
if not self.color:
return False, f'{self.name} has no color'
if self.access_key and not self.access_key.startswith('access-key'):
return False, f'access-key {self.access_key} must have the prefix \"access-key\"'
if self.type == "aws" and len(self.profiles) == 0:
return False, f'aws "{self.name}" has no profiles'
for profile in self.profiles:
Expand All @@ -95,13 +120,20 @@ def list_profile_names(self):
def get_default_profile(self):
return next((profile for profile in self.profiles if profile.default), None)

def get_access_key(self):
if self.access_key:
return self.access_key
return self.default_access_key

def to_dict(self):
result_dict = {
'color': self.color,
'team': self.team,
'region': self.region,
'profiles': [profile.to_dict() for profile in self.profiles],
}
if self.access_key != self.default_access_key:
result_dict['access_key'] = self.access_key
if self.type != "aws":
result_dict["type"] = self.type
return result_dict
Expand Down
41 changes: 26 additions & 15 deletions app/core/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import sys
from typing import Optional, Callable

from app.aws import credentials, iam
Expand All @@ -16,27 +17,28 @@ def __init__(self):
self.config: Config = Config()
self.config.load_from_disk()
self.active_profile_group: ProfileGroup = None
self.empty_profile_group: ProfileGroup = ProfileGroup('logout', {})
self.empty_profile_group: ProfileGroup = ProfileGroup('logout', {}, '')
self.region_override: str = None

def login(self, profile_group: ProfileGroup, mfa_callback: Callable) -> Result:
result = Result()
logger.info(f'start login {profile_group.name}')
self.active_profile_group = profile_group
access_key = profile_group.get_access_key()

access_key_result = credentials.check_access_key()
access_key_result = credentials.check_access_key(access_key=access_key)
if not access_key_result.was_success:
return access_key_result

session_result = credentials.check_session()
if session_result.was_error:
return session_result
if not session_result.was_success:
renew_session_result = self._renew_session(mfa_callback)
renew_session_result = self._renew_session(access_key=access_key, mfa_callback=mfa_callback)
if not renew_session_result.was_success:
return renew_session_result

user_name = credentials.get_user_name()
user_name = credentials.get_user_name(access_key=access_key)
role_result = credentials.fetch_role_credentials(user_name, profile_group)
if not role_result.was_success:
return role_result
Expand All @@ -51,7 +53,6 @@ def login(self, profile_group: ProfileGroup, mfa_callback: Callable) -> Result:
return result

def login_gcp(self, profile_group: ProfileGroup) -> Result:

result = Result()
self.active_profile_group = profile_group
logger.info('gcp login detected')
Expand Down Expand Up @@ -129,23 +130,28 @@ def get_profile_group_list(self):
def get_active_profile_color(self):
return self.active_profile_group.color

@staticmethod
def rotate_access_key() -> Result:
def rotate_access_key(self, key_name: str) -> Result:
result = Result()
logger.info('initiate key rotation')
logger.info('check access key')
access_key_result = credentials.check_access_key()
access_key_result = credentials.check_access_key(access_key=key_name)
if not access_key_result.was_success:
return access_key_result

logger.info(f'check if access key {key_name} is in use and can be rotated')
if not self.active_profile_group or self.active_profile_group.get_access_key() != key_name:
result = Result()
result.error(f'Please login with a profile that uses \'{key_name}\' first')
return result

logger.info('check session')
check_session_result = credentials.check_session()
if not check_session_result.was_success:
check_session_result.error('Access Denied. Please log first')
return check_session_result

logger.info('create key')
user = credentials.get_user_name()
user = credentials.get_user_name(key_name)
create_access_key_result = iam.create_access_key(user)
if not create_access_key_result.was_success:
return create_access_key_result
Expand All @@ -155,8 +161,9 @@ def rotate_access_key() -> Result:
iam.delete_iam_access_key(user, previous_access_key_id)

logger.info('save key')
credentials.set_access_key(key_id=create_access_key_result.payload['AccessKeyId'],
access_key=create_access_key_result.payload['SecretAccessKey'])
credentials.set_access_key(key_name=key_name,
key_id=create_access_key_result.payload['AccessKeyId'],
key_secret=create_access_key_result.payload['SecretAccessKey'])

result.set_success()
return result
Expand All @@ -173,7 +180,7 @@ def edit_config(self, config: Config) -> Result:
result.set_success()
return result

def _renew_session(self, mfa_callback: Callable) -> Result:
def _renew_session(self, access_key: str, mfa_callback: Callable) -> Result:
logger.info('renew session')
logger.info('get mfa from console')
token = mfa.fetch_mfa_token_from_shell(self.config.mfa_shell_command)
Expand All @@ -184,7 +191,7 @@ def _renew_session(self, mfa_callback: Callable) -> Result:
result = Result()
result.error('invalid mfa token')
return result
session_result = credentials.fetch_session_token(token)
session_result = credentials.fetch_session_token(access_key=access_key, mfa_token=token)
return session_result

@staticmethod
Expand All @@ -193,5 +200,9 @@ def _handle_support_files(profile_group: ProfileGroup):
files.write_active_group_file(profile_group.name)

@staticmethod
def set_access_key(key_id, access_key):
credentials.set_access_key(key_id=key_id, access_key=access_key)
def set_access_key(key_name: str, key_id: str, access_key: str):
credentials.set_access_key(key_name=key_name, key_id=key_id, key_secret=access_key)

@staticmethod
def get_access_key_list():
return credentials.get_access_key_list()
6 changes: 4 additions & 2 deletions app/core/result.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from datetime import datetime
import logging

logger = logging.getLogger('logsmith')


class Result:
Expand All @@ -15,6 +17,6 @@ def add_payload(self, content):
self.payload = content

def error(self, message):
logger.error(message)
self.was_error = True
self.error_message = message
timestamp = datetime.now().strftime('%H:%M:%S')
Loading

0 comments on commit fe0472e

Please sign in to comment.