Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Session Affinity toggle to dr-apps CLI #65

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions drapps/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
get_custom_app_source_by_name,
get_custom_app_source_versions_list,
update_application_source_version,
update_cpu_size,
update_num_replicas,
update_resources,
update_runtime_params,
)
from .helpers.custom_apps_functions import (
Expand Down Expand Up @@ -177,6 +176,7 @@ def configure_custom_app_source_version(
runtime_params: List[Dict],
replicas: int,
cpu_size: str,
use_session_affinity: bool,
) -> None:
payload: Dict[str, Any] = {'baseEnvironmentVersionId': base_env_version_id}
project_files = get_project_files_list(project)
Expand Down Expand Up @@ -209,19 +209,14 @@ def configure_custom_app_source_version(

payload = {}
progress.update(len(file_chunk))
update_num_replicas(
update_resources(
session=session,
endpoint=endpoint,
source_id=custom_app_source_id,
version_id=custom_app_source_version_id,
replicas=replicas,
)
update_cpu_size(
session=session,
endpoint=endpoint,
source_id=custom_app_source_id,
version_id=custom_app_source_version_id,
cpu_size=cpu_size,
session_affinity=use_session_affinity,
)
# Finally, add runtime params
update_runtime_params(
Expand All @@ -242,6 +237,7 @@ def create_app_from_project(
runtime_params: List[Dict],
replicas: int,
cpu_size: str,
use_session_affinity: bool,
) -> Dict[str, Any]:
base_env_version_id = get_base_env_version(session, endpoint, base_env)
source_name = f'{app_name}Source'
Expand All @@ -258,6 +254,7 @@ def create_app_from_project(
runtime_params=runtime_params,
replicas=replicas,
cpu_size=cpu_size,
use_session_affinity=use_session_affinity,
)
app_payload = {'name': app_name, 'applicationSourceId': custom_app_source_id}
click.echo(f'Starting {app_name} custom application.')
Expand Down Expand Up @@ -432,6 +429,12 @@ def parse_env_vars(ctx, param, value):
callback=parse_env_vars,
help='Numeric environment variable in the format KEY=VALUE',
)
@click.option(
'--use-session-affinity',
is_flag=True,
default=False,
help='Controls whether you want requests to go to the same instance when you have multiple replicas. This can be useful for the streamlit file upload widget, which can raise 401 errors without session stickiness or if you need to store persistent information in local memory/files.',
)
@click.argument('application_name', type=click.STRING, required=True)
def create(
token: str,
Expand All @@ -445,6 +448,7 @@ def create(
application_name: str,
replicas: int,
cpu_size: str,
use_session_affinity: bool,
) -> None:
"""
Creates new custom application from docker image or base environment.
Expand Down Expand Up @@ -482,6 +486,7 @@ def create(
runtime_params=runtime_params,
replicas=replicas,
cpu_size=cpu_size,
use_session_affinity=use_session_affinity,
)

if skip_wait or not app_data.get('statusUrl'):
Expand Down
41 changes: 20 additions & 21 deletions drapps/helpers/custom_app_sources_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
# This is proprietary source code of DataRobot, Inc. and its affiliates.
# Released under the terms of DataRobot Tool and Utility Agreement.
#
import json
import posixpath
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from requests import Session

Expand Down Expand Up @@ -96,32 +97,30 @@ def update_runtime_params(
handle_dr_response(response)


def update_resources(
    session: "Session",
    endpoint: str,
    source_id: str,
    version_id: str,
    replicas: Optional[int] = None,
    cpu_size: Optional[str] = None,
    session_affinity: Optional[bool] = None,
):
    """Patch the resource settings of a custom application source version.

    Replaces the former ``update_num_replicas`` / ``update_cpu_size`` pair:
    all requested resource fields (replicas, CPU size label, session
    affinity) are collected into a single ``resources`` payload and sent in
    one PATCH request. Fields left as ``None`` are omitted from the payload;
    if no field was requested at all, no API call is made.

    :param session: authenticated requests session used for the API call
    :param endpoint: DataRobot API base URL
    :param source_id: ID of the custom application source
    :param version_id: ID of the source version being configured
    :param replicas: desired number of app instances, or None to leave unset
    :param cpu_size: tee-shirt CPU size; legacy aliases '2xsmall'/'xsmall'
        are translated to their current API names 'nano'/'micro'
    :param session_affinity: whether requests should stick to one replica
    """
    # Annotating as Dict[str, Any] lets mixed value types (int/str/bool)
    # coexist without needing a per-line `# type: ignore`.
    resources: Dict[str, Any] = {}
    if replicas is not None:
        resources["replicas"] = replicas
    if cpu_size is not None:
        # Map deprecated size names onto the ones the API accepts now.
        cpu_size = {'2xsmall': 'nano', 'xsmall': 'micro'}.get(cpu_size, cpu_size)
        resources["resourceLabel"] = f'cpu.{cpu_size}'
    if session_affinity is not None:
        resources["sessionAffinity"] = session_affinity
    if not resources:
        # Nothing to update; skip the network round-trip entirely.
        return
    url = posixpath.join(endpoint, f"customApplicationSources/{source_id}/versions/{version_id}/")
    # The endpoint expects multipart form data with a JSON-encoded "resources" part.
    form_data = {"resources": (None, json.dumps(resources), 'application/json')}
    rsp = session.patch(url, files=form_data)
    handle_dr_response(rsp)

Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[metadata]
version = 10.2.6
version = 10.2.7
54 changes: 33 additions & 21 deletions tests/cli/test_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ def test_create_from_docker_image(api_endpoint_env, api_token_env, wait_till_rea
@responses.activate
@pytest.mark.parametrize('use_environment_id', (False, True))
@pytest.mark.parametrize('wait_till_ready', (False, True))
@pytest.mark.parametrize('use_session_affinity', (False, True))
@pytest.mark.parametrize('n_instances', (2, None)) # None == unset
@pytest.mark.parametrize('desired_cpu_size', ('2xsmall', None))
def test_create_from_project(
api_endpoint_env,
api_token_env,
Expand All @@ -132,20 +135,21 @@ def test_create_from_project(
entrypoint_script_content,
use_environment_id,
wait_till_ready,
use_session_affinity,
n_instances,
desired_cpu_size,
):
"""
Sort-of a mega test for the create app + src from a code-based project (non docker image). This tests:
1. Creating the source + app
2. Runtime params that are strings / numeric
3. The number of instances is passed
4. The tee-shirt size is passed (requires MLOPS_RESOURCE_REQUEST_BUNDLES feature flag)
4. The tee-shirt size is passed
Note:
The responses API is not great at handling cases where we call an API multiple times with multiple different
parameters, like we do with the /customApplicationSources/<app-src-id>/versions/<version-id>/.
So rather than use a form-data matcher, it makes more sense to just check it after the test run.
"""
n_instances = 2
desired_cpu_size = '2xsmall'
app_name = 'new_app'
project_folder = 'project-folder'
ee_name = 'ExecutionEnv'
Expand Down Expand Up @@ -253,11 +257,18 @@ def test_create_from_project(
'INT_VAL=3',
'--numericEnvVar',
'FLOAT_VAL=3.14',
'--replicas',
str(n_instances),
'--cpu-size',
desired_cpu_size,
]
if use_session_affinity:
cli_parameters.append('--use-session-affinity')
if n_instances:
cli_parameters.append('--replicas')
cli_parameters.append(str(n_instances))
if desired_cpu_size:
cli_parameters.append('--cpu-size')
cli_parameters.append(desired_cpu_size)

if not wait_till_ready:
cli_parameters.append('--skip-wait')

Expand Down Expand Up @@ -298,28 +309,29 @@ def test_create_from_project(
assert float(param['value']) == float(numeric_env_vars[param['fieldName']])
else:
pytest.fail(f"Unexpected environment variable: {param['fieldName']}")

# Assertions to verify instances were properly specified
env_var_requests = [
resource_request = [
call
for call in responses.calls
if call.request.url.endswith(f'/versions/{custom_app_source_version_id}/')
and call.request.method == 'PATCH' # noqa: W503
and 'replicas' in call.request.body.decode('utf-8') # noqa: W503
and '"resources"' in call.request.body.decode('utf-8') # noqa: W503
]
assert len(env_var_requests) == 1
assert f'{{"replicas":{n_instances}}}'.encode() in env_var_requests[0].request.body
# Assertions to make sure tee shirt CPU size is applied
cpu_sz_requests = [
call
for call in responses.calls
if call.request.url.endswith(f'/versions/{custom_app_source_version_id}/')
and call.request.method == 'PATCH' # noqa: W503
and 'resourceLabel' in call.request.body.decode('utf-8') # noqa: W503
]
assert len(cpu_sz_requests) == 1
# NOTE: We have to map 2xsmall -> nano
assert '{"resourceLabel":"cpu.nano"}'.encode() in cpu_sz_requests[0].request.body
assert len(resource_request) == 1

content_type = resource_request[0].request.headers["Content-Type"]
sent_payload = json.loads(
next(
part
for part in decoder.MultipartDecoder(
resource_request[0].request.body, content_type
).parts
if b'name="resources' in part.headers[b'Content-Disposition']
).text
)
# NOTE(review): `x == a or b` parses as `(x == a) or b`, so with a truthy
# fallback these assertions could never fail. Parenthesize the intended
# fallback values instead:
assert sent_payload.get('replicas') == (n_instances if n_instances else 1)
assert sent_payload.get("resourceLabel") == ("cpu.nano" if desired_cpu_size else "cpu.small")
assert sent_payload.get("sessionAffinity") == use_session_affinity


@pytest.mark.usefixtures('api_endpoint_env', 'api_token_env')
Expand Down