Skip to content

Commit

Permalink
feat: in-memory Alert storage (#2121)
Browse files Browse the repository at this point in the history
Uses ServerSentEventStage to consume alerts events from the V3 API and
store them in an ETS table keyed by alert ID.
  • Loading branch information
sloanelybutsurely authored Aug 2, 2024
1 parent f6a1805 commit 41a1a7e
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 1 deletion.
82 changes: 82 additions & 0 deletions lib/screens/alerts/cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
defmodule Screens.Alerts.Cache do
@moduledoc """
GenStage Consumer of Alert server sent event data
"""
use GenStage

require Logger

alias Screens.Alerts
alias ServerSentEventStage.Event

@table __MODULE__

defstruct [:table]

def start_link(opts) do
{name, init_arg} = Keyword.pop(opts, :name, __MODULE__)
GenStage.start_link(__MODULE__, init_arg, name: name)
end

@impl true
def init(init_arg) do
subscribe_to = Keyword.get(init_arg, :subscribe_to, [Screens.Streams.Alerts.Producer])

table = @table

^table =
:ets.new(table, [:named_table, :set, read_concurrency: true, write_concurrency: false])

state = %__MODULE__{table: table}

{:consumer, state, subscribe_to: subscribe_to}
end

@impl true
def handle_events(events, _from, state) do
events
|> Enum.map(&decode_data/1)
|> Enum.each(&handle_event(&1, state))

{:noreply, [], state}
end

def all(table \\ @table) do
table
|> :ets.tab2list()
|> Enum.map(&elem(&1, 1))
end

defp handle_event(%Event{event: "reset", data: data}, state) do
alerts =
Enum.map(data, fn data ->
alert = Alerts.Parser.parse_alert(data)
{alert.id, alert}
end)

true = :ets.delete_all_objects(state.table)
true = :ets.insert(state.table, alerts)

:ok
end

defp handle_event(%Event{event: event, data: alert}, state) when event in ~w[add update] do
alert = Alerts.Parser.parse_alert(alert)

true = :ets.insert(state.table, {alert.id, alert})

:ok
end

defp handle_event(%Event{event: "remove", data: %{"id" => id}}, state) do
true = :ets.delete(state.table, id)

:ok
end

defp decode_data(%Event{data: encoded} = event) do
decoded = Jason.decode!(encoded)

%{event | data: decoded}
end
end
3 changes: 2 additions & 1 deletion lib/screens/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ defmodule Screens.Application do
{Screens.ScreensByAlert.SelfRefreshRunner, name: Screens.ScreensByAlert.SelfRefreshRunner},
Screens.OlCrowding.DynamicSupervisor,
{Screens.OlCrowding.Agent, %{}},
{Screens.ScreenApiResponseCache, []}
{Screens.ScreenApiResponseCache, []},
Screens.Streams.Alerts
]

# See https://hexdocs.pm/elixir/Supervisor.html
Expand Down
46 changes: 46 additions & 0 deletions lib/screens/streams/alerts.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule Screens.Streams.Alerts do
@moduledoc """
Supervisor for streamed producer and consumer(s) of Alerts data from the
V3 API
"""
use Supervisor

@dialyzer {:nowarn_function, children: 1}
@env Mix.env()

def start_link(opts) do
{name, init_arg} = Keyword.pop(opts, :name, __MODULE__)
Supervisor.start_link(__MODULE__, init_arg, name: name)
end

@impl true
def init(_init_arg) do
children()
|> Supervisor.init(strategy: :one_for_all)
end

defp children(env \\ @env)
defp children(:test), do: []

defp children(_env) do
api_url = Application.get_env(:screens, :default_api_v3_url)
api_key = Application.get_env(:screens, :api_v3_key)

url =
api_url
|> URI.merge("/alerts")
|> URI.to_string()

producer = {
ServerSentEventStage,
name: Screens.Streams.Alerts.Producer, url: url, headers: [{"x-api-key", api_key}]
}

consumer = {
Screens.Alerts.Cache,
name: Screens.Alerts.Cache, subscribe_to: [Screens.Streams.Alerts.Producer]
}

[producer, consumer]
end
end

0 comments on commit 41a1a7e

Please sign in to comment.