From 1a3d034ad3e438fd3f18b6e03420c1c0a1fbf89e Mon Sep 17 00:00:00 2001 From: Aaron Date: Fri, 17 Feb 2023 22:03:04 -0500 Subject: [PATCH] Revert "Add sig_v5 support to duo_client_python calls and v2 integrations handler (#188)" (#197) This reverts commit 42a5a1dc58cd5f5304ceb87d704c4a863011bd5d. --- duo_client/admin.py | 85 +++------------ duo_client/client.py | 118 ++++----------------- examples/create_integration_sso_generic.py | 67 ------------ tests/accountAdmin/test_billing.py | 23 ++-- tests/admin/test_admins.py | 15 ++- tests/admin/test_groups.py | 5 +- tests/admin/test_integration.py | 78 -------------- tests/admin/test_integrations.py | 10 +- tests/admin/test_logo.py | 4 +- tests/admin/test_settings.py | 66 ++++++------ tests/admin/test_users.py | 63 ++++++----- tests/test_client.py | 16 --- 12 files changed, 125 insertions(+), 425 deletions(-) delete mode 100644 examples/create_integration_sso_generic.py delete mode 100644 tests/admin/test_integration.py diff --git a/duo_client/admin.py b/duo_client/admin.py index d9e56a3..9d527b4 100644 --- a/duo_client/admin.py +++ b/duo_client/admin.py @@ -216,43 +216,11 @@ class Admin(client.Client): account_id = None - admin_sig_version = 5 - def api_call(self, method, path, params, sig_version=admin_sig_version): + def api_call(self, method, path, params): if self.account_id is not None: params['account_id'] = self.account_id - return super(Admin, self).api_call( - method, - path, - params, - sig_version=sig_version - ) - - def json_api_call(self, method, path, params, sig_version=admin_sig_version): - return super(Admin, self).json_api_call( - method, - path, - params, - sig_version=sig_version - ) - - def json_paging_api_call(self, method, path, params, sig_version=admin_sig_version): - return super(Admin, self).json_paging_api_call( - method, - path, - params, - sig_version=sig_version - ) - - def json_cursor_api_call(self, method, path, params, get_records_func, sig_version=admin_sig_version): - return super(Admin, self).json_cursor_api_call( - method, - path, - params, - get_records_func, - sig_version=sig_version - ) - + return super(Admin, self).api_call(method, path, params) @classmethod def _canonicalize_ip_whitelist(klass, ip_whitelist): @@ -2410,8 +2378,8 @@ def get_integrations_generator(self): """ return self.json_paging_api_call( 'GET', - '/admin/v2/integrations', - {}, + '/admin/v1/integrations', + {} ) def get_integrations(self, limit=None, offset=0): @@ -2430,8 +2398,8 @@ def get_integrations(self, limit=None, offset=0): if limit: return self.json_api_call( 'GET', - '/admin/v2/integrations', - {'limit': limit, 'offset': offset}, + '/admin/v1/integrations', + {'limit': limit, 'offset': offset} ) return list(self.get_integrations_generator()) @@ -2449,8 +2417,8 @@ def get_integration(self, integration_key): params = {} response = self.json_api_call( 'GET', - '/admin/v2/integrations/' + integration_key, - params, + '/admin/v1/integrations/' + integration_key, + params ) return response @@ -2473,8 +2441,7 @@ def create_integration(self, ip_whitelist=None, ip_whitelist_enroll_policy=None, groups_allowed=None, - self_service_allowed=None, - sso=None): + self_service_allowed=None): """Creates a new integration. name - The name of the integration (required) @@ -2500,9 +2467,6 @@ def create_integration(self, adminapi_write_resource - |None groups_allowed - self_service_allowed - |None - sso - (optional) - New argument for unreleased feature. Will return an error if used. - Client will be updated again in the future when feature is released. Returns the created integration. @@ -2550,12 +2514,9 @@ def create_integration(self, params['groups_allowed'] = groups_allowed if self_service_allowed is not None: params['self_service_allowed'] = '1' if self_service_allowed else '0' - if sso is not None: - params['sso'] = sso response = self.json_api_call('POST', - '/admin/v2/integrations', - params, - ) + '/admin/v1/integrations', + params) return response def delete_integration(self, integration_key): @@ -2567,12 +2528,8 @@ def delete_integration(self, integration_key): """ integration_key = six.moves.urllib.parse.quote_plus(str(integration_key)) - path = '/admin/v2/integrations/%s' % integration_key - return self.json_api_call( - 'DELETE', - path, - {}, - ) + path = '/admin/v1/integrations/%s' % integration_key + return self.json_api_call('DELETE', path, {}) def update_integration(self, integration_key, @@ -2594,8 +2551,7 @@ def update_integration(self, ip_whitelist=None, ip_whitelist_enroll_policy=None, groups_allowed=None, - self_service_allowed=None, - sso=None): + self_service_allowed=None): """Updates an integration. integration_key - The key of the integration to update. (required) @@ -2620,9 +2576,6 @@ def update_integration(self, reset_secret_key - |None groups_allowed - self_service_allowed - True|False|None - sso - (optional) - New argument for unreleased feature. Will return an error if used. - Client will be updated again in the future when feature is released. If any value other than None is provided for 'reset_secret_key' (for example, 1), then a new secret key will be generated for the @@ -2634,7 +2587,7 @@ def update_integration(self, """ integration_key = six.moves.urllib.parse.quote_plus(str(integration_key)) - path = '/admin/v2/integrations/%s' % integration_key + path = '/admin/v1/integrations/%s' % integration_key params = {} if name is not None: params['name'] = name @@ -2676,17 +2629,11 @@ def update_integration(self, params['groups_allowed'] = groups_allowed if self_service_allowed is not None: params['self_service_allowed'] = '1' if self_service_allowed else '0' - if sso is not None: - params['sso'] = sso if not params: raise TypeError("No new values were provided") - response = self.json_api_call( - 'POST', - path, - params, - ) + response = self.json_api_call('POST', path, params) return response def get_admins(self, limit=None, offset=0): diff --git a/duo_client/client.py b/duo_client/client.py index 8f2688d..f57ba2b 100644 --- a/duo_client/client.py +++ b/duo_client/client.py @@ -54,58 +54,7 @@ def canon_params(params): return '&'.join(args) -def canon_x_duo_headers(additional_headers): - """ - Args: - additional_headers: Dict - Returns: - stringified version of all headers that start with 'X-Duo*'. Which is then hashed. - Note: the keys are also lower-cased for signing. - """ - if additional_headers is None: - additional_headers = {} - - canon_list = [] - added_headers = [] # store headers we've added, use for duplicate checking (case insensitive) - for header_name in sorted(additional_headers.keys()): - # Extract header value and set key to lower case from now on. - value = additional_headers[header_name] - header_name = header_name.lower() if header_name is not None else None - - # Validation gate. We will raise if a problem is found here. - _validate_additional_header(header_name, value, added_headers) - - # Add to the list of values to canonicalize: - canon_list.extend([header_name, value]) - added_headers.append(header_name) - - canon = '\x00'.join(canon_list) - return hashlib.sha512(canon.encode('utf-8')).hexdigest() - - -def _validate_additional_header(header_name, value, added_headers): - """ - Args: - header_name: str - value: str - added_headers: list[str] - headers we've already added - check for duplicates (case insensitive) - Returns: None - - Validates additional headers added to request - headers must comply with the following rules (for V5 sig_version) - """ - if header_name is None or value is None: - raise ValueError("Not allowed 'None' as a header name or value") - if '\x00' in header_name: - raise ValueError("Not allowed 'Null' character in header name") - if '\x00' in value: - raise ValueError("Not allowed 'Null' character in header value") - if not header_name.lower().startswith('x-duo-'): - raise ValueError("Additional headers must start with \'X-Duo-\'") - if header_name.lower() in added_headers: - raise ValueError("Duplicate header passed, header={}".format(header_name)) - - -def canonicalize(method, host, uri, params, date, sig_version, body=None, additional_headers=None): +def canonicalize(method, host, uri, params, date, sig_version, body=None): """ Return a canonical string version of the given request attributes. @@ -142,27 +91,17 @@ def canonicalize(method, host, uri, params, date, sig_version, body=None, additi canon_params(params), hashlib.sha512(body.encode('utf-8')).hexdigest(), ] - elif sig_version == 5: - canon = [ - date, - method.upper(), - host.lower(), - uri, - canon_params(params), - hashlib.sha512(body.encode('utf-8')).hexdigest(), - canon_x_duo_headers(additional_headers), # hashed in canon_x_duo_headers - ] else: raise ValueError("Unknown signature version: {}".format(sig_version)) return '\n'.join(canon) def sign(ikey, skey, method, host, uri, date, sig_version, params, body=None, - digestmod=hashlib.sha512, additional_headers=None): + digestmod=hashlib.sha512): """ Return basic authorization header line with a Duo Web API signature. """ - canonical = canonicalize(method, host, uri, params, date, sig_version, body=body, additional_headers=additional_headers) + canonical = canonicalize(method, host, uri, params, date, sig_version, body=body) if isinstance(skey, six.text_type): skey = skey.encode('utf-8') if isinstance(canonical, six.text_type): @@ -214,7 +153,7 @@ def __init__(self, ikey, skey, host, user_agent=('Duo API Python/' + __version__), timeout=socket._GLOBAL_DEFAULT_TIMEOUT, paging_limit=100, - digestmod=hashlib.sha512, + digestmod=hashlib.sha512, sig_version=2, port=None ): @@ -250,6 +189,9 @@ def __init__(self, ikey, skey, host, if sig_version == 3: raise ValueError('sig_version 3 not supported') + if sig_version == 4 and digestmod != hashlib.sha512: + raise ValueError('sha512 required for sig_version 4') + def set_proxy(self, host, port=None, headers=None, proxy_type='CONNECT'): """ @@ -265,14 +207,7 @@ def set_proxy(self, host, port=None, headers=None, self.proxy_port = port self.proxy_type = proxy_type - def api_call( - self, - method, - path, - params, - additional_headers=None, - sig_version=None, - ): + def api_call(self, method, path, params): """ Call a Duo API method. Return a (response, data) tuple. @@ -280,30 +215,20 @@ def api_call( * path: Full path of the API endpoint. E.g. "/auth/v2/ping". * params: dict mapping from parameter name to stringified value, or a dict to be converted to json. - * sig_version: signature version integer """ params_go_in_body = method in ('POST', 'PUT', 'PATCH') - digestmod = self.digestmod - if additional_headers is None: - additional_headers = {} - if sig_version is None: - sig_version = self.sig_version - - if sig_version in (1, 2): + if self.sig_version in (1, 2): params = normalize_params(params) # v1 and v2 canonicalization don't distinguish between # params and body. There's no separate body input. body = None - elif sig_version in (4, 5): - digestmod = hashlib.sha512 + elif self.sig_version == 4: if params_go_in_body: body = self.canon_json(params) params = {} else: body = '' params = normalize_params(params) - else: - raise ValueError(f"unsupported sig_version {sig_version}") if self.sig_timezone == 'UTC': now = email.utils.formatdate() @@ -319,25 +244,20 @@ def api_call( self.host, path, now, - sig_version, + self.sig_version, params, body=body, - digestmod=digestmod, - additional_headers=additional_headers) + digestmod=self.digestmod) headers = { 'Authorization': auth, 'Date': now, } - if sig_version == 5: - for k, v in additional_headers.items(): - headers[k] = v - if self.user_agent: headers['User-Agent'] = self.user_agent if params_go_in_body: - if sig_version in (4, 5): + if self.sig_version == 4: headers['Content-type'] = 'application/json' else: headers['Content-type'] = 'application/x-www-form-urlencoded' @@ -462,16 +382,16 @@ def normalize_paging_args(self, limit=None, offset=0): return (limit, offset) - def json_api_call(self, method, path, params, sig_version=None): + def json_api_call(self, method, path, params): """ Call a Duo API method which is expected to return a JSON body with a 200 status. Return the response data structure or raise RuntimeError. """ - (response, data) = self.api_call(method, path, params, sig_version=sig_version) + (response, data) = self.api_call(method, path, params) return self.parse_json_response(response, data) - def json_paging_api_call(self, method, path, params, sig_version=None): + def json_paging_api_call(self, method, path, params): """ Call a Duo API method which is expected to return a JSON body with a 200 status. Return a generator that can be used to get @@ -485,13 +405,13 @@ def json_paging_api_call(self, method, path, params, sig_version=None): while next_offset is not None: params['offset'] = str(next_offset) - (response, data) = self.api_call(method, path, params, sig_version=sig_version) + (response, data) = self.api_call(method, path, params) (objects, metadata) = self.parse_json_response_and_metadata(response, data) next_offset = metadata.get('next_offset', None) for obj in objects: yield obj - def json_cursor_api_call(self, method, path, params, get_records_func, sig_version=None): + def json_cursor_api_call(self, method, path, params, get_records_func): """ Call a Duo API endpoint which utilizes a cursor in some responses to page through a set of data. This cursor is supplied through the optional @@ -520,7 +440,7 @@ def json_cursor_api_call(self, method, path, params, get_records_func, sig_versi while True: if next_offset is not None: params['offset'] = str(next_offset) - (http_resp, http_resp_data) = self.api_call(method, path, params, sig_version=sig_version) + (http_resp, http_resp_data) = self.api_call(method, path, params) (response, metadata) = self.parse_json_response_and_metadata( http_resp, http_resp_data, diff --git a/examples/create_integration_sso_generic.py b/examples/create_integration_sso_generic.py deleted file mode 100644 index da3450b..0000000 --- a/examples/create_integration_sso_generic.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/python -from __future__ import absolute_import -from __future__ import print_function -import pprint -import sys - -import duo_client -from six.moves import input - -argv_iter = iter(sys.argv[1:]) -def get_next_arg(prompt): - try: - return next(argv_iter) - except StopIteration: - return input(prompt) - -ikey = get_next_arg('Admin API integration key ("DI..."): ') -skey = get_next_arg('integration secret key: ') -host = get_next_arg('API hostname ("api-....duosecurity.com"): ') - -# Configuration and information about objects to create. -admin_api = duo_client.Admin( - ikey, - skey, - host, -) - -integration = admin_api.create_integration( - name='api-created integration', - integration_type='sso-generic', - sso={ - "saml_config": { - "entity_id": "entity_id", - "acs_urls": [ - { - "url": "https://example.com/acs", - "binding": None, - "isDefault": None, - "index": None, - } - ], - "nameid_format": "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified", - "nameid_attribute": "mail", - "sign_assertion": False, - "sign_response": True, - "signing_algorithm": "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256", - "mapped_attrs": {}, - "relaystate": "https://example.com/relaystate", - "slo_url": "https://example.com/slo", - "spinitiated_url": "https://example.com/spurl", - "static_attrs": {}, - "role_attrs": { - "bob": { - "ted": ["DGS08MMO53GNRLSFW0D0", "DGETXINZ6CSJO4LRSVKV"], - "frank": ["DGETXINZ6CSJO4LRSVKV"], - } - }, - "attribute_transformations": { - "attribute_1": 'use ""\nprepend text="dev-"', - "attribute_2": 'use ""\nappend additional_attr=""', - } - } - }, -) - -print('Created integration:') -pprint.pprint(integration) diff --git a/tests/accountAdmin/test_billing.py b/tests/accountAdmin/test_billing.py index 21c1d8e..74318b0 100644 --- a/tests/accountAdmin/test_billing.py +++ b/tests/accountAdmin/test_billing.py @@ -1,5 +1,3 @@ -import json - from .. import util import duo_client.admin from .base import TestAccountAdmin @@ -24,13 +22,14 @@ def test_set_business_billing_edition(self): """ response = self.client.set_edition('PLATFORM') uri = response['uri'] + args = response['body'] self.assertEqual(response['method'], 'POST') self.assertEqual(uri, '/admin/v1/billing/edition') - self.assertEqual(json.loads(response['body']), + self.assertEqual(util.params_to_dict(args), { - 'edition': 'PLATFORM', - 'account_id': self.client.account_id, + 'edition': ['PLATFORM'], + 'account_id': [self.client.account_id], }) def test_set_enterprise_billing_edition(self): @@ -38,13 +37,14 @@ def test_set_enterprise_billing_edition(self): """ response = self.client.set_edition('ENTERPRISE') uri = response['uri'] + args = response['body'] self.assertEqual(response['method'], 'POST') self.assertEqual(uri, '/admin/v1/billing/edition') - self.assertEqual(json.loads(response['body']), + self.assertEqual(util.params_to_dict(args), { - 'edition': 'ENTERPRISE', - 'account_id': self.client.account_id, + 'edition': ['ENTERPRISE'], + 'account_id': [self.client.account_id], }) def test_get_telephony_credits(self): @@ -65,11 +65,12 @@ def test_set_telephony_credits(self): """ response = self.client.set_telephony_credits(10) uri = response['uri'] + args = response['body'] self.assertEqual(response['method'], 'POST') self.assertEqual(uri, '/admin/v1/billing/telephony_credits') - self.assertEqual(json.loads(response['body']), + self.assertEqual(util.params_to_dict(args), { - 'credits': '10', - 'account_id': self.client.account_id, + 'credits': ['10'], + 'account_id': [self.client.account_id], }) diff --git a/tests/admin/test_admins.py b/tests/admin/test_admins.py index 60f50a4..807891b 100644 --- a/tests/admin/test_admins.py +++ b/tests/admin/test_admins.py @@ -1,4 +1,3 @@ -import json from .. import util import duo_client.admin from .base import TestAdmin @@ -156,10 +155,10 @@ def test_update_admin_password_mgmt_status(self): self.assertEqual(response['method'], 'POST') self.assertEqual(response['uri'], '/admin/v1/admins/DFAKEADMINID/password_mgmt') self.assertEqual( - json.loads(response['body']), + util.params_to_dict(response['body']), { - 'account_id': self.client.account_id, - 'has_external_password_mgmt': 'False' + 'account_id': [self.client.account_id], + 'has_external_password_mgmt': ['False'] }) def test_update_admin_password_mgmt_status_set_password(self): @@ -169,9 +168,9 @@ def test_update_admin_password_mgmt_status_set_password(self): self.assertEqual(response['method'], 'POST') self.assertEqual(response['uri'], '/admin/v1/admins/DFAKEADMINID/password_mgmt') self.assertEqual( - json.loads(response['body']), + util.params_to_dict(response['body']), { - 'account_id': self.client.account_id, - 'has_external_password_mgmt': 'True', - 'password': 'dolphins' + 'account_id': [self.client.account_id], + 'has_external_password_mgmt': ['True'], + 'password': ['dolphins'] }) diff --git a/tests/admin/test_groups.py b/tests/admin/test_groups.py index 53de0a3..7e516e1 100644 --- a/tests/admin/test_groups.py +++ b/tests/admin/test_groups.py @@ -1,4 +1,3 @@ -import json import warnings from .. import util import duo_client.admin @@ -213,5 +212,5 @@ def test_modify_group(self): response = self.client.modify_group('ABC123') self.assertEqual(response['method'], 'POST') self.assertEqual(response['uri'], '/admin/v1/groups/ABC123') - self.assertEqual(json.loads(response['body']), - {'account_id': self.client.account_id}) + self.assertEqual(util.params_to_dict(response['body']), + {'account_id': [self.client.account_id]}) diff --git a/tests/admin/test_integration.py b/tests/admin/test_integration.py deleted file mode 100644 index 77dcc6b..0000000 --- a/tests/admin/test_integration.py +++ /dev/null @@ -1,78 +0,0 @@ -from .. import util -import json -import duo_client.admin -from .base import TestAdmin - - -class TestIntegration(TestAdmin): - def setUp(self): - super(TestIntegration, self).setUp() - self.integration_key = "DISRYL7L8LZ5YXNWKGNK" - - def test_get_integration(self): - response = self.client.get_integration(self.integration_key) - (uri, args) = response['uri'].split('?') - - self.assertEqual(response['method'], 'GET') - self.assertEqual(uri, '/admin/v2/integrations/{}'.format(self.integration_key)) - self.assertEqual(util.params_to_dict(args), {'account_id': [self.client.account_id]}) - - def test_delete_integration(self): - response = self.client.delete_integration(self.integration_key) - (uri, args) = response['uri'].split('?') - - self.assertEqual(response['method'], 'DELETE') - self.assertEqual(uri, '/admin/v2/integrations/{}'.format(self.integration_key)) - self.assertEqual(util.params_to_dict(args), {'account_id': [self.client.account_id]}) - - def test_create_integration(self): - response = self.client.create_integration( - name="New integration name", - integration_type="sso-generic", - sso={ - "idp_metadata": None, - "saml_config": {} - }, - ) - - self.assertEqual(response['method'], 'POST') - self.assertEqual(response['uri'], '/admin/v2/integrations') - self.assertEqual(json.loads(response['body']), - { - "account_id": self.client.account_id, - "name": "New integration name", - "type": "sso-generic", - "sso": { - "idp_metadata": None, - "saml_config": {} - }, - } - ) - - def test_update_integration_success(self): - response = self.client.update_integration( - self.integration_key, - name="Integration name", - sso={ - "saml_config": { - "nameid_attribute": "mail", - } - }, - ) - - self.assertEqual(response['method'], 'POST') - self.assertEqual(response['uri'], '/admin/v2/integrations/{}'.format(self.integration_key)) - self.assertEqual(json.loads(response['body']), - { - "account_id": self.client.account_id, - "name": "Integration name", - "sso": { - "saml_config": { - "nameid_attribute": "mail", - } - }, - } - ) - -if __name__ == '__main__': - unittest.main() \ No newline at end of file diff --git a/tests/admin/test_integrations.py b/tests/admin/test_integrations.py index 59052c2..6844ba3 100644 --- a/tests/admin/test_integrations.py +++ b/tests/admin/test_integrations.py @@ -11,7 +11,7 @@ def test_get_integrations_generator(self): response = next(generator) self.assertEqual(response['method'], 'GET') (uri, args) = response['uri'].split('?') - self.assertEqual(uri, '/admin/v2/integrations') + self.assertEqual(uri, '/admin/v1/integrations') self.assertEqual( util.params_to_dict(args), { @@ -27,7 +27,7 @@ def test_get_integrations(self): response = response[0] self.assertEqual(response['method'], 'GET') (uri, args) = response['uri'].split('?') - self.assertEqual(uri, '/admin/v2/integrations') + self.assertEqual(uri, '/admin/v1/integrations') self.assertEqual( util.params_to_dict(args), { @@ -43,7 +43,7 @@ def test_get_integrations_with_limit(self): response = response[0] self.assertEqual(response['method'], 'GET') (uri, args) = response['uri'].split('?') - self.assertEqual(uri, '/admin/v2/integrations') + self.assertEqual(uri, '/admin/v1/integrations') self.assertEqual( util.params_to_dict(args), { @@ -59,7 +59,7 @@ def test_get_integrations_with_limit_offset(self): response = response[0] self.assertEqual(response['method'], 'GET') (uri, args) = response['uri'].split('?') - self.assertEqual(uri, '/admin/v2/integrations') + self.assertEqual(uri, '/admin/v1/integrations') self.assertEqual( util.params_to_dict(args), { @@ -75,7 +75,7 @@ def test_get_integrations_with_offset(self): response = response[0] self.assertEqual(response['method'], 'GET') (uri, args) = response['uri'].split('?') - self.assertEqual(uri, '/admin/v2/integrations') + self.assertEqual(uri, '/admin/v1/integrations') self.assertEqual( util.params_to_dict(args), { diff --git a/tests/admin/test_logo.py b/tests/admin/test_logo.py index 5834ff5..b666347 100644 --- a/tests/admin/test_logo.py +++ b/tests/admin/test_logo.py @@ -1,4 +1,3 @@ -import json from .base import TestAdmin import os import base64 @@ -22,6 +21,5 @@ def test_update_logo(self): # Validate response: self.assertTrue( - json.loads(response['body']).get('logo'), - base64_logo + 'logo={}'.format(base64_logo) in response['body'] ) diff --git a/tests/admin/test_settings.py b/tests/admin/test_settings.py index c03344a..2d4e600 100644 --- a/tests/admin/test_settings.py +++ b/tests/admin/test_settings.py @@ -1,4 +1,3 @@ -import json from .. import util import duo_client.admin from .base import TestAdmin @@ -9,7 +8,6 @@ class TestSettings(TestAdmin): def test_update_settings(self): """ Test updating settings """ - self.maxDiff = None response = self.client_list.update_settings( lockout_threshold=10, lockout_expire_duration=60, @@ -46,37 +44,37 @@ def test_update_settings(self): self.assertEqual(response['method'], 'POST') self.assertEqual(response['uri'], '/admin/v1/settings') self.assertEqual( - json.loads(response['body']), + util.params_to_dict(response['body']), { - 'account_id': self.client.account_id, - 'lockout_threshold': '10', - 'lockout_expire_duration': '60', - 'inactive_user_expiration': '30', - 'pending_deletion_days': '5', - 'log_retention_days': '180', - 'sms_batch': '5', - 'sms_expiration': '60', - 'sms_refresh': '1', - 'sms_message': 'test_message', - 'fraud_email': 'test@example.com', - 'fraud_email_enabled': '1', - 'keypress_confirm': '0', - 'keypress_fraud': '9', - 'timezone': 'UTC', - 'telephony_warning_min': '50', - 'caller_id': '+15035551000', - 'user_telephony_cost_max': '10', - 'minimum_password_length': '12', - 'password_requires_upper_alpha': '1', - 'password_requires_lower_alpha': '1', - 'password_requires_numeric': '1', - 'password_requires_special': '1', - 'helpdesk_bypass': 'allow', - 'helpdesk_bypass_expiration': '60', - 'helpdesk_message': 'test_message', - 'helpdesk_can_send_enroll_email': '1', - 'reactivation_url': 'https://www.example.com', - 'reactivation_integration_key': 'DINTEGRATIONKEYTEST0', - 'security_checkup_enabled': '1', - 'user_managers_can_put_users_in_bypass': '0', + 'account_id': [self.client.account_id], + 'lockout_threshold': ['10'], + 'lockout_expire_duration': ['60'], + 'inactive_user_expiration': ['30'], + 'pending_deletion_days': ['5'], + 'log_retention_days': ['180'], + 'sms_batch': ['5'], + 'sms_expiration': ['60'], + 'sms_refresh': ['1'], + 'sms_message': ['test_message'], + 'fraud_email': ['test@example.com'], + 'fraud_email_enabled': ['1'], + 'keypress_confirm': ['0'], + 'keypress_fraud': ['9'], + 'timezone': ['UTC'], + 'telephony_warning_min': ['50'], + 'caller_id': ['+15035551000'], + 'user_telephony_cost_max': ['10'], + 'minimum_password_length': ['12'], + 'password_requires_upper_alpha': ['1'], + 'password_requires_lower_alpha': ['1'], + 'password_requires_numeric': ['1'], + 'password_requires_special': ['1'], + 'helpdesk_bypass': ['allow'], + 'helpdesk_bypass_expiration': ['60'], + 'helpdesk_message': ['test_message'], + 'helpdesk_can_send_enroll_email': ['1'], + 'reactivation_url': ['https://www.example.com'], + 'reactivation_integration_key': ['DINTEGRATIONKEYTEST0'], + 'security_checkup_enabled': ['1'], + 'user_managers_can_put_users_in_bypass': ['0'], }) diff --git a/tests/admin/test_users.py b/tests/admin/test_users.py index 8cd99c6..6ad8323 100644 --- a/tests/admin/test_users.py +++ b/tests/admin/test_users.py @@ -1,4 +1,3 @@ -import json from .. import util import duo_client.admin from .base import TestAdmin @@ -112,28 +111,28 @@ def test_add_user(self): self.assertEqual(response['method'], 'POST') self.assertEqual(response['uri'], '/admin/v1/users') self.assertEqual( - json.loads(response['body']), + util.params_to_dict(response['body']), { - 'realname': 'bar', - 'notes': 'notes', - 'username': 'foo', - 'status': 'active', - 'email': 'foobar@baz.com', - 'firstname': 'fName', - 'lastname': 'lName', - 'account_id': self.client.account_id, - 'alias1': 'alias1', - 'alias2': 'alias2', - 'alias3': 'alias3', - 'alias4': 'alias4', + 'realname': ['bar'], + 'notes': ['notes'], + 'username': ['foo'], + 'status': ['active'], + 'email': ['foobar@baz.com'], + 'firstname': ['fName'], + 'lastname': ['lName'], + 'account_id': [self.client.account_id], + 'alias1': ['alias1'], + 'alias2': ['alias2'], + 'alias3': ['alias3'], + 'alias4': ['alias4'], }) # defaults response = self.client.add_user('bar') self.assertEqual(response['method'], 'POST') self.assertEqual(response['uri'], '/admin/v1/users') self.assertEqual( - json.loads(response['body']), - {'username':'bar', 'account_id':self.client.account_id}) + util.params_to_dict(response['body']), + {'username':['bar'], 'account_id':[self.client.account_id]}) def test_update_user(self): response = self.client.update_user( @@ -145,21 +144,21 @@ def test_update_user(self): self.assertEqual( response['uri'], '/admin/v1/users/DU012345678901234567') self.assertEqual( - json.loads(response['body']), + util.params_to_dict(response['body']), { - 'account_id':self.client.account_id, - 'realname': 'bar', - 'notes': 'notes', - 'username': 'foo', - 'status': 'active', - 'email': 'foobar@baz.com', - 'firstname': 'fName', - 'lastname': 'lName', - 'account_id': self.client.account_id, - 'alias1': 'alias1', - 'alias2': 'alias2', - 'alias3': 'alias3', - 'alias4': 'alias4', + 'account_id':[self.client.account_id], + 'realname': ['bar'], + 'notes': ['notes'], + 'username': ['foo'], + 'status': ['active'], + 'email': ['foobar@baz.com'], + 'firstname': ['fName'], + 'lastname': ['lName'], + 'account_id': [self.client.account_id], + 'alias1': ['alias1'], + 'alias2': ['alias2'], + 'alias3': ['alias3'], + 'alias4': ['alias4'], }) def test_sync_user(self): @@ -170,5 +169,5 @@ def test_sync_user(self): self.assertEqual(response['uri'], '/admin/v1/users/directorysync/test_dir_key/syncuser') self.assertEqual( - json.loads(response['body']), - {'username': 'foo', 'account_id': self.client.account_id}) + util.params_to_dict(response['body']), + {'username': ['foo'], 'account_id': [self.client.account_id]}) diff --git a/tests/test_client.py b/tests/test_client.py index 035ad37..8ce7dfa 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -160,22 +160,6 @@ def test_v4_with_json(self): self.assertEqual(actual, expected) - def test_v5_with_json(self): - hashed_body = hashlib.sha512(JSON_STRING.encode('utf-8')).hexdigest() - headers = {"X-Duo-Header-1": "header_value_1"} - expected = ( - 'Tue, 17 Nov 2020 14:12:00\n' - 'POST\n' - 'foo.bar52.com\n' - '/Foo/BaR2/qux\n\n' + hashed_body - +'\n630b4bfe7e9abd03da2eee8f0a5d4e60a254ec880a839bcc2223bb5b9443e8ef24d58f0' - '254f1f5934bf8c017ebd0fd5b1acf86766bdbe74185e712a4092df3ed') - params = {} - body = duo_client.client.Client.canon_json(JSON_BODY) - actual = duo_client.client.canonicalize( - 'POST', 'foO.BaR52.cOm', '/Foo/BaR2/qux', params, 'Tue, 17 Nov 2020 14:12:00', - sig_version=5, body=body, additional_headers=headers) - def test_invalid_signature_version_raises(self): params = duo_client.client.Client.canon_json(JSON_BODY) with self.assertRaises(ValueError) as e: