From a34ad21d8433c6fd632a4c92111f4d4d0c397839 Mon Sep 17 00:00:00 2001 From: Luke T Date: Thu, 20 Oct 2022 13:20:54 -0500 Subject: [PATCH 1/3] Add stacktrace to error log when subscriber.process/1 fails --- lib/event_bus/services/notification.ex | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/event_bus/services/notification.ex b/lib/event_bus/services/notification.ex index 2911fc6..cf4032e 100644 --- a/lib/event_bus/services/notification.ex +++ b/lib/event_bus/services/notification.ex @@ -41,10 +41,10 @@ defmodule EventBus.Service.Notification do @spec notify_subscriber(subscriber(), event_shadow()) :: no_return() defp notify_subscriber({subscriber, config}, {topic, id}) do - subscriber.process({config, topic, id}) + subscriber.process({topic, id}) rescue error -> - log_error(subscriber, error) + log_error(subscriber, error, __STACKTRACE__) ObservationManager.mark_as_skipped({{subscriber, config}, {topic, id}}) end @@ -52,7 +52,7 @@ defmodule EventBus.Service.Notification do subscriber.process({topic, id}) rescue error -> - log_error(subscriber, error) + log_error(subscriber, error, __STACKTRACE__) ObservationManager.mark_as_skipped({subscriber, {topic, id}}) end @@ -69,9 +69,9 @@ defmodule EventBus.Service.Notification do Logger.warn(msg) end - @spec log_error(module(), any()) :: no_return() - defp log_error(subscriber, error) do - msg = "#{subscriber}.process/1 raised an error!\n#{inspect(error)}" - Logger.info(msg) + @spec log_error(module(), any(), any()) :: no_return() + defp log_error(subscriber, error, stacktrace) do + msg = "#{subscriber}.process/1 raised an error!\n" <> Exception.format(:error, error, stacktrace) + Logger.error(msg) end end From 20cfcf964730c0d9f415049f48451657b49dc6f7 Mon Sep 17 00:00:00 2001 From: Luke T Date: Thu, 20 Oct 2022 13:26:38 -0500 Subject: [PATCH 2/3] return inproperly deleted "config" param --- lib/event_bus/services/notification.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/event_bus/services/notification.ex b/lib/event_bus/services/notification.ex index cf4032e..bcdd841 100644 --- a/lib/event_bus/services/notification.ex +++ b/lib/event_bus/services/notification.ex @@ -41,7 +41,7 @@ defmodule EventBus.Service.Notification do @spec notify_subscriber(subscriber(), event_shadow()) :: no_return() defp notify_subscriber({subscriber, config}, {topic, id}) do - subscriber.process({topic, id}) + subscriber.process({config, topic, id}) rescue error -> log_error(subscriber, error, __STACKTRACE__) From e736d05e008e75ae5472f1c6b8c96fae720da3b4 Mon Sep 17 00:00:00 2001 From: Luke T Date: Sat, 22 Oct 2022 22:46:41 -0500 Subject: [PATCH 3/3] Add `declare/1` which returns a list of results from each listener --- lib/event_bus.ex | 17 +++++++++++++++++ lib/event_bus/managers/notification.ex | 15 ++++++++++++++- lib/event_bus/services/notification.ex | 15 ++++++++------- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/lib/event_bus.ex b/lib/event_bus.ex index d7c4dbf..6e78b9e 100644 --- a/lib/event_bus.ex +++ b/lib/event_bus.ex @@ -80,6 +80,23 @@ defmodule EventBus do to: Notification, as: :notify + @doc """ + Send an event to all subscribers, returning the results of all the computation + for each subscriber. I called this `declare` as a stronger "notify" - stronger + because we get the results returned from each Subscriber. + + ## Examples + + event = %Event{id: 1, topic: :webhook_received, + data: %{"message" => "Hi all!"}} + EventBus.declare(event) + {:ok, [{FirstSubscriber, results}, {SecondSubscriber, results}]} + + """ + defdelegate declare(event), + to: Notification, + as: :declare + @doc """ Check if a topic registered. diff --git a/lib/event_bus/managers/notification.ex b/lib/event_bus/managers/notification.ex index c078793..c0e655f 100644 --- a/lib/event_bus/managers/notification.ex +++ b/lib/event_bus/managers/notification.ex @@ -33,6 +33,15 @@ defmodule EventBus.Manager.Notification do GenServer.cast(__MODULE__, {:notify, event}) end + @doc """ + Notify event to event.topic subscribers in the current node, while + also returning the results of the `process/2` function of each + subscriber. + """ + def declare(%Event{} = event) do + GenServer.call(__MODULE__, {:declare, event}) + end + ########################################################################### # PRIVATE API ########################################################################### @@ -40,7 +49,11 @@ defmodule EventBus.Manager.Notification do @doc false @spec handle_cast({:notify, event()}, term()) :: no_return() def handle_cast({:notify, event}, state) do - @backend.notify(event) + @backend.notify(event) # results discarded... {:noreply, state} end + + def handle_call({:declare, event}, _from, state) do + {:reply, @backend.notify(event), state} + end end diff --git a/lib/event_bus/services/notification.ex b/lib/event_bus/services/notification.ex index bcdd841..f5c5547 100644 --- a/lib/event_bus/services/notification.ex +++ b/lib/event_bus/services/notification.ex @@ -20,23 +20,21 @@ defmodule EventBus.Service.Notification do subscribers = SubscriptionManager.subscribers(topic) if subscribers == [] do - warn_missing_topic_subscription(topic) + {:error, warn_missing_topic_subscription(topic)} else :ok = StoreManager.create(event) :ok = ObservationManager.create({subscribers, {topic, id}}) notify_subscribers(subscribers, {topic, id}) end - - :ok end @spec notify_subscribers(subscribers(), event_shadow()) :: :ok defp notify_subscribers(subscribers, event_shadow) do - Enum.each(subscribers, fn subscriber -> - notify_subscriber(subscriber, event_shadow) + Enum.map(subscribers, fn subscriber -> + res = notify_subscriber(subscriber, event_shadow) + {subscriber, res} end) - :ok end @spec notify_subscriber(subscriber(), event_shadow()) :: no_return() @@ -46,6 +44,7 @@ defmodule EventBus.Service.Notification do error -> log_error(subscriber, error, __STACKTRACE__) ObservationManager.mark_as_skipped({{subscriber, config}, {topic, id}}) + {subscriber, {:error, error}} end defp notify_subscriber(subscriber, {topic, id}) do @@ -54,6 +53,7 @@ defmodule EventBus.Service.Notification do error -> log_error(subscriber, error, __STACKTRACE__) ObservationManager.mark_as_skipped({subscriber, {topic, id}}) + {subscriber, {:error, error}} end @spec registration_status(topic()) :: String.t() @@ -66,7 +66,8 @@ defmodule EventBus.Service.Notification do msg = "Topic(:#{topic}#{registration_status(topic)}) doesn't have subscribers" - Logger.warn(msg) + :ok = Logger.warn(msg) + msg end @spec log_error(module(), any(), any()) :: no_return()