From 414ab1a094ca7bc58dc79b4085852f3f4110aa18 Mon Sep 17 00:00:00 2001 From: Mark Tripod Date: Tue, 20 Feb 2024 22:00:10 -0500 Subject: [PATCH] feat: add bulk operations methods to admin client Admin.bulk_add_users() Admin.bulk_operations() tests/admin/test_bulk.py --- duo_client/admin.py | 104 ++++++++++++++++++++++++++++++++++++--- tests/admin/test_bulk.py | 38 ++++++++++++++ 2 files changed, 134 insertions(+), 8 deletions(-) create mode 100644 tests/admin/test_bulk.py diff --git a/duo_client/admin.py b/duo_client/admin.py index 6fb5b6c..e962616 100644 --- a/duo_client/admin.py +++ b/duo_client/admin.py @@ -174,17 +174,18 @@ """ from __future__ import absolute_import -import six.moves.urllib - -from . import client, Accounts -from .logs.telephony import Telephony -import six -import warnings +import base64 import json import time -import base64 +import warnings from datetime import datetime, timedelta, timezone +import six +import six.moves.urllib + +from . import Accounts, client +from .logs.telephony import Telephony + USER_STATUS_ACTIVE = "active" USER_STATUS_BYPASS = "bypass" USER_STATUS_DISABLED = "disabled" @@ -493,7 +494,7 @@ def get_authentication_log(self, api_version=1, **kwargs): params = {} if api_version == 1: #v1 - params['mintime'] = kwargs['mintime'] if 'mintime' in kwargs else 0; + params['mintime'] = kwargs['mintime'] if 'mintime' in kwargs else 0 # Sanity check mintime as unix timestamp, then transform to string params['mintime'] = '{:d}'.format(int(params['mintime'])) warnings.warn( @@ -3514,6 +3515,93 @@ def get_policy_summary_v2(self): response = self.json_api_call("GET", path, {}) return response + def bulk_operations(self, operations): + """ + Perform a series of bulk operations sequentially + + Args: + operations (list) - list of JSON objects representing the operations to perform. + + Returns (list) - response objects for each item in the operations list + + Raises RuntimeError on error + + Example: + operations = [ + { + "method": "POST", + "path": "/admin/v1/users", + "body": { + "username": "uname1", + "alias1": "my_alias1", + "alias2": "my_alias2", + "alias3": "my_alias3", + "alias4": "my_alias4", + "email": "user@example.com", + "status": "active", + "notes": "This is a user", + }, + }, + { + "method": "POST", + "path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXX1", + "body": { + "alias2": "updated_alias2", + "email": "user2@example.com", + "status": "active", + "notes": "This is another user", + }, + }, + { + "method": "DELETE", + "path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXX1", + "body": {}, + }, + { + "method": "POST", + "path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXX1/groups", + "body": { + "group_id": "DUXXXXXXXXXXXXXXXXX1", + }, + }, + { + "method": "DELETE", + "path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXX1/groups/DUXXXXXXXXXXXXXXXXX1", + "body": {}, + }, + ] + + """ + + path = "/admin/v1/bulk" + params = {"operations": json.dumps(operations)} + response = self.json_api_call("POST", path, params) + return response + + def bulk_add_users(self, users): + """ + Create users in bulk atomically + + Args: + users (list) - list of JSON objects representing each user object to create. + + Returns (list) - user response objects for each user in the list argument + + Raises RuntimeError on error + + Example: + users = [ + {"username": "example_username_1", "email": "example_user_1@example.com"}, + {"username": "example_username_2", "status": "disabled"}, + ] + + """ + + path = "/admin/v1/bulk_create" + params = {"users": json.dumps(users)} + response = self.json_api_call("POST", path, params) + return response + class AccountAdmin(Admin): """AccountAdmin manages a child account using an Accounts API integration.""" diff --git a/tests/admin/test_bulk.py b/tests/admin/test_bulk.py new file mode 100644 index 0000000..e8e3960 --- /dev/null +++ b/tests/admin/test_bulk.py @@ -0,0 +1,38 @@ +import json + +from .base import TestAdmin + + +class TestBulkOperations(TestAdmin): + def test_bulk_add_users(self): + """Test to bulk create users + """ + response = self.client_list.bulk_add_users( + [{"username": "example_username_1", "email": "example_user_1@example.com"}, + {"username": "example_username_2", "status": "disabled"}, ]) + + result_list = json.loads(json.loads(response[0]['body'])['users']) + + self.assertEqual(result_list[0]['username'], 'example_username_1') + self.assertEqual(result_list[0]['email'], 'example_user_1@example.com') + + def test_bulk_operations(self): + """Test to execute bulk operations + """ + + input_data = [{"method": "POST", "path": "/admin/v1/users", + "body": {"username": "uname1", "alias1": "my_alias1", "alias2": "my_alias2", + "alias3": "my_alias3", "alias4": "my_alias4", "email": "user@example.com", + "status": "active", "notes": "This is a user", }, }, + {"method": "POST", "path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXXX", + "body": {"alias2": "updated_alias2", "email": "user2@example.com", "status": "active", + "notes": "This is another user", }, }, + {"method": "DELETE", "path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXXX", "body": {}, }, + {"method": "POST", "path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXXX/groups", + "body": {"group_id": "DUXXXXXXXXXXXXXXXXXX", }, }, + {"method": "DELETE", "path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXXX/groups/DUXXXXXXXXXXXXXXXXXX", + "body": {}, }, ] + response = self.client_list.bulk_operations(input_data) + + result_list = json.loads(json.loads(response[0]['body'])['operations']) + self.assertEqual(result_list, input_data)