Skip to content

Commit

Permalink
Merge branch 'develop' into feature/133-ftrack-2ways-sync
Browse files Browse the repository at this point in the history
  • Loading branch information
iLLiCiTiT authored Dec 5, 2024
2 parents 32d6cd7 + 54616d7 commit e598652
Show file tree
Hide file tree
Showing 2 changed files with 266 additions and 4 deletions.
229 changes: 229 additions & 0 deletions client/ayon_ftrack/event_handlers_user/action_download_reviews.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import os
import urllib
import json

from platformdirs import user_downloads_dir

from ayon_ftrack.common import (
LocalAction,
create_chunks,
)


class DownloadReviewMedia(LocalAction):
label = "Download Review Media"
identifier = "download.reviewables"
description = "Download review media from selected versions"

review_names = {"ftrackreview-mp4", "ftrackreview-image"}

def discover(self, _session, entities, _event):
# Only show this for Versions when a selection has been made
if not entities:
return False

for entity in entities:
if entity.entity_type.lower() in {
"assetversion", "reviewsession"
}:
return True
return False

def launch(self, session, entities, event):
asset_versions = self._extract_asset_versions(session, entities)
if not asset_versions:
return {
"message": "No review media to download in your selection...",
"success": False,
}

user_id = event["source"].get("user", {}).get("id")
if user_id:
self.show_message(
event, "Preparing information to download...", True
)

asset_ids = {
version["asset_id"]
for version in asset_versions
}
joined_asset_ids = self.join_filter_values(asset_ids)
assets_by_id = {
asset["id"]: asset
for asset in session.query(
f"select id, name from Asset"
f" where id in ({joined_asset_ids})"
).all()
}

asset_versions_by_id = {
version["id"]: version
for version in asset_versions
}
src_component_ids = set()
joined_review_names = self.join_filter_values(self.review_names)
components = []
for chunk_ids in create_chunks(asset_versions_by_id.keys()):
joined_version_ids = self.join_filter_values(chunk_ids)
for component in session.query(
"select id, name, version_id, metadata from Component"
f" where name in ({joined_review_names})"
f" and version_id in ({joined_version_ids})"
).all():
source_component_id = component["metadata"].get(
"source_component_id"
)
if not source_component_id:
components.append(component)
continue
src_component_ids.add(source_component_id)

if src_component_ids:
joined_comp_ids = self.join_filter_values(src_component_ids)
components.extend(
session.query(
"select id, name, version_id from Component"
f" where id in ({joined_comp_ids})"
).all()
)

if not components:
return {
"message": (
"Selected entities don't have available"
" review media to download..."
),
"success": False,
}

job = None
if user_id:
job = session.create("Job", {
"user_id": user_id,
"status": "running",
"data": json.dumps({
"description": "Download review media"
})
})
session.commit()

success = False
try:
self._download_components(
session,
components,
asset_versions_by_id,
assets_by_id,
job,
)
success = True
return {
"message": "Review media downloaded successfully...",
"success": True,
}

except Exception:
return {
"message": "Failed to download review media...",
"success": False
}

finally:
session.recorded_operations.clear()
if job["status"] == "running":
job["status"] = "done" if success else "failed"
session.commit()

def _download_components(
self,
session,
components,
asset_versions_by_id,
assets_by_id,
job,
):
download_dir = user_downloads_dir()
total_count = len(components)
for idx, component in enumerate(components):
job["data"] = json.dumps({
"description": f"Download review media {idx}/{total_count}"
})
session.commit()

url_item = component["component_locations"][0].get("url")
if url_item is None:
continue

download_url = url_item["value"]

ext = component["file_type"].lstrip(".")

basename = component["name"]
if component["name"] in self.review_names:
asset_version_id = component["version_id"]
asset_version = asset_versions_by_id[asset_version_id]
asset_id = asset_version["asset_id"]
asset = assets_by_id[asset_id]

version = asset_version["version"]
asset_name = asset["name"]

basename = f"{asset_name}_{version:0>3}"

# Calculate the full download path and URL to pull from
download_path = os.path.join(
download_dir, f"{basename}.{ext}"
)
if os.path.exists(download_path):
idx = 1
while True:
filename = f"{basename} ({idx}).{ext}"
_download_path = os.path.join(download_dir, filename)
if not os.path.exists(_download_path):
download_path = _download_path
break
idx += 1

urllib.request.urlretrieve(download_url, download_path)

job["data"] = json.dumps({
"description": "Download review media finished"
})
job["status"] = "done"
session.commit()

def _extract_asset_versions(self, session, entities):
asset_version_ids = set()
review_session_ids = set()
for entity in entities:
entity_type_low = entity.entity_type.lower()
if entity_type_low == "assetversion":
asset_version_ids.add(entity["id"])
elif entity_type_low == "reviewsession":
review_session_ids.add(entity["id"])

for version_id in self._get_asset_version_ids_from_review_sessions(
session, review_session_ids
):
asset_version_ids.add(version_id)

asset_versions = session.query((
"select id, version, asset_id from AssetVersion where id in ({})"
).format(self.join_query_keys(asset_version_ids))).all()

return asset_versions

def _get_asset_version_ids_from_review_sessions(
self, session, review_session_ids
):
if not review_session_ids:
return set()
review_session_objects = session.query((
"select version_id from ReviewSessionObject"
" where review_session_id in ({})"
).format(self.join_query_keys(review_session_ids))).all()

return {
review_session_object["version_id"]
for review_session_object in review_session_objects
}
41 changes: 37 additions & 4 deletions services/processor/processor/ftrack_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import collections

import appdirs
import arrow
import requests
import ftrack_api
import ftrack_api.session
Expand All @@ -19,12 +20,16 @@
from weakref import WeakMethod

from ayon_api import (
get_service_addon_name,
get_service_name,
enroll_event_job,
get_event,
get_events,
update_event,
)

# 10 minutes
EVENT_PROCESS_TIMEOUT = 60 * 10


class ProcessEventHub(ftrack_api.event.hub.EventHub):
_server_con = None
Expand All @@ -35,7 +40,7 @@ def get_next_ftrack_event(self):
return enroll_event_job(
source_topic="ftrack.leech",
target_topic="ftrack.proc",
sender=get_service_addon_name(),
sender=get_service_name(),
description="Event processing",
sequential=True,
)
Expand All @@ -50,7 +55,7 @@ def finish_job(self, job):

update_event(
event_id,
sender=get_service_addon_name(),
sender=get_service_name(),
status="finished",
description=description,
)
Expand All @@ -72,6 +77,7 @@ def wait(self, duration=None):
"""

started = time.time()
last_loaded_job = 0
while True:
job = None
empty_queue = False
Expand All @@ -91,7 +97,12 @@ def wait(self, duration=None):
if not self.connected:
break

if not self.load_event_from_jobs():
if self.load_event_from_jobs():
last_loaded_job = time.time()
elif time.time() - last_loaded_job > 5 * 60:
if not self._check_stuck_events():
time.sleep(5)
else:
time.sleep(0.1)
continue

Expand All @@ -116,6 +127,28 @@ def _handle_packet(self, code, packet_identifier, path, data):

return super()._handle_packet(code, packet_identifier, path, data)

def _check_stuck_events(self) -> bool:
"""Check if there are stuck events and mark them as failed"""
now = arrow.utcnow()
changed_status = False
for event in get_events(
topics={"ftrack.proc"},
statuses={"pending"},
):
created_at = arrow.get(event["createdAt"]).to("local")
delta = now - created_at
if delta.seconds > EVENT_PROCESS_TIMEOUT:
event_id = event["id"]
print(f"Failing stuck event '{event_id}'")
changed_status = True
update_event(
event_id,
sender=get_service_name(),
status="failed",
description="Stuck event",
)
return changed_status


class CustomEventHubSession(ftrack_api.session.Session):
"""An isolated session for interaction with an ftrack server."""
Expand Down

0 comments on commit e598652

Please sign in to comment.