diff --git a/gimme_aws_creds/__init__.py b/gimme_aws_creds/__init__.py index 36ae2e46..ffec2e79 100644 --- a/gimme_aws_creds/__init__.py +++ b/gimme_aws_creds/__init__.py @@ -1,2 +1,2 @@ __all__ = ['config', 'okta', 'main', 'ui'] -version = '2.0.0' +version = '2.0.1' diff --git a/gimme_aws_creds/main.py b/gimme_aws_creds/main.py index 52d6ddda..95d5c469 100644 --- a/gimme_aws_creds/main.py +++ b/gimme_aws_creds/main.py @@ -539,9 +539,7 @@ def get_resolver(self): def device_token(self): if self.config.action_register_device is True: self.conf_dict['device_token'] = None - elif not self.conf_dict.get('device_token'): - raise errors.GimmeAWSCredsError( - 'No device token in configuration. Try running --action-register-device again.') + return self.conf_dict.get('device_token') @property @@ -558,7 +556,7 @@ def aws_results(self): auth_result['username']) elif self.gimme_creds_server == 'appurl': - auth_result = self.okta.auth_session() + self.okta.auth_session() # bypass lambda & API call # Apps url is required when calling with appurl if self.conf_dict.get('app_url'): @@ -788,11 +786,20 @@ def handle_action_store_json_creds(self, stream=None): def handle_action_register_device(self): # Capture the Device Token and write it to the config file - if self.config.action_register_device is True: + if self.device_token is None or self.config.action_register_device is True: + if not self.config.action_register_device: + self.ui.notify('\n*** No device token found in configuration file, it will be created.') + self.ui.notify('*** You may be prompted for MFA more than once for this run.\n') + auth_result = self.okta.auth_session() self.conf_dict['device_token'] = auth_result['device_token'] self.config.write_config_file(self.conf_dict) - raise errors.GimmeAWSCredsExitSuccess('Device token saved!') + self.okta.device_token = self.conf_dict['device_token'] + + self.ui.notify('\nDevice token saved!\n') + + if self.config.action_register_device is True: + raise errors.GimmeAWSCredsExitSuccess() def handle_action_list_roles(self): if self.config.action_list_roles: diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 93c68c2e..b4692983 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -41,13 +41,13 @@ class OktaClient(object): KEYRING_SERVICE = 'gimme-aws-creds' KEYRING_ENABLED = not isinstance(keyring.get_keyring(), FailKeyring) - def __init__(self, ui, okta_org_url, verify_ssl_certs=True, device_token=None): + def __init__(self, gac_ui, okta_org_url, verify_ssl_certs=True, device_token=None): """ - :type ui: ui.UserInterface + :type gac_ui: ui.UserInterface :param okta_org_url: Base URL string for Okta IDP. :param verify_ssl_certs: Enable/disable SSL verification """ - self.ui = ui + self.ui = gac_ui self._okta_org_url = okta_org_url self._verify_ssl_certs = verify_ssl_certs @@ -65,20 +65,28 @@ def __init__(self, ui, okta_org_url, verify_ssl_certs=True, device_token=None): self._oauth_access_token = None self._oauth_id_token = None - jar = requests.cookies.RequestsCookieJar() - - if device_token is not None: - match = re.search('^https://(.*)', okta_org_url) - jar.set('DT', device_token, domain=match.group(1), path='/') + self._jar = requests.cookies.RequestsCookieJar() # Allow up to 5 retries on requests to Okta in case we have network issues self._http_client = requests.Session() - self._http_client.cookies = jar + self._http_client.cookies = self._jar + + self.device_token = device_token retries = Retry(total=5, backoff_factor=1, method_whitelist=['GET', 'POST']) self._http_client.mount('https://', HTTPAdapter(max_retries=retries)) + @property + def device_token(self): + return self._http_client.cookies.get('DT') + + @device_token.setter + def device_token(self, device_token): + if device_token is not None: + match = re.search('^https://(.*)', self._okta_org_url) + self._http_client.cookies.set('DT', device_token, domain=match.group(1), path='/') + def set_username(self, username): self._username = username @@ -464,23 +472,22 @@ def _check_u2f_result(self, state_token, login_data): # should be deprecated soon as OKTA move forward webauthN # just for backward compatibility nonce = login_data['_embedded']['factor']['_embedded']['challenge']['nonce'] - credentialId = login_data['_embedded']['factor']['profile']['credentialId'] - appId = login_data['_embedded']['factor']['profile']['appId'] - version = login_data['_embedded']['factor']['profile']['version'] - response = {} - verif = FactorU2F(self.ui, appId, nonce, credentialId) + credential_id = login_data['_embedded']['factor']['profile']['credentialId'] + app_id = login_data['_embedded']['factor']['profile']['appId'] + + verify = FactorU2F(self.ui, app_id, nonce, credential_id) try: - clientData, signature = verif.verify() + client_data, signature = verify.verify() except: signature = b'fake' - clientData = b'fake' + client_data = b'fake' - clientData = str(base64.urlsafe_b64encode(clientData), "utf-8") - signatureData = str(base64.urlsafe_b64encode(signature), 'utf-8') + client_data = str(base64.urlsafe_b64encode(client_data), "utf-8") + signature_data = str(base64.urlsafe_b64encode(signature), 'utf-8') response = self._http_client.post( login_data['_links']['next']['href'] + "?rememberDevice=false", - json={'stateToken': state_token, 'clientData': clientData, 'signatureData': signatureData}, + json={'stateToken': state_token, 'clientData': client_data, 'signatureData': signature_data}, headers=self._get_headers(), verify=self._verify_ssl_certs ) @@ -498,24 +505,24 @@ def _check_webauthn_result(self, state_token, login_data): """ wait for webauthN challenge """ nonce = login_data['_embedded']['factor']['_embedded']['challenge']['challenge'] - credentialId = login_data['_embedded']['factor']['profile']['credentialId'] + credential_id = login_data['_embedded']['factor']['profile']['credentialId'] response = {} """ Authenticator """ - verif = WebAuthnClient(self.ui, self._okta_org_url, nonce, credentialId) + verif = WebAuthnClient(self.ui, self._okta_org_url, nonce, credential_id) try: - clientData, assertion = verif.verify() + client_data, assertion = verif.verify() except: - clientData = b'fake' + client_data = b'fake' assertion = FakeAssertion() - clientData = str(base64.urlsafe_b64encode(clientData), "utf-8") - signatureData = base64.b64encode(assertion.signature).decode('utf-8') - authData = base64.b64encode(assertion.auth_data).decode('utf-8') + client_data = str(base64.urlsafe_b64encode(client_data), "utf-8") + signature_data = base64.b64encode(assertion.signature).decode('utf-8') + auth_data = base64.b64encode(assertion.auth_data).decode('utf-8') response = self._http_client.post( login_data['_links']['next']['href'] + "?rememberDevice=false", - json={'stateToken': state_token, 'clientData':clientData, 'signatureData': signatureData, 'authenticatorData': authData}, + json={'stateToken': state_token, 'clientData':client_data, 'signatureData': signature_data, 'authenticatorData': auth_data}, headers=self._get_headers(), verify=self._verify_ssl_certs ) @@ -541,12 +548,13 @@ def get_hs_stateToken(self, response): return api_response # no MFA required => we should have a session cookies, login flow ends here - api_response = {} - api_response['status'] = 'SUCCESS' - api_response['sessionToken'] = '' - api_response['session'] = response.cookies['sid'] - api_response['device_token'] = self._http_client.cookies['DT'] - return api_response; + api_response = { + 'status': 'SUCCESS', + 'sessionToken': '', + 'session': response.cookies['sid'], + 'device_token': self._http_client.cookies['DT'] + } + return api_response def get_saml_response(self, url): """ return the base64 SAML value object from the SAML Response""" @@ -580,8 +588,7 @@ def get_saml_response(self, url): return {'SAMLResponse': saml_response, 'RelayState': relay_state, 'TargetUrl': form_action} - def get(self, url, **kwargs): - """ Retrieve resource that is protected by Okta """ + def check_kwargs(self, kwargs): if self._use_oauth_access_token is True: if 'headers' not in kwargs: kwargs['headers'] = {} @@ -591,46 +598,28 @@ def get(self, url, **kwargs): if 'headers' not in kwargs: kwargs['headers'] = {} kwargs['headers']['Authorization'] = "Bearer {}".format(self._oauth_access_token) - return self._http_client.get(url, **kwargs) + + return kwargs + + def get(self, url, **kwargs): + """ Retrieve resource that is protected by Okta """ + parameters = self.check_kwargs(kwargs) + return self._http_client.get(url, **parameters) def post(self, url, **kwargs): """ Create resource that is protected by Okta """ - if self._use_oauth_access_token is True: - if 'headers' not in kwargs: - kwargs['headers'] = {} - kwargs['headers']['Authorization'] = "Bearer {}".format(self._oauth_access_token) - - if self._use_oauth_id_token is True: - if 'headers' not in kwargs: - kwargs['headers'] = {} - kwargs['headers']['Authorization'] = "Bearer {}".format(self._oauth_access_token) - return self._http_client.post(url, **kwargs) + parameters = self.check_kwargs(kwargs) + return self._http_client.post(url, **parameters) def put(self, url, **kwargs): """ Modify resource that is protected by Okta """ - if self._use_oauth_access_token is True: - if 'headers' not in kwargs: - kwargs['headers'] = {} - kwargs['headers']['Authorization'] = "Bearer {}".format(self._oauth_access_token) - - if self._use_oauth_id_token is True: - if 'headers' not in kwargs: - kwargs['headers'] = {} - kwargs['headers']['Authorization'] = "Bearer {}".format(self._oauth_access_token) - return self._http_client.put(url, **kwargs) + parameters = self.check_kwargs(kwargs) + return self._http_client.put(url, **parameters) def delete(self, url, **kwargs): """ Delete resource that is protected by Okta """ - if self._use_oauth_access_token is True: - if 'headers' not in kwargs: - kwargs['headers'] = {} - kwargs['headers']['Authorization'] = "Bearer {}".format(self._oauth_access_token) - - if self._use_oauth_id_token is True: - if 'headers' not in kwargs: - kwargs['headers'] = {} - kwargs['headers']['Authorization'] = "Bearer {}".format(self._oauth_access_token) - return self._http_client.delete(url, **kwargs) + parameters = self.check_kwargs(kwargs) + return self._http_client.delete(url, **parameters) def _choose_factor(self, factors): """ gets a list of available authentication factors and @@ -639,10 +628,15 @@ def _choose_factor(self, factors): self.ui.info("Multi-factor Authentication required.") # filter the factor list down to just the types specified in preferred_mfa_type + preferred_factors = [] if self._preferred_mfa_type is not None: - factors = list(filter(lambda item: item['factorType'] == self._preferred_mfa_type, factors)) + preferred_factors = list(filter(lambda item: item['factorType'] == self._preferred_mfa_type, factors)) + # If the preferred factor isn't in the list of available factors, we'll let the user know before + # prompting to select another. + if not preferred_factors: + self.ui.notify('Preferred factor type of {} not available.'.format(self._preferred_mfa_type)) - if len(factors) == 1: + if len(preferred_factors) == 1: factor_name = self._build_factor_name(factors[0]) self.ui.info(factor_name + ' selected') selection = 0 @@ -679,7 +673,7 @@ def _build_factor_name(factor): elif factor['factorType'] == 'webauthn': return factor['factorType'] + ": " + factor['factorType'] else: - return ("Unknown MFA type: " + factor['factorType']) + return "Unknown MFA type: " + factor['factorType'] def _get_username_password_creds(self): """Get's creds for Okta login from the user.""" @@ -713,7 +707,7 @@ def _get_username_password_creds(self): keyring.set_password(self.KEYRING_SERVICE, username, password) self.ui.info("Password for {} saved in keyring.".format(username)) except RuntimeError as err: - self.ui.warning("Failed to save password in keyring: ", err) + self.ui.warning("Failed to save password in keyring: " + str(err)) if not password: raise errors.GimmeAWSCredsError('Password was not provided. Exiting.')