Skip to content

Commit

Permalink
feat: optionally time out if stream goes idle (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
boringcactus authored Apr 16, 2024
1 parent ea6a880 commit 85b15ea
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 5 deletions.
41 changes: 38 additions & 3 deletions lib/server_sent_event_stage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ defmodule ServerSentEventStage do
The only required argument is `url`: it can be either a binary of the URL
to connect to or a {module, fun, arguments} tuple.
Other arguments are passed as options to `GenStage.start_link/3`.
Optional arguments:
- `headers`: a list of `{"name", "value"}` pairs
- `idle_timeout`: milliseconds of inactivity after which the connection will be restarted
- `debug`, `name`, `timeout`, `spawn_opt`: options for `GenStage.start_link/3`
"""
def start_link(args) do
_url = Keyword.fetch!(args, :url)
Expand All @@ -38,13 +41,24 @@ defmodule ServerSentEventStage do
end

# Server functions
defstruct [:url, :headers, :connected_url, :conn, :ref, buffer: "", redirecting?: false]
defstruct [
:url,
:headers,
:connected_url,
:conn,
:ref,
:idle_timeout,
:idle_timer,
buffer: "",
redirecting?: false
]

@doc false
def init(args) do
state = %__MODULE__{
url: Keyword.fetch!(args, :url),
headers: Keyword.get(args, :headers, [])
headers: Keyword.get(args, :headers, []),
idle_timeout: Keyword.get(args, :idle_timeout, nil)
}

{:producer, state}
Expand All @@ -57,7 +71,15 @@ defmodule ServerSentEventStage do
{:noreply, [], state}
end

def handle_info(:idle_timeout, state) do
Logger.warn(fn -> "#{__MODULE__} idle_timeout url=#{inspect(state.connected_url)}" end)
do_refresh!()
{:noreply, [], state}
end

def handle_info(message, %{conn: conn} = state) when conn != nil do
state = reset_idle_timeout(state)

case HTTP.stream(state.conn, message) do
{:ok, conn, responses} ->
state = %{state | conn: conn}
Expand Down Expand Up @@ -289,4 +311,17 @@ defmodule ServerSentEventStage do
defp do_refresh! do
send(self(), :connect)
end

defp reset_idle_timeout(state) do
if state.idle_timer != nil do
:ok = Process.cancel_timer(state.idle_timer, async: true, info: false)
end

timer =
if timeout = state.idle_timeout do
Process.send_after(self(), :idle_timeout, timeout)
end

%__MODULE__{state | idle_timer: timer}
end
end
45 changes: 43 additions & 2 deletions test/server_sent_event_stage_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,51 @@ defmodule ServerSentEventStageTest do
GenStage.stop(pid)
end

defp start_producer(bypass) do
test "applies idle timeout", %{bypass: bypass} do
idle_timeout = 200

{:ok, request_count} = Agent.start_link(fn -> 0 end)

Bypass.expect(bypass, fn conn ->
request_count = Agent.get_and_update(request_count, &{&1, &1 + 1})

conn = Plug.Conn.send_chunked(conn, 200)

conn =
case request_count do
0 ->
# ignore the connection termination in Bypass
Bypass.pass(bypass)

Process.sleep(:infinity)
conn

1 ->
# check that comments properly reset idle timeout
Process.sleep(idle_timeout - 10)
{:ok, conn} = Plug.Conn.chunk(conn, ": keep-alive\n")
Process.sleep(idle_timeout - 10)
conn

_ ->
conn
end

{:ok, conn} = Plug.Conn.chunk(conn, ~s(data: #{request_count}\n\n))

conn
end)

start_producer(bypass, idle_timeout: idle_timeout)

refute_receive {:events, _}, idle_timeout
assert_receive {:events, [%Event{data: "1\n"}]}, @assert_receive_timeout
end

defp start_producer(bypass, opts \\ []) do
url = "http://127.0.0.1:#{bypass.port}"
headers = [{"test", "confirmed"}]
{:ok, producer} = start_link(url: url, headers: headers)
{:ok, producer} = start_link([url: url, headers: headers] ++ opts)

{:ok, _consumer} = __MODULE__.SimpleSubscriber.start_link(self(), producer)

Expand Down

0 comments on commit 85b15ea

Please sign in to comment.