diff --git a/taipy/core/notification/core_event_consumer.py b/taipy/core/notification/core_event_consumer.py index 83a5580d55..7417bfbdd4 100644 --- a/taipy/core/notification/core_event_consumer.py +++ b/taipy/core/notification/core_event_consumer.py @@ -23,6 +23,8 @@ class CoreEventConsumerBase(threading.Thread): It should be subclassed, and the `process_event` method should be implemented to define the custom logic for handling incoming events. + Subclasses should implement the `process_event` method to define their specific event handling behavior. + Example usage: ```python @@ -32,13 +34,24 @@ def process_event(self, event: Event): print(f"Received event created at : {event.creation_date}") pass - consumer = MyEventConsumer("consumer_1", event_queue) + registration_id, registered_queue = Notifier.register( + entity_type=EventEntityType.SCENARIO, + operation=EventOperation.CREATION + ) + + consumer = MyEventConsumer(registration_id, registered_queue) consumer.start() # ... consumer.stop() + + Notifier.unregister(registration_id) ``` - Subclasses should implement the `process_event` method to define their specific event handling behavior. + Firstly, we would create a consumer class extending from CoreEventConsumerBase + and decide how to process the incoming events by defining the process_event. + Then, we would specify the type of event we want to receive by registering with the Notifier. + After that, we create an object of the consumer class by providing + the registration_id and registered_queue and start consuming the event. Attributes: queue (SimpleQueue): The queue from which events will be consumed. diff --git a/taipy/core/notification/notifier.py b/taipy/core/notification/notifier.py index 11181d45b6..6e60383433 100644 --- a/taipy/core/notification/notifier.py +++ b/taipy/core/notification/notifier.py @@ -25,7 +25,7 @@ def _publish_event( attribute_name: Optional[str] = None, attribute_value: Optional[Any] = None, **kwargs, -): +) -> None: """Internal helper function to send events. It basically creates an event corresponding to the given arguments @@ -65,8 +65,24 @@ def register( ) -> Tuple[str, SimpleQueue]: """Register a listener for a specific event topic. - The topic is defined by the combination of the entity type, the entity id, - the operation and the attribute name. + The topic is defined by the combination of an optional entity type, an optional + entity id, an optional operation, and an optional attribute name. The purpose is + to be as flexible as possible. For example, we can register to: + + - All scenario creations + - A specific data node update + - A sequence submission + - A Scenario deletion + - Job failures + + Example usage: + + ```python + registration_id, registered_queue = Notifier.register( + entity_type=EventEntityType.SCENARIO, + operation=EventOperation.CREATION + ) + ``` Parameters: entity_type (Optional[EventEntityType^]): If provided, the listener will @@ -116,9 +132,21 @@ def register( return registration.registration_id, registration.queue @classmethod - def unregister(cls, registration_id: str): + def unregister(cls, registration_id: str) -> None: """Unregister a listener. + Example usage: + + ```python + registration_id, registered_queue = Notifier.register( + entity_type=EventEntityType.CYCLE, + entity_id="CYCLE_cycle_1", + operation=EventOperation.CREATION + ) + + Notifier.unregister(registration_id) + ``` + Parameters: registration_id (RegistrationId^): The registration id returned by the `register` method. """ @@ -137,7 +165,7 @@ def unregister(cls, registration_id: str): del cls._topics_registrations_list[to_remove_registration.topic] @classmethod - def publish(cls, event): + def publish(cls, event) -> None: """Publish a `Core^` service event to all registered listeners whose topic matches the event. Parameters: