Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stacktrace to error log when subscriber.process/1 fails #177

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions lib/event_bus.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,23 @@ defmodule EventBus do
to: Notification,
as: :notify

@doc """
Send an event to all subscribers, returning the results of all the computation
for each subscriber. I called this `declare` as a stronger "notify" - stronger
because we get the results returned from each Subscriber.

## Examples

event = %Event{id: 1, topic: :webhook_received,
data: %{"message" => "Hi all!"}}
EventBus.declare(event)
{:ok, [{FirstSubscriber, results}, {SecondSubscriber, results}]}

"""
defdelegate declare(event),
to: Notification,
as: :declare

@doc """
Check if a topic registered.

Expand Down
15 changes: 14 additions & 1 deletion lib/event_bus/managers/notification.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,27 @@ defmodule EventBus.Manager.Notification do
GenServer.cast(__MODULE__, {:notify, event})
end

@doc """
Notify event to event.topic subscribers in the current node, while
also returning the results of the `process/2` function of each
subscriber.
"""
def declare(%Event{} = event) do
GenServer.call(__MODULE__, {:declare, event})
end

###########################################################################
# PRIVATE API
###########################################################################

@doc false
@spec handle_cast({:notify, event()}, term()) :: no_return()
def handle_cast({:notify, event}, state) do
@backend.notify(event)
@backend.notify(event) # results discarded...
{:noreply, state}
end

def handle_call({:declare, event}, _from, state) do
{:reply, @backend.notify(event), state}
end
end
27 changes: 14 additions & 13 deletions lib/event_bus/services/notification.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,40 @@ defmodule EventBus.Service.Notification do
subscribers = SubscriptionManager.subscribers(topic)

if subscribers == [] do
warn_missing_topic_subscription(topic)
{:error, warn_missing_topic_subscription(topic)}
else
:ok = StoreManager.create(event)
:ok = ObservationManager.create({subscribers, {topic, id}})

notify_subscribers(subscribers, {topic, id})
end

:ok
end

@spec notify_subscribers(subscribers(), event_shadow()) :: :ok
defp notify_subscribers(subscribers, event_shadow) do
Enum.each(subscribers, fn subscriber ->
notify_subscriber(subscriber, event_shadow)
Enum.map(subscribers, fn subscriber ->
res = notify_subscriber(subscriber, event_shadow)
{subscriber, res}
end)
:ok
end

@spec notify_subscriber(subscriber(), event_shadow()) :: no_return()
defp notify_subscriber({subscriber, config}, {topic, id}) do
subscriber.process({config, topic, id})
rescue
error ->
log_error(subscriber, error)
log_error(subscriber, error, __STACKTRACE__)
ObservationManager.mark_as_skipped({{subscriber, config}, {topic, id}})
{subscriber, {:error, error}}
end

defp notify_subscriber(subscriber, {topic, id}) do
subscriber.process({topic, id})
rescue
error ->
log_error(subscriber, error)
log_error(subscriber, error, __STACKTRACE__)
ObservationManager.mark_as_skipped({subscriber, {topic, id}})
{subscriber, {:error, error}}
end

@spec registration_status(topic()) :: String.t()
Expand All @@ -66,12 +66,13 @@ defmodule EventBus.Service.Notification do
msg =
"Topic(:#{topic}#{registration_status(topic)}) doesn't have subscribers"

Logger.warn(msg)
:ok = Logger.warn(msg)
msg
end

@spec log_error(module(), any()) :: no_return()
defp log_error(subscriber, error) do
msg = "#{subscriber}.process/1 raised an error!\n#{inspect(error)}"
Logger.info(msg)
@spec log_error(module(), any(), any()) :: no_return()
defp log_error(subscriber, error, stacktrace) do
msg = "#{subscriber}.process/1 raised an error!\n" <> Exception.format(:error, error, stacktrace)
Logger.error(msg)
end
end