Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Processor service: Event processing issues fix attempt 3 #134

Merged
merged 9 commits into from
Sep 13, 2024
179 changes: 122 additions & 57 deletions client/ayon_ftrack/common/event_handlers/ftrack_action_handler.py

Large diffs are not rendered by default.

215 changes: 138 additions & 77 deletions client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py

Large diffs are not rendered by default.

41 changes: 25 additions & 16 deletions client/ayon_ftrack/common/event_handlers/ftrack_event_handler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from typing import Optional

import ftrack_api

from .ftrack_base_handler import BaseHandler


Expand All @@ -9,32 +13,34 @@ class BaseEventHandler(BaseHandler):
By default is listening to "ftrack.update". To change it override
'register' method of change 'subscription_topic' attribute.
"""
__ignore_handler_class: bool = True

subscription_topic = "ftrack.update"
handler_type = "Event"
subscription_topic: str = "ftrack.update"
handler_type: str = "Event"

def register(self):
"""Register to subscription topic."""

self.session.event_hub.subscribe(
"topic={}".format(self.subscription_topic),
self._process,
priority=self.priority
)

def process(self, event):
def process(self, event: ftrack_api.event.base.Event):
"""Callback triggered on event with matching topic.

Args:
session (ftrack_api.Session): Ftrack session which triggered
the event.
event (ftrack_api.Event): Ftrack event to process.
"""

"""
return self.launch(self.session, event)


def launch(self, session, event):
def launch(
self,
session: ftrack_api.Session,
event: ftrack_api.event.base.Event
):
"""Deprecated method used for backwards compatibility.

Override 'process' method rather then 'launch'. Method name 'launch'
Expand All @@ -45,36 +51,39 @@ def launch(self, session, event):
session (ftrack_api.Session): Ftrack session which triggered
the event.
event (ftrack_api.Event): Ftrack event to process.
"""

"""
raise NotImplementedError()

def _process(self, event):
def _process(self, event: ftrack_api.event.base.Event):
return self._launch(event)

def _launch(self, event):
def _launch(self, event: ftrack_api.event.base.Event):
"""Callback kept for backwards compatibility.

Will be removed when default
"""

self.session.rollback()
self.session._local_cache.clear()

try:
self.process(event)

except Exception as exc:
self.session.rollback()
self.session._configure_locations()
self.log.error(
"Event \"{}\" Failed: {}".format(
self.__class__.__name__, str(exc)
),
exc_info=True
)
self.session.rollback()
self.session._configure_locations()

def _translate_event(self, event, session=None):
def _translate_event(
self,
event: ftrack_api.event.base.Event,
session: Optional[ftrack_api.Session] = None
):
"""Receive entity objects based on event.

Args:
Expand All @@ -83,8 +92,8 @@ def _translate_event(self, event, session=None):

Returns:
List[ftrack_api.Entity]: Queried entities based on event data.
"""

"""
return self._get_entities(
event,
session,
Expand Down
230 changes: 151 additions & 79 deletions client/ayon_ftrack/common/ftrack_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,32 @@
import logging
import traceback
import types
import inspect

import ftrack_api

from .python_module_tools import modules_from_path
from .event_handlers import BaseHandler


class FtrackServer:
"""Helper wrapper to run ftrack server with event handlers.

Handlers are discovered based on a list of paths. Each path is scanned for
python files which are imported as modules. Each module is checked for
'register' function or classes inheriting from 'BaseHandler'. If class
inheriting from 'BaseHandler' is found it is instantiated and 'register'
method is called. If 'register' function is found it is called with
ftrack session as argument and 'BaseHandler' from the file are ignored.

Function 'register' tells discovery system to skip looking for classes.

Classes that start with '_' are ignored. It is possible to define
attribute `__ignore_handler_class = True` on class definition to mark
a "base class" that will be ignored on discovery, so you can safely import
custom base classes in the files.
"""
def __init__(self, handler_paths=None):
"""
- 'type' is by default set to 'action' - Runs Action server
- enter 'event' for Event server

EXAMPLE FOR EVENT SERVER:
...
server = FtrackServer()
server.run_server()
..
"""

# set Ftrack logging to Warning only - OPTIONAL
ftrack_log = logging.getLogger("ftrack_api")
ftrack_log.setLevel(logging.WARNING)
Expand All @@ -31,68 +38,42 @@ def __init__(self, handler_paths=None):
self._stopped = True
self._is_running = False

self.handler_paths = handler_paths or []
if handler_paths is None:
handler_paths = []

def stop_session(self):
self._stopped = True
if self.session.event_hub.connected is True:
self.session.event_hub.disconnect()
self.session.close()
self.session = None
self._handler_paths = handler_paths

def set_files(self, paths):
# Iterate all paths
register_functions = []
for path in paths:
# Try to format path with environments
try:
path = path.format(**os.environ)
except BaseException:
pass

# Get all modules with functions
modules, crashed = modules_from_path(path)
for filepath, exc_info in crashed:
self.log.warning("Filepath load crashed {}.\n{}".format(
filepath, "".join(traceback.format_exception(*exc_info))
))
self._session = None
self._cached_modules = []
self._cached_objects = []

for filepath, module in modules:
register_function = None
for name, attr in module.__dict__.items():
if (
name == "register"
and isinstance(attr, types.FunctionType)
):
register_function = attr
break
def stop_session(self):
session = self._session
self._session = None
self._stopped = True
if session.event_hub.connected is True:
session.event_hub.disconnect()
session.close()

if not register_function:
self.log.warning(
"\"{}\" - Missing register method".format(filepath)
)
continue
def get_session(self):
return self._session

register_functions.append(
(filepath, register_function)
)
def get_handler_paths(self):
return self._handler_paths

if not register_functions:
self.log.warning((
"There are no events with `register` function"
" in registered paths: \"{}\""
).format("| ".join(paths)))
def set_handler_paths(self, paths):
if self._is_running:
raise ValueError(
"Cannot change handler paths when server is running."
)
self._handler_paths = paths

for filepath, register_func in register_functions:
try:
register_func(self.session)
except Exception:
self.log.warning(
"\"{}\" - register was not successful".format(filepath),
exc_info=True
)
session = property(get_session)
handler_paths = property(get_handler_paths, set_handler_paths)

def run_server(self, session=None, load_files=True):
def run_server(self, session=None):
if self._is_running:
raise ValueError("Server is already running.")
self._stopped = False
self._is_running = True
if not session:
Expand All @@ -115,24 +96,115 @@ def run_server(self, session=None, load_files=True):
self.log.info("Connecting event hub")
session.event_hub.connect()

self.session = session
if load_files:
if not self.handler_paths:
self.log.warning((
"Paths to event handlers are not set."
" Ftrack server won't launch."
))
self._is_running = False
return
self._session = session
if not self._handler_paths:
self.log.warning((
"Paths to event handlers are not set."
" Ftrack server won't launch."
))
self._is_running = False
return

self.set_files(self.handler_paths)
self._load_handlers()

msg = "Registration of event handlers has finished!"
self.log.info(len(msg) * "*")
self.log.info(msg)
msg = "Registration of event handlers has finished!"
self.log.info(len(msg) * "*")
self.log.info(msg)

# keep event_hub on session running
try:
self.session.event_hub.wait()
session.event_hub.wait()
finally:
self._is_running = False
self._cached_modules = []

def _load_handlers(self):
register_functions = []
handler_classes = []

# Iterate all paths
paths = self._handler_paths
for path in paths:
# Try to format path with environments
try:
path = path.format(**os.environ)
except BaseException:
pass

# Get all modules with functions
modules, crashed = modules_from_path(path)
for filepath, exc_info in crashed:
self.log.warning("Filepath load crashed {}.\n{}".format(
filepath, "".join(traceback.format_exception(*exc_info))
))

for filepath, module in modules:
self._cached_modules.append(module)
register_function = getattr(module, "register", None)
if register_function is not None:
if isinstance(register_function, types.FunctionType):
register_functions.append(
(filepath, register_function)
)
else:
self.log.warning(
f"\"{filepath}\""
" - Found 'register' but it is not a function."
)
continue

for attr_name in dir(module):
if attr_name.startswith("_"):
self.log.debug(
f"Skipping private class '{attr_name}'"
)
continue

attr = getattr(module, attr_name, None)
if (
not inspect.isclass(attr)
or not issubclass(attr, BaseHandler)
or attr.ignore_handler_class()
):
continue

if inspect.isabstract(attr):
self.log.warning(
f"Skipping abstract class '{attr_name}'."
)
continue
handler_classes.append(attr)

if not handler_classes:
self.log.warning(
f"\"{filepath}\""
" - No 'register' function"
" or 'BaseHandler' classes found."
)

if not register_functions and not handler_classes:
self.log.warning((
"There are no files with `register` function or 'BaseHandler'"
" classes in registered paths:\n- \"{}\""
).format("- \n".join(paths)))

for filepath, register_func in register_functions:
try:
register_func(self._session)
except Exception:
self.log.warning(
f"\"{filepath}\" - register was not successful",
exc_info=True
)

for handler_class in handler_classes:
try:
obj = handler_class(self._session)
obj.register()
self._cached_objects.append(obj)

except Exception:
self.log.warning(
f"\"{handler_class}\" - register was not successful",
exc_info=True
)
Original file line number Diff line number Diff line change
Expand Up @@ -559,9 +559,3 @@ def path_from_represenation(self, representation, anatomy):
return (None, None)

return (os.path.normpath(path), sequence_path)


def register(session):
'''Register plugin. Called when used as an plugin.'''

DeleteOldVersions(session).register()
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,3 @@ def _get_tasks_for_selection(
report[NOT_SYNCHRONIZED_TITLE].append(task_path)

return output


def register(session):
FillWorkfileAttributeAction(session).register()
Loading