Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASMCMS-9225: Improve performance of large GET components requests #413

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [2.0.48] - 2024-12-18
### Changed
- Improve performance of large `GET` components requests.

### Dependencies
- Bumped `certifi` version to resolve CVE.
- Bumped `grpcio` version to resolve CVE.

## [2.0.47] - 2024-10-22
### Added
#### BOS option
Expand Down
107 changes: 68 additions & 39 deletions src/bos/server/controllers/v2/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
import connexion
from functools import partial
import logging

import connexion

from bos.common.utils import exc_type_msg, get_current_timestamp
from bos.common.values import Phase, Action, Status, EMPTY_STAGED_STATE, EMPTY_BOOT_ARTIFACTS
from bos.server import redis_db_utils as dbutils
Expand All @@ -44,7 +46,6 @@ def get_v2_components(ids="", enabled=None, session=None, staged_session=None, p
LOGGER.debug("GET /v2/components invoked get_v2_components with ids=%s enabled=%s session=%s "
"staged_session=%s phase=%s status=%s", ids, enabled, session, staged_session,
phase, status)
id_list = []
if ids:
try:
id_list = ids.split(',')
Expand All @@ -53,45 +54,86 @@ def get_v2_components(ids="", enabled=None, session=None, staged_session=None, p
return connexion.problem(
status=400, title="Error parsing the ids provided.",
detail=str(err))
LOGGER.debug("GET /v2/components with %d IDs specified", len(id_list))
else:
id_list = None
LOGGER.debug("GET /v2/components with %d IDs specified", len(id_list) if id_list else 0)
response = get_v2_components_data(id_list=id_list, enabled=enabled, session=session, staged_session=staged_session,
phase=phase, status=status)
phase=phase, status=status, delete_timestamp=True)
LOGGER.debug("GET /v2/components returning data on %d components", len(response))
for component in response:
del_timestamp(component)
return response, 200


def get_v2_components_data(id_list=None, enabled=None, session=None, staged_session=None,
phase=None, status=None):
phase=None, status=None, delete_timestamp: bool = False):
"""Used by the GET /components API operation

Allows filtering using a comma separated list of ids.
"""
response = []
if id_list:
for component_id in id_list:
data = DB.get(component_id)
if data:
response.append(data)
if id_list is not None:
# If id_list is not None but is empty, that means no components in the system
# will match our filter, so we can return an empty list immediately.
if not id_list:
return []
id_set = set(id_list)
else:
# TODO: On large scale systems, this response may be too large
# and require paging to be implemented
response = DB.get_all()
# The status must be set before using _matches_filter as the status is one of the matching criteria.
response = [_set_status(r) for r in response if r]
if enabled is not None or session is not None or staged_session is not None or phase is not None or status is not None:
response = [r for r in response if _matches_filter(r, enabled, session, staged_session, phase, status)]
return response
id_set = None

if any([id_set, enabled, session, staged_session, phase, status]):
_component_filter_func = partial(_filter_component,
id_set=id_set,
enabled=enabled,
session=session or None,
staged_session=staged_session or None,
phase=phase or None,
status=status or None,
delete_timestamp=delete_timestamp)
else:
_component_filter_func = partial(_set_status,
delete_timestamp=delete_timestamp)

return DB.get_all_filtered(filter_func=_component_filter_func)


def _filter_component(data: dict,
id_set=None,
enabled=None,
session=None,
staged_session=None,
phase=None,
status=None,
delete_timestamp: bool = False) -> dict | None:
# Do all of the checks we can before calculating status, to avoid doing it needlessly
if id_set is not None and data["id"] not in id_set:
return None
if enabled is not None and data.get('enabled', None) != enabled:
return None
if session is not None and data.get('session', None) != session:
return None
if staged_session is not None and \
data.get('staged_state', {}).get('session', None) != staged_session:
return None
updated_data = _set_status(data)

status_data = updated_data.get('status')
if phase is not None and status_data.get('phase') != phase:
return None
if status is not None and status_data.get('status') not in status.split(
','):
return None
if delete_timestamp:
del_timestamp(updated_data)
return updated_data


def _set_status(data):
def _set_status(data, delete_timestamp: bool = False):
"""
This sets the status field of the overall status.
"""
if "status" not in data:
data["status"] = {"phase": "", "status_override": ""}
data['status']['status'] = _calculate_status(data)
if delete_timestamp:
del_timestamp(data)
return data


Expand All @@ -107,11 +149,13 @@ def _calculate_status(data):
return override

phase = status_data.get('phase', '')
last_action = data.get('last_action', {}).get('action', '')
last_action_dict = data.get('last_action', {})
last_action = last_action_dict.get('action', '')
component = data.get('id', '')
now = get_current_timestamp()
if phase == Phase.powering_on:
if last_action == Action.power_on and not data.get('last_action', {}).get('failed', False):
if last_action == Action.power_on and not last_action_dict.get(
'failed', False):
LOGGER.debug(f"{now} Component: {component} Phase: {phase} Status: {Status.power_on_called}")
return Status.power_on_called
else:
Expand All @@ -135,21 +179,6 @@ def _calculate_status(data):
return Status.stable


def _matches_filter(data, enabled, session, staged_session, phase, status):
if enabled is not None and data.get('enabled', None) != enabled:
return False
if session is not None and data.get('session', None) != session:
return False
if staged_session is not None and data.get('staged_state', {}).get('session', None) != staged_session:
return False
status_data = data.get('status')
if phase is not None and status_data.get('phase') != phase:
return False
if status is not None and status_data.get('status') not in status.split(','):
return False
return True


@dbutils.redis_error_handler
def put_v2_components():
"""Used by the PUT /components API operation"""
Expand Down
26 changes: 25 additions & 1 deletion src/bos/server/redis_db_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
import connexion
from itertools import batched
import json
import logging
from typing import Callable

import connexion
import redis

from bos.common.utils import exc_type_msg
Expand Down Expand Up @@ -91,6 +94,27 @@ def get_all(self):
data.append(single_data)
return data

def get_all_filtered(self,
filter_func: Callable[[dict], dict | None]) -> list[dict]:
"""
Get an array of data for all keys after passing them through the specified filter
(discarding any for which the filter returns None)
"""
data = []
for value in self.iter_values():
filtered_value = filter_func(value)
if filtered_value is not None:
data.append(filtered_value)
return data

def iter_values(self):
"""
Iterate through every item in the database. Parse each item as JSON and yield it.
"""
for next_keys in batched(self.client.scan_iter(), 500):
for datastr in self.client.mget(next_keys):
yield json.loads(datastr) if datastr else None

def put(self, key, new_data):
"""Put data in to the database, replacing any old data."""
datastr = json.dumps(new_data)
Expand Down
Loading