From 1f027d2a8ac71e563b58a51f37dd659ed5208300 Mon Sep 17 00:00:00 2001 From: "arjun.sridhar12345" Date: Mon, 26 Aug 2024 09:33:54 -0700 Subject: [PATCH] fix run params for running capsule and writing to queue --- src/npc_lims/jobs/queue.py | 29 ++++++++++-------------- src/npc_lims/metadata/codeocean_utils.py | 10 +++++++- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/npc_lims/jobs/queue.py b/src/npc_lims/jobs/queue.py index 06e8127..5fe508a 100644 --- a/src/npc_lims/jobs/queue.py +++ b/src/npc_lims/jobs/queue.py @@ -11,7 +11,8 @@ Computation, ComputationState, ) -from codeocean.data_asset import ComputationSource, DataAssetParams, Source +from codeocean.computation import DataAssetsRunParam +from codeocean.data_asset import DataAssetState from typing_extensions import TypeAlias import npc_lims @@ -23,9 +24,6 @@ SessionID: TypeAlias = Union[str, npc_session.SessionRecord] JobID: TypeAlias = str -INITIAL_VALUE = "Added to Queue" -INITIAL_INT_VALUE = -1 - VIDEO_MODELS = ("dlc_eye", "dlc_side", "dlc_face", "facemap", "LPFaceParts") @@ -127,10 +125,12 @@ def create_data_asset(session_id: SessionID, job_id: str, process_name: str) -> logger.info(f"Failed to create data asset for {session_id}") return - while not asset_exists(session_id, process_name): + while asset.state not in [DataAssetState.Ready, DataAssetState.Failed]: time.sleep(10) + asset = codeocean_utils.get_codeocean_client().data_assets.get_data_asset(asset.id) + logger.info(f"Created data asset for {session_id}") - npc_lims.set_asset_viewable_for_everyone(asset.id) + codeocean_utils.set_asset_viewable_for_everyone(asset.id) def asset_exists(session_id: SessionID, process_name: str) -> bool: @@ -146,7 +146,7 @@ def asset_exists(session_id: SessionID, process_name: str) -> bool: def create_all_data_assets(process_name: str, overwrite_existing_assets: bool) -> None: sync_json(process_name) - for session_id in read_json(process_name): + for session_id in dict(reversed(list(read_json(process_name).items()))): job_status = get_current_job_status(session_id, process_name) if job_status is None: continue @@ -211,14 +211,9 @@ def start( ) -> None: session_data_asset = npc_lims.get_session_raw_data_asset(session_id) data_assets = [ - DataAssetParams( - name=session_data_asset.name, - source=Source( - computation=ComputationSource( - id=session_data_asset.id, - ) - ), - mount=session_data_asset.mount, + DataAssetsRunParam( + id=session_data_asset.id, + mount=session_data_asset.mount ), ] computation = npc_lims.run_capsule_or_pipeline( @@ -249,7 +244,7 @@ def process_capsule_or_pipeline_queue( capsule_pipeline_info = codeocean_utils.CapsulePipelineInfo( capsule_or_pipeline_id, process_name, is_pipeline ) - + add_sessions_to_queue( capsule_pipeline_info.process_name, overwrite_exisitng_assets=overwrite_existing_assets, @@ -279,7 +274,7 @@ def process_capsule_or_pipeline_queue( while sync_and_get_num_running_jobs(capsule_pipeline_info.process_name) > 0: time.sleep(600) - + if create_data_assets_from_results: create_all_data_assets( capsule_pipeline_info.process_name, overwrite_existing_assets diff --git a/src/npc_lims/metadata/codeocean_utils.py b/src/npc_lims/metadata/codeocean_utils.py index e27dfb8..7e89358 100644 --- a/src/npc_lims/metadata/codeocean_utils.py +++ b/src/npc_lims/metadata/codeocean_utils.py @@ -762,7 +762,15 @@ def add_to_computation_queue( current = read_computation_queue(source) is_new = session_id not in current - current.update({session_id: serialize_computation(computation)}) + for session_queue_id in current: + if not is_new and session_queue_id == session_id: + current[session_queue_id] = serialize_computation(computation) + else: + current[session_queue_id] = serialize_computation(current[session_queue_id]) + + if is_new: + current.update({session_id: serialize_computation(computation)}) + source.write_text(json.dumps(current, indent=4)) logger.info( f"{'Added' if is_new else 'Updated'} {session_id} {'to' if is_new else 'in'} json"