Skip to content

Commit

Permalink
fix run params for running capsule and writing to queue
Browse files Browse the repository at this point in the history
  • Loading branch information
arjunsridhar12345 committed Aug 26, 2024
1 parent 6c92827 commit 1f027d2
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 18 deletions.
29 changes: 12 additions & 17 deletions src/npc_lims/jobs/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
Computation,
ComputationState,
)
from codeocean.data_asset import ComputationSource, DataAssetParams, Source
from codeocean.computation import DataAssetsRunParam
from codeocean.data_asset import DataAssetState
from typing_extensions import TypeAlias

import npc_lims
Expand All @@ -23,9 +24,6 @@
SessionID: TypeAlias = Union[str, npc_session.SessionRecord]
JobID: TypeAlias = str

INITIAL_VALUE = "Added to Queue"
INITIAL_INT_VALUE = -1

VIDEO_MODELS = ("dlc_eye", "dlc_side", "dlc_face", "facemap", "LPFaceParts")


Expand Down Expand Up @@ -127,10 +125,12 @@ def create_data_asset(session_id: SessionID, job_id: str, process_name: str) ->
logger.info(f"Failed to create data asset for {session_id}")
return

while not asset_exists(session_id, process_name):
while asset.state not in [DataAssetState.Ready, DataAssetState.Failed]:
time.sleep(10)
asset = codeocean_utils.get_codeocean_client().data_assets.get_data_asset(asset.id)

logger.info(f"Created data asset for {session_id}")
npc_lims.set_asset_viewable_for_everyone(asset.id)
codeocean_utils.set_asset_viewable_for_everyone(asset.id)


def asset_exists(session_id: SessionID, process_name: str) -> bool:
Expand All @@ -146,7 +146,7 @@ def asset_exists(session_id: SessionID, process_name: str) -> bool:
def create_all_data_assets(process_name: str, overwrite_existing_assets: bool) -> None:
sync_json(process_name)

for session_id in read_json(process_name):
for session_id in dict(reversed(list(read_json(process_name).items()))):
job_status = get_current_job_status(session_id, process_name)
if job_status is None:
continue
Expand Down Expand Up @@ -211,14 +211,9 @@ def start(
) -> None:
session_data_asset = npc_lims.get_session_raw_data_asset(session_id)
data_assets = [
DataAssetParams(
name=session_data_asset.name,
source=Source(
computation=ComputationSource(
id=session_data_asset.id,
)
),
mount=session_data_asset.mount,
DataAssetsRunParam(
id=session_data_asset.id,
mount=session_data_asset.mount
),
]
computation = npc_lims.run_capsule_or_pipeline(
Expand Down Expand Up @@ -249,7 +244,7 @@ def process_capsule_or_pipeline_queue(
capsule_pipeline_info = codeocean_utils.CapsulePipelineInfo(
capsule_or_pipeline_id, process_name, is_pipeline
)

add_sessions_to_queue(
capsule_pipeline_info.process_name,
overwrite_exisitng_assets=overwrite_existing_assets,
Expand Down Expand Up @@ -279,7 +274,7 @@ def process_capsule_or_pipeline_queue(

while sync_and_get_num_running_jobs(capsule_pipeline_info.process_name) > 0:
time.sleep(600)

if create_data_assets_from_results:
create_all_data_assets(
capsule_pipeline_info.process_name, overwrite_existing_assets
Expand Down
10 changes: 9 additions & 1 deletion src/npc_lims/metadata/codeocean_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,15 @@ def add_to_computation_queue(
current = read_computation_queue(source)

is_new = session_id not in current
current.update({session_id: serialize_computation(computation)})
for session_queue_id in current:
if not is_new and session_queue_id == session_id:
current[session_queue_id] = serialize_computation(computation)
else:
current[session_queue_id] = serialize_computation(current[session_queue_id])

if is_new:
current.update({session_id: serialize_computation(computation)})

source.write_text(json.dumps(current, indent=4))
logger.info(
f"{'Added' if is_new else 'Updated'} {session_id} {'to' if is_new else 'in'} json"
Expand Down

0 comments on commit 1f027d2

Please sign in to comment.