From b0493d5e04782743f16ac93ed2b08a344dd2f6e0 Mon Sep 17 00:00:00 2001 From: Toan Quach Date: Thu, 23 May 2024 10:54:45 +0700 Subject: [PATCH 1/3] added example for using Notifer register and CoreEventConsumerBase --- .../core/notification/core_event_consumer.py | 10 ++++++++- taipy/core/notification/notifier.py | 22 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/taipy/core/notification/core_event_consumer.py b/taipy/core/notification/core_event_consumer.py index 83a5580d55..17e6a9ba06 100644 --- a/taipy/core/notification/core_event_consumer.py +++ b/taipy/core/notification/core_event_consumer.py @@ -32,10 +32,18 @@ def process_event(self, event: Event): print(f"Received event created at : {event.creation_date}") pass - consumer = MyEventConsumer("consumer_1", event_queue) + registration_id, registered_queue = Notifier.unregister( + entity_type=EventEntityType.CYCLE, + entity_id="CYCLE_cycle_1", + operation=EventOperation.CREATION + ) + + consumer = MyEventConsumer("consumer_1", registered_queue) consumer.start() # ... consumer.stop() + + Notifier.unregister(registration_id) ``` Subclasses should implement the `process_event` method to define their specific event handling behavior. diff --git a/taipy/core/notification/notifier.py b/taipy/core/notification/notifier.py index 11181d45b6..d766acbbd9 100644 --- a/taipy/core/notification/notifier.py +++ b/taipy/core/notification/notifier.py @@ -68,6 +68,16 @@ def register( The topic is defined by the combination of the entity type, the entity id, the operation and the attribute name. + Example usage: + + ```python + registration_id, registered_queue = Notifier.register( + entity_type=EventEntityType.CYCLE, + entity_id="CYCLE_cycle_1", + operation=EventOperation.CREATION + ) + ``` + Parameters: entity_type (Optional[EventEntityType^]): If provided, the listener will be notified for all events related to this entity type. Otherwise, @@ -119,6 +129,18 @@ def register( def unregister(cls, registration_id: str): """Unregister a listener. + Example usage: + + ```python + registration_id, registered_queue = Notifier.unregister( + entity_type=EventEntityType.CYCLE, + entity_id="CYCLE_cycle_1", + operation=EventOperation.CREATION + ) + + Notifier.unregister(registration_id) + ``` + Parameters: registration_id (RegistrationId^): The registration id returned by the `register` method. """ From 4de7ae3aceab6891c420f720af6261f578400d1b Mon Sep 17 00:00:00 2001 From: Toan Quach Date: Fri, 31 May 2024 11:14:17 +0700 Subject: [PATCH 2/3] improve notifier examples --- .../core/notification/core_event_consumer.py | 15 +++++++++----- taipy/core/notification/notifier.py | 20 ++++++++++++------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/taipy/core/notification/core_event_consumer.py b/taipy/core/notification/core_event_consumer.py index 17e6a9ba06..7417bfbdd4 100644 --- a/taipy/core/notification/core_event_consumer.py +++ b/taipy/core/notification/core_event_consumer.py @@ -23,6 +23,8 @@ class CoreEventConsumerBase(threading.Thread): It should be subclassed, and the `process_event` method should be implemented to define the custom logic for handling incoming events. + Subclasses should implement the `process_event` method to define their specific event handling behavior. + Example usage: ```python @@ -32,13 +34,12 @@ def process_event(self, event: Event): print(f"Received event created at : {event.creation_date}") pass - registration_id, registered_queue = Notifier.unregister( - entity_type=EventEntityType.CYCLE, - entity_id="CYCLE_cycle_1", + registration_id, registered_queue = Notifier.register( + entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION ) - consumer = MyEventConsumer("consumer_1", registered_queue) + consumer = MyEventConsumer(registration_id, registered_queue) consumer.start() # ... consumer.stop() @@ -46,7 +47,11 @@ def process_event(self, event: Event): Notifier.unregister(registration_id) ``` - Subclasses should implement the `process_event` method to define their specific event handling behavior. + Firstly, we would create a consumer class extending from CoreEventConsumerBase + and decide how to process the incoming events by defining the process_event. + Then, we would specify the type of event we want to receive by registering with the Notifier. + After that, we create an object of the consumer class by providing + the registration_id and registered_queue and start consuming the event. Attributes: queue (SimpleQueue): The queue from which events will be consumed. diff --git a/taipy/core/notification/notifier.py b/taipy/core/notification/notifier.py index d766acbbd9..d18c72e188 100644 --- a/taipy/core/notification/notifier.py +++ b/taipy/core/notification/notifier.py @@ -25,7 +25,7 @@ def _publish_event( attribute_name: Optional[str] = None, attribute_value: Optional[Any] = None, **kwargs, -): +) -> None: """Internal helper function to send events. It basically creates an event corresponding to the given arguments @@ -65,15 +65,21 @@ def register( ) -> Tuple[str, SimpleQueue]: """Register a listener for a specific event topic. - The topic is defined by the combination of the entity type, the entity id, - the operation and the attribute name. + The topic is defined by the combination of an optional entity type, an optional + entity id, an optional operation, and an optional attribute name. The purpose is + to be as flexible as possible. For example, we can register to: + + - All scenario creations + - A specific data node update + - A sequence submission + - A Scenario deletion + - Job failures Example usage: ```python registration_id, registered_queue = Notifier.register( - entity_type=EventEntityType.CYCLE, - entity_id="CYCLE_cycle_1", + entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION ) ``` @@ -126,7 +132,7 @@ def register( return registration.registration_id, registration.queue @classmethod - def unregister(cls, registration_id: str): + def unregister(cls, registration_id: str) -> None: """Unregister a listener. Example usage: @@ -159,7 +165,7 @@ def unregister(cls, registration_id: str): del cls._topics_registrations_list[to_remove_registration.topic] @classmethod - def publish(cls, event): + def publish(cls, event) -> None: """Publish a `Core^` service event to all registered listeners whose topic matches the event. Parameters: From 7214db49c4b3bfed4aa9bc78ee19e28bf86b72d6 Mon Sep 17 00:00:00 2001 From: Toan Quach Date: Fri, 31 May 2024 15:02:48 +0700 Subject: [PATCH 3/3] fixed wrong example --- taipy/core/notification/notifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taipy/core/notification/notifier.py b/taipy/core/notification/notifier.py index d18c72e188..6e60383433 100644 --- a/taipy/core/notification/notifier.py +++ b/taipy/core/notification/notifier.py @@ -138,7 +138,7 @@ def unregister(cls, registration_id: str) -> None: Example usage: ```python - registration_id, registered_queue = Notifier.unregister( + registration_id, registered_queue = Notifier.register( entity_type=EventEntityType.CYCLE, entity_id="CYCLE_cycle_1", operation=EventOperation.CREATION