Skip to content

Commit

Permalink
Add some comments and clean up slightly #417
Browse files Browse the repository at this point in the history
  • Loading branch information
joelvdavies committed Dec 5, 2024
1 parent 6de588c commit 996e7a8
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 96 deletions.
3 changes: 2 additions & 1 deletion inventory_management_system_api/core/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def start_session_transaction(action_description: str) -> Generator[ClientSessio
Also handles write conflicts.
:param action_description: Description of what the transaction is doing so it can be used in any raised errors.
:param action_description: Description of what the contents of the transaction is doing so it can be used in any
raised errors.
:raises WriteConflictError: If there a write conflict during the transaction.
"""

Expand Down
37 changes: 23 additions & 14 deletions inventory_management_system_api/repositories/catalogue_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ def has_child_elements(self, catalogue_item_id: CustomObjectId, session: ClientS
return item is not None

def list_ids(self, catalogue_category_id: Optional[str] = None, session: ClientSession = None) -> List[ObjectId]:
# TODO: Update description and tests
# TODO: Make sure where it is used elsewhere specifies catalogue_category_id =
"""
Retrieve a list of all catalogue item ids with a specific catalogue_category_id from a MongoDB
database. Performs a projection to only include _id. (Required for mass updates of properties
Expand All @@ -150,20 +148,23 @@ def list_ids(self, catalogue_category_id: Optional[str] = None, session: ClientS
:return: A list object catalogue item ObjectId's or an empty list if no catalogue items are returned by
the database.
"""
logger.info(
"Finding the id's of all catalogue items within the catalogue category with ID '%s' in the database",
# TODO: Update this when catalogue_category_id is None?
catalogue_category_id,
)

query = {}
if catalogue_category_id:
catalogue_category_id = CustomObjectId(catalogue_category_id)
query["catalogue_category_id"] = catalogue_category_id

message = "Retrieving IDs of all catalogue items from the database"
if not query:
logger.info(message)
else:
logger.info("%s matching the provided catalogue category ID filter", message)
logger.debug("Provided catalogue category ID filter: %s", catalogue_category_id)

# Using distinct has a 16MB limit
# https://stackoverflow.com/questions/29771192/how-do-i-get-a-list-of-just-the-objectids-using-pymongo
# For 100000 documents, using list comprehension takes about 0.85 seconds vs 0.50 seconds for distinct
return self._catalogue_items_collection.find(
{"catalogue_category_id": CustomObjectId(catalogue_category_id)} if catalogue_category_id else {},
{"_id": 1},
session=session,
).distinct("_id")
return self._catalogue_items_collection.find(query, {"_id": 1}, session=session).distinct("_id")

def insert_property_to_all_matching(
self, catalogue_category_id: str, property_in: PropertyIn, session: ClientSession = None
Expand Down Expand Up @@ -222,9 +223,17 @@ def update_number_of_spares(
catalogue_item_id: ObjectId,
number_of_spares: Optional[int],
session: Optional[ClientSession] = None,
):
) -> None:
"""
Updates the `number_of_spares` field for a catalogue item.
self._catalogue_items_collection.update_many(
:param catalogue_item_id: The ID of the catalogue item to update.
:param number_of_spares: New number of spares to update to.
:param session: PyMongo ClientSession to use for database operations
"""
logger.info("Updating the number of spares of the catalogue item with ID %s", catalogue_item_id)

self._catalogue_items_collection.update_one(
{"_id": catalogue_item_id},
{"$set": {"number_of_spares": number_of_spares}},
session=session,
Expand Down
12 changes: 9 additions & 3 deletions inventory_management_system_api/repositories/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,15 @@ def count_with_usage_statuses_ids_in(
catalogue_item_id: ObjectId,
usage_status_ids: List[CustomObjectId],
session: Optional[ClientSession] = None,
):
# TODO: Comment/log
# TODO: Look at https://stackoverflow.com/questions/60237515/mongodb-returning-wrong-count
) -> int:
"""
Counts the number of items within a catalogue item with a `usage_status_id` contained within the given list.
:param catalogue_item_id: ID of the catalogue item for which items should be counted.
:param usage_status_id: List of usage status IDs which should be included in the count.
:param session: PyMongo ClientSession to use for database operations.
:return: Number of items counted.
"""

return self._items_collection.count_documents(
{"catalogue_item_id": catalogue_item_id, "usage_status_id": {"$in": usage_status_ids}}, session=session
Expand Down
124 changes: 60 additions & 64 deletions inventory_management_system_api/services/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
"""

import logging
from typing import Annotated, List, Optional
from contextlib import contextmanager
from typing import Annotated, Generator, List, Optional

from fastapi import Depends
from pymongo.client_session import ClientSession

from inventory_management_system_api.core.custom_object_id import CustomObjectId
from inventory_management_system_api.core.database import start_session_transaction
Expand Down Expand Up @@ -80,6 +82,7 @@ def create(self, item: ItemPostSchema) -> ItemOut:
catalogue_item = self._catalogue_item_repository.get(catalogue_item_id)
if not catalogue_item:
raise MissingRecordError(f"No catalogue item found with ID: {catalogue_item_id}")
catalogue_item_id = CustomObjectId(catalogue_item_id)

try:
catalogue_category_id = catalogue_item.catalogue_category_id
Expand All @@ -101,35 +104,13 @@ def create(self, item: ItemPostSchema) -> ItemOut:
defined_properties = catalogue_category.properties
properties = utils.process_properties(defined_properties, supplied_properties)

# Need to recalculate number of spares after adding these need to either succeed or fail together. Also need
# to be able to write lock the catalogue item document in the process to prevent further changes while
# recalculating
with start_session_transaction("adding item") as session:
# Write lock the catalogue item to prevent any further item updates for it until the transaction
# completes
# TODO: Change to just str in the prepare_for_number_of_spares_update? Same below
utils.prepare_for_number_of_spares_update(
CustomObjectId(catalogue_item_id), self._catalogue_item_repository, session
)

new_item = self._item_repository.create(
# Update number of spares when creating
with self._start_transaction_impacting_number_of_spares("creating item", catalogue_item_id) as session:
return self._item_repository.create(
ItemIn(**{**item.model_dump(), "properties": properties, "usage_status": usage_status.value}),
session=session,
)

# Obtain the current spares definition
spares_definition = self._setting_repository.get(SparesDefinitionOut, session=session)
if spares_definition:
utils.perform_number_of_spares_update(
CustomObjectId(catalogue_item_id),
utils.get_usage_status_ids(spares_definition),
self._catalogue_item_repository,
self._item_repository,
session=session,
)

return new_item

def get(self, item_id: str) -> Optional[ItemOut]:
"""
Retrieve an item by its ID
Expand All @@ -149,6 +130,7 @@ def list(self, system_id: Optional[str], catalogue_item_id: Optional[str]) -> Li
"""
return self._item_repository.list(system_id, catalogue_item_id)

# pylint:disable=too-many-locals
def update(self, item_id: str, item: ItemPatchSchema) -> ItemOut:
"""
Update an item by its ID.
Expand Down Expand Up @@ -190,7 +172,6 @@ def update(self, item_id: str, item: ItemPatchSchema) -> ItemOut:
# If catalogue item ID not supplied then it will be fetched, and its parent catalogue category.
# the defined (at a catalogue category level) and supplied properties will be used to find
# missing supplied properties. They will then be processed and validated.

if "properties" in update_data:
catalogue_item = self._catalogue_item_repository.get(stored_item.catalogue_item_id)

Expand All @@ -209,59 +190,34 @@ def update(self, item_id: str, item: ItemPatchSchema) -> ItemOut:

update_data["properties"] = utils.process_properties(defined_properties, supplied_properties)

# TODO: Comment and cleanup
if not updating_usage_status:
return self._item_repository.update(item_id, ItemIn(**{**stored_item.model_dump(), **update_data}))
else:
with start_session_transaction("updating item") as session:
utils.prepare_for_number_of_spares_update(catalogue_item_id, self._catalogue_item_repository, session)

new_item = self._item_repository.update(
# Spares only need updating if the usage status is being updated
if updating_usage_status:
with self._start_transaction_impacting_number_of_spares("updating item", catalogue_item_id) as session:
return self._item_repository.update(
item_id, ItemIn(**{**stored_item.model_dump(), **update_data}), session=session
)
else:
return self._item_repository.update(item_id, ItemIn(**{**stored_item.model_dump(), **update_data}))

# Obtain the current spares definition
spares_definition = self._setting_repository.get(SparesDefinitionOut, session=session)
if spares_definition:
utils.perform_number_of_spares_update(
catalogue_item_id,
utils.get_usage_status_ids(spares_definition),
self._catalogue_item_repository,
self._item_repository,
session=session,
)
return new_item
# pylint:enable=too-many-locals

def delete(self, item_id: str) -> None:
"""
Delete an item by its ID.
:param item_id: The ID of the item to delete.
:raises MissingRecordError: If the item doesn't exist.
"""
# TODO: Update comment and tests for raising error instead of repo delete
item = self.get(item_id)
if item is None:
raise MissingRecordError(f"No item found with ID: {str(item_id)}")

# TODO: Comment below
with start_session_transaction("deleting item") as session:
catalogue_item_id = CustomObjectId(item.catalogue_item_id)

utils.prepare_for_number_of_spares_update(catalogue_item_id, self._catalogue_item_repository, session)

# Update number of spares when deleting
with self._start_transaction_impacting_number_of_spares(
"deleting item", CustomObjectId(item.catalogue_item_id)
) as session:
self._item_repository.delete(item_id, session=session)

# Obtain the current spares definition
spares_definition = self._setting_repository.get(SparesDefinitionOut, session=session)
if spares_definition:
utils.perform_number_of_spares_update(
catalogue_item_id,
utils.get_usage_status_ids(spares_definition),
self._catalogue_item_repository,
self._item_repository,
session=session,
)

def _merge_missing_properties(
self, properties: List[PropertyOut], supplied_properties: List[PropertyPostSchema]
) -> List[PropertyPostSchema]:
Expand All @@ -287,3 +243,43 @@ def _merge_missing_properties(
else:
merged_properties.append(PropertyPostSchema(**prop.model_dump()))
return merged_properties

@contextmanager
def _start_transaction_impacting_number_of_spares(
self, action_description: str, catalogue_item_id: CustomObjectId
) -> Generator[ClientSession, None, None]:
"""
Handles recalculation of the `number_of_spares` of a catalogue item for updates that will impact it.
Starts a MongoDB session and transaction, then write locks the catalogue item before yielding to allow
an update to take place using the returned session. Once any tasks using the session context have finished
it will finish by recalculating the number of spares for the catalogue item before finishing the transaction.
This write lock should prevent similar actions from occurring during the update to prevent an incorrect
spares calculation e.g. if another item was added between counting documents and then updating the spares field
it would cause a miscount. It also ensures any action executed using the session will either fail or succeed
with the spares update.
:param action_description: Description of what the contents of the transaction is doing so it can be used in any
raised errors.
:param catalogue_item_id: ID of the effected catalogue item which will need its `number_of_spares`
recalculating
"""

with start_session_transaction(action_description) as session:
# Write lock the catalogue item to prevent any further item updates for it until the transaction completes
utils.prepare_for_number_of_spares_recalculation(
catalogue_item_id, self._catalogue_item_repository, session
)

yield session

# Update the number of spares
spares_definition = self._setting_repository.get(SparesDefinitionOut, session=session)
if spares_definition:
utils.perform_number_of_spares_recalculation(
catalogue_item_id,
utils.get_usage_status_ids_from_spares_definition(spares_definition),
self._catalogue_item_repository,
self._item_repository,
session=session,
)
11 changes: 6 additions & 5 deletions inventory_management_system_api/services/setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from fastapi import Depends

from inventory_management_system_api.core.custom_object_id import CustomObjectId
from inventory_management_system_api.core.database import start_session_transaction
from inventory_management_system_api.core.exceptions import MissingRecordError
from inventory_management_system_api.models.setting import SparesDefinitionIn, SparesDefinitionOut
Expand Down Expand Up @@ -62,7 +61,7 @@ def update_spares_definition(self, spares_definition: SparesDefinitionPutSchema)

# Need all updates to the number of spares to succeed or fail together with assigning the new definition
# Also need to be able to write lock the catalogue item documents in the process to prevent further changes
# while recalculating.
# while recalculating
with start_session_transaction("updating spares definition") as session:
# Update spares definition first to ensure write locked to prevent further updates while calculating below
new_spares_definition = self._setting_repository.upsert(
Expand All @@ -74,12 +73,14 @@ def update_spares_definition(self, spares_definition: SparesDefinitionPutSchema)

# Usage status id that constitute a spare in the new definition (obtain it now to save processing
# repeatedly)
usage_status_ids = utils.get_usage_status_ids(new_spares_definition)
usage_status_ids = utils.get_usage_status_ids_from_spares_definition(new_spares_definition)

# Recalculate for each catalogue item
for catalogue_item_id in catalogue_item_ids:
utils.prepare_for_number_of_spares_update(catalogue_item_id, self._catalogue_item_repository, session)
utils.perform_number_of_spares_update(
utils.prepare_for_number_of_spares_recalculation(
catalogue_item_id, self._catalogue_item_repository, session
)
utils.perform_number_of_spares_recalculation(
catalogue_item_id, usage_status_ids, self._catalogue_item_repository, self._item_repository, session
)

Expand Down
38 changes: 32 additions & 6 deletions inventory_management_system_api/services/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,28 +250,54 @@ def _merge_non_mandatory_properties(
return properties


def get_usage_status_ids(spares_definition: SparesDefinitionOut) -> list[CustomObjectId]:
# TODO: Comment
def get_usage_status_ids_from_spares_definition(spares_definition: SparesDefinitionOut) -> list[CustomObjectId]:
"""
Returns a list of usage status ids from a spares definition model in the format required for
`perform_number_of_spares_update`.
:param spares_definition: Spares definition.
:return: List of usage status ids converted to CustomObjectId's.
"""

return [CustomObjectId(usage_status.id) for usage_status in spares_definition.usage_statuses]


def prepare_for_number_of_spares_update(
def prepare_for_number_of_spares_recalculation(
catalogue_item_id: ObjectId, catalogue_item_repository: CatalogueItemRepo, session: ClientSession
) -> None:
# TODO: Comment
"""
Prepares a catalogue item for a recalculation of the `number_of_spares` field.
Should be called prior to `perform_number_of_spares_recalculation` in order to ensure the catalogue item is write
locked to avoid other similar updates interfering.
:param catalogue_item_id: ID of the catalogue item that the spares will be recalculated for.
:param catalogue_item_repository: `CatalogueItemRepo` repository to use
:param session: Session to use for the recalculation. A transaction should already have been started.
"""

catalogue_item_repository.update_number_of_spares(catalogue_item_id, None, session=session)


def perform_number_of_spares_update(
def perform_number_of_spares_recalculation(
catalogue_item_id: ObjectId,
usage_status_ids: list[CustomObjectId],
catalogue_item_repository: CatalogueItemRepo,
item_repository: ItemRepo,
session: ClientSession,
) -> None:
# TODO: Comment - include must use prepare first
"""
Performs a recalculation of the `number_of_spares` field for a catalogue item.
Should after `perform_number_of_spares_recalculation` in order to ensure the catalogue item is write locked to avoid
other similar updates interfering.
:param catalogue_item_id: ID of the catalogue item that the spares will be recalculated for.
:param usage_status_ids: Usage status ids that should constitute a spare.
:param catalogue_item_repository: `CatalogueItemRepo` repository to use.
:param item_repository: `ItemRepo` repository to use.
:param session: Session to use for the recalculation. A transaction should already have been started.
"""

# Now calculate the new number of spares
new_number_of_spares = item_repository.count_with_usage_statuses_ids_in(
Expand Down
Loading

0 comments on commit 996e7a8

Please sign in to comment.