From b948d66ef05e569440869dcc070fae04f6d594ba Mon Sep 17 00:00:00 2001 From: Ben Cutler <90279826+ben-cutler-datarobot@users.noreply.github.com> Date: Thu, 19 Dec 2024 10:02:00 -0500 Subject: [PATCH] Add Session Affinity toggle to dr-apps CLI (#65) --- drapps/create.py | 23 ++++---- .../helpers/custom_app_sources_functions.py | 41 +++++++------- setup.cfg | 2 +- tests/cli/test_create.py | 54 +++++++++++-------- 4 files changed, 68 insertions(+), 52 deletions(-) diff --git a/drapps/create.py b/drapps/create.py index 1b543a2..10529a8 100644 --- a/drapps/create.py +++ b/drapps/create.py @@ -23,8 +23,7 @@ get_custom_app_source_by_name, get_custom_app_source_versions_list, update_application_source_version, - update_cpu_size, - update_num_replicas, + update_resources, update_runtime_params, ) from .helpers.custom_apps_functions import ( @@ -177,6 +176,7 @@ def configure_custom_app_source_version( runtime_params: List[Dict], replicas: int, cpu_size: str, + use_session_affinity: bool, ) -> None: payload: Dict[str, Any] = {'baseEnvironmentVersionId': base_env_version_id} project_files = get_project_files_list(project) @@ -209,19 +209,14 @@ def configure_custom_app_source_version( payload = {} progress.update(len(file_chunk)) - update_num_replicas( + update_resources( session=session, endpoint=endpoint, source_id=custom_app_source_id, version_id=custom_app_source_version_id, replicas=replicas, - ) - update_cpu_size( - session=session, - endpoint=endpoint, - source_id=custom_app_source_id, - version_id=custom_app_source_version_id, cpu_size=cpu_size, + session_affinity=use_session_affinity, ) # Finally, add runtime params update_runtime_params( @@ -242,6 +237,7 @@ def create_app_from_project( runtime_params: List[Dict], replicas: int, cpu_size: str, + use_session_affinity: bool, ) -> Dict[str, Any]: base_env_version_id = get_base_env_version(session, endpoint, base_env) source_name = f'{app_name}Source' @@ -258,6 +254,7 @@ def create_app_from_project( runtime_params=runtime_params, replicas=replicas, cpu_size=cpu_size, + use_session_affinity=use_session_affinity, ) app_payload = {'name': app_name, 'applicationSourceId': custom_app_source_id} click.echo(f'Starting {app_name} custom application.') @@ -432,6 +429,12 @@ def parse_env_vars(ctx, param, value): callback=parse_env_vars, help='Numeric environment variable in the format KEY=VALUE', ) +@click.option( + '--use-session-affinity', + is_flag=True, + default=False, + help='Controls whether you want requests to go to the same instance when you have multiple replicas. This can be useful for the streamlit file upload widget, which can raise 401 errors without session stickiness or if you need to store persistent information in local memory/files.', +) @click.argument('application_name', type=click.STRING, required=True) def create( token: str, @@ -445,6 +448,7 @@ def create( application_name: str, replicas: int, cpu_size: str, + use_session_affinity: bool, ) -> None: """ Creates new custom application from docker image or base environment. @@ -482,6 +486,7 @@ def create( runtime_params=runtime_params, replicas=replicas, cpu_size=cpu_size, + use_session_affinity=use_session_affinity, ) if skip_wait or not app_data.get('statusUrl'): diff --git a/drapps/helpers/custom_app_sources_functions.py b/drapps/helpers/custom_app_sources_functions.py index 07b99d1..016ef14 100644 --- a/drapps/helpers/custom_app_sources_functions.py +++ b/drapps/helpers/custom_app_sources_functions.py @@ -5,8 +5,9 @@ # This is proprietary source code of DataRobot, Inc. and its affiliates. # Released under the terms of DataRobot Tool and Utility Agreement. # +import json import posixpath -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from requests import Session @@ -96,32 +97,30 @@ def update_runtime_params( handle_dr_response(response) -def update_num_replicas( +def update_resources( session: Session, endpoint: str, source_id: str, version_id: str, - replicas: int, + replicas: Optional[int] = None, + cpu_size: Optional[str] = None, + session_affinity: Optional[bool] = None, ): + resources = dict() + if replicas is not None: + resources["replicas"] = replicas + if cpu_size is not None: + if cpu_size == '2xsmall': + cpu_size = 'nano' + elif cpu_size == 'xsmall': + cpu_size = 'micro' + resources["resourceLabel"] = f'cpu.{cpu_size}' # type: ignore + if session_affinity is not None: + resources["sessionAffinity"] = session_affinity + if not resources: + return url = posixpath.join(endpoint, f"customApplicationSources/{source_id}/versions/{version_id}/") - form_data = {"resources": (None, f'{{"replicas":{replicas}}}', 'application/json')} - rsp = session.patch(url, files=form_data) - handle_dr_response(rsp) - - -def update_cpu_size( - session: Session, - endpoint: str, - source_id: str, - version_id: str, - cpu_size: str, -): - if cpu_size == '2xsmall': - cpu_size = 'nano' - elif cpu_size == 'xsmall': - cpu_size = 'micro' - url = posixpath.join(endpoint, f"customApplicationSources/{source_id}/versions/{version_id}/") - form_data = {"resources": (None, f'{{"resourceLabel":"cpu.{cpu_size}"}}', 'application/json')} + form_data = {"resources": (None, json.dumps(resources), 'application/json')} rsp = session.patch(url, files=form_data) handle_dr_response(rsp) diff --git a/setup.cfg b/setup.cfg index 5af8858..5ca3d12 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,2 @@ [metadata] -version = 10.2.6 +version = 10.2.7 diff --git a/tests/cli/test_create.py b/tests/cli/test_create.py index 0e8491c..e01a207 100644 --- a/tests/cli/test_create.py +++ b/tests/cli/test_create.py @@ -121,6 +121,9 @@ def test_create_from_docker_image(api_endpoint_env, api_token_env, wait_till_rea @responses.activate @pytest.mark.parametrize('use_environment_id', (False, True)) @pytest.mark.parametrize('wait_till_ready', (False, True)) +@pytest.mark.parametrize('use_session_affinity', (False, True)) +@pytest.mark.parametrize('n_instances', (2, None)) # None == unset +@pytest.mark.parametrize('desired_cpu_size', ('2xsmall', None)) def test_create_from_project( api_endpoint_env, api_token_env, @@ -132,20 +135,21 @@ def test_create_from_project( entrypoint_script_content, use_environment_id, wait_till_ready, + use_session_affinity, + n_instances, + desired_cpu_size, ): """ Sort-of a mega test for the create app + src from a code-based project (non docker image). This tests: 1. Creating the source + app 2. Runtime params that are strings / numeric 3. The number of instances is passed - 4. The tee-shirt size is passed (requires MLOPS_RESOURCE_REQUEST_BUNDLES feature flag) + 4. The tee-shirt size is passed Note: The responses API is not great at handling cases where we call an API multiple times with multiple different parameters, like we do with the /customApplicationSources//versions//. So rather than use a form-data matcher, it makes more sense to just check it after the test run. """ - n_instances = 2 - desired_cpu_size = '2xsmall' app_name = 'new_app' project_folder = 'project-folder' ee_name = 'ExecutionEnv' @@ -253,11 +257,18 @@ def test_create_from_project( 'INT_VAL=3', '--numericEnvVar', 'FLOAT_VAL=3.14', - '--replicas', - str(n_instances), '--cpu-size', desired_cpu_size, ] + if use_session_affinity: + cli_parameters.append('--use-session-affinity') + if n_instances: + cli_parameters.append('--replicas') + cli_parameters.append(str(n_instances)) + if desired_cpu_size: + cli_parameters.append('--cpu-size') + cli_parameters.append(desired_cpu_size) + if not wait_till_ready: cli_parameters.append('--skip-wait') @@ -298,28 +309,29 @@ def test_create_from_project( assert float(param['value']) == float(numeric_env_vars[param['fieldName']]) else: pytest.fail(f"Unexpected environment variable: {param['fieldName']}") - # Assertions to verify instances were properly specified - env_var_requests = [ + resource_request = [ call for call in responses.calls if call.request.url.endswith(f'/versions/{custom_app_source_version_id}/') and call.request.method == 'PATCH' # noqa: W503 - and 'replicas' in call.request.body.decode('utf-8') # noqa: W503 + and '"resources"' in call.request.body.decode('utf-8') # noqa: W503 ] - assert len(env_var_requests) == 1 - assert f'{{"replicas":{n_instances}}}'.encode() in env_var_requests[0].request.body - # Assertions to make sure tee shirt CPU size is applied - cpu_sz_requests = [ - call - for call in responses.calls - if call.request.url.endswith(f'/versions/{custom_app_source_version_id}/') - and call.request.method == 'PATCH' # noqa: W503 - and 'resourceLabel' in call.request.body.decode('utf-8') # noqa: W503 - ] - assert len(cpu_sz_requests) == 1 - # NOTE: We have to map 2xsmall -> nano - assert '{"resourceLabel":"cpu.nano"}'.encode() in cpu_sz_requests[0].request.body + assert len(resource_request) == 1 + + content_type = resource_request[0].request.headers["Content-Type"] + sent_payload = json.loads( + next( + part + for part in decoder.MultipartDecoder( + resource_request[0].request.body, content_type + ).parts + if b'name="resources' in part.headers[b'Content-Disposition'] + ).text + ) + assert sent_payload.get('replicas') == n_instances or 1 + assert sent_payload.get("resourceLabel") == "cpu.nano" or 'cpu.small' + assert sent_payload.get("sessionAffinity") == use_session_affinity @pytest.mark.usefixtures('api_endpoint_env', 'api_token_env')