diff --git a/lib/screens/alerts/cache.ex b/lib/screens/alerts/cache.ex new file mode 100644 index 000000000..3219c91f8 --- /dev/null +++ b/lib/screens/alerts/cache.ex @@ -0,0 +1,82 @@ +defmodule Screens.Alerts.Cache do + @moduledoc """ + GenStage Consumer of Alert server sent event data + """ + use GenStage + + require Logger + + alias Screens.Alerts + alias ServerSentEventStage.Event + + @table __MODULE__ + + defstruct [:table] + + def start_link(opts) do + {name, init_arg} = Keyword.pop(opts, :name, __MODULE__) + GenStage.start_link(__MODULE__, init_arg, name: name) + end + + @impl true + def init(init_arg) do + subscribe_to = Keyword.get(init_arg, :subscribe_to, [Screens.Streams.Alerts.Producer]) + + table = @table + + ^table = + :ets.new(table, [:named_table, :set, read_concurrency: true, write_concurrency: false]) + + state = %__MODULE__{table: table} + + {:consumer, state, subscribe_to: subscribe_to} + end + + @impl true + def handle_events(events, _from, state) do + events + |> Enum.map(&decode_data/1) + |> Enum.each(&handle_event(&1, state)) + + {:noreply, [], state} + end + + def all(table \\ @table) do + table + |> :ets.tab2list() + |> Enum.map(&elem(&1, 1)) + end + + defp handle_event(%Event{event: "reset", data: data}, state) do + alerts = + Enum.map(data, fn data -> + alert = Alerts.Parser.parse_alert(data) + {alert.id, alert} + end) + + true = :ets.delete_all_objects(state.table) + true = :ets.insert(state.table, alerts) + + :ok + end + + defp handle_event(%Event{event: event, data: alert}, state) when event in ~w[add update] do + alert = Alerts.Parser.parse_alert(alert) + + true = :ets.insert(state.table, {alert.id, alert}) + + :ok + end + + defp handle_event(%Event{event: "remove", data: %{"id" => id}}, state) do + true = :ets.delete(state.table, id) + + :ok + end + + defp decode_data(%Event{data: encoded} = event) do + decoded = Jason.decode!(encoded) + + %{event | data: decoded} + end +end diff --git a/lib/screens/application.ex b/lib/screens/application.ex index a09ce781d..f3ba04269 100644 --- a/lib/screens/application.ex +++ b/lib/screens/application.ex @@ -36,7 +36,8 @@ defmodule Screens.Application do {Screens.ScreensByAlert.SelfRefreshRunner, name: Screens.ScreensByAlert.SelfRefreshRunner}, Screens.OlCrowding.DynamicSupervisor, {Screens.OlCrowding.Agent, %{}}, - {Screens.ScreenApiResponseCache, []} + {Screens.ScreenApiResponseCache, []}, + Screens.Streams.Alerts ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/screens/streams/alerts.ex b/lib/screens/streams/alerts.ex new file mode 100644 index 000000000..a5e0a5f1c --- /dev/null +++ b/lib/screens/streams/alerts.ex @@ -0,0 +1,46 @@ +defmodule Screens.Streams.Alerts do + @moduledoc """ + Supervisor for streamed producer and consumer(s) of Alerts data from the + V3 API + """ + use Supervisor + + @dialyzer {:nowarn_function, children: 1} + @env Mix.env() + + def start_link(opts) do + {name, init_arg} = Keyword.pop(opts, :name, __MODULE__) + Supervisor.start_link(__MODULE__, init_arg, name: name) + end + + @impl true + def init(_init_arg) do + children() + |> Supervisor.init(strategy: :one_for_all) + end + + defp children(env \\ @env) + defp children(:test), do: [] + + defp children(_env) do + api_url = Application.get_env(:screens, :default_api_v3_url) + api_key = Application.get_env(:screens, :api_v3_key) + + url = + api_url + |> URI.merge("/alerts") + |> URI.to_string() + + producer = { + ServerSentEventStage, + name: Screens.Streams.Alerts.Producer, url: url, headers: [{"x-api-key", api_key}] + } + + consumer = { + Screens.Alerts.Cache, + name: Screens.Alerts.Cache, subscribe_to: [Screens.Streams.Alerts.Producer] + } + + [producer, consumer] + end +end