Skip to content

Commit

Permalink
Merge pull request #28 from satoren/persistence
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Initial state can now be passed to Persistence
  • Loading branch information
satoren authored Jul 25, 2024
2 parents e4b407f + 8b96a00 commit 8176e57
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 19 deletions.
24 changes: 16 additions & 8 deletions lib/managed/shared_doc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Yex.Managed.SharedDoc do

@type launch_param ::
{:doc_name, String.t()}
| {:persistence, module()}
| {:persistence, {module() | {module(), init_arg :: term()}}}
| {:idle_timeout, integer()}
| {:pg_scope, atom()}
| {:local_pubsub, module()}
Expand Down Expand Up @@ -44,7 +44,13 @@ defmodule Yex.Managed.SharedDoc do
@impl true
def init(option) do
doc_name = Keyword.fetch!(option, :doc_name)
persistence = Keyword.get(option, :persistence)

{persistence, persistence_init_arg} =
case Keyword.get(option, :persistence) do
{module, init_arg} -> {module, init_arg}
module -> {module, nil}
end

timeout = Keyword.get(option, :idle_timeout, @default_idle_timeout)
pg_scope = Keyword.get(option, :pg_scope, nil)
local_pubsub = Keyword.get(option, :local_pubsub, nil)
Expand All @@ -54,8 +60,10 @@ defmodule Yex.Managed.SharedDoc do
Awareness.clean_local_state(awareness)

persistence_state =
if function_exported?(persistence, :bind, 2) do
persistence.bind(doc_name, doc)
if function_exported?(persistence, :bind, 3) do
persistence.bind(persistence_init_arg, doc_name, doc)
else
persistence_init_arg
end

{:ok, step1_data} = Sync.get_sync_step1(doc)
Expand Down Expand Up @@ -99,7 +107,7 @@ defmodule Yex.Managed.SharedDoc do

@impl true
def handle_call(:doc_name, _from, state) do
{:reply, state.doc_name, state}
{:reply, state.doc_name, state, state.timeout}
end

@impl true
Expand Down Expand Up @@ -281,10 +289,10 @@ defmodule Yex.Managed.SharedDoc do
Persistence behavior for SharedDoc
"""

@callback bind(doc_name :: String.t(), doc :: Doc.t()) :: term()
@callback unbind(term :: term(), doc_name :: String.t(), doc :: Doc.t()) :: :ok
@callback bind(state :: term(), doc_name :: String.t(), doc :: Doc.t()) :: term()
@callback unbind(state :: term(), doc_name :: String.t(), doc :: Doc.t()) :: :ok
@callback update_v1(
term :: term(),
state :: term(),
update :: binary(),
doc_name :: String.t(),
doc :: Doc.t()
Expand Down
9 changes: 5 additions & 4 deletions lib/managed/shared_doc_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ defmodule Yex.Managed.SharedDocSupervisor do
@dynamic_supervisor Yex.Managed.SharedDocSupervisor.DynamicSupervisor

@type launch_param ::
{:persistence, module()}
{:persistence, {module() | {module(), init_arg :: term()}}}
| {:idle_timeout, integer()}
| {:pg_scope, atom()}
| {:local_pubsub, module()}
Expand Down Expand Up @@ -71,17 +71,18 @@ defmodule Yex.Managed.SharedDocSupervisor do
Supervisor.init(children, strategy: :one_for_one)
end

def start_child(doc_name) do
def start_child(doc_name, start_arg \\ []) do
name = via_name(doc_name)

@dynamic_supervisor.start_child(doc_name: doc_name, name: name)
option = [doc_name: doc_name, name: name] ++ start_arg
@dynamic_supervisor.start_child(option)

try do
# check started
SharedDoc.doc_name(name)
catch
_ ->
@dynamic_supervisor.start_child(doc_name: doc_name, name: name)
@dynamic_supervisor.start_child(option)
end

{:ok, name}
Expand Down
72 changes: 71 additions & 1 deletion test/managed/shared_doc_supervisor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,27 @@ defmodule Yex.Managed.SharedDocSupervisorTest do
end

defp random_docname() do
:crypto.strong_rand_bytes(10)
for _ <- 1..10,
into: "",
do:
Enum.random([
"0",
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9",
"a",
"b",
"c",
"d",
"e",
"f"
])
end

defp receive_and_handle_reply_with_timeout(doc, timeout \\ 10) do
Expand Down Expand Up @@ -83,4 +103,54 @@ defmodule Yex.Managed.SharedDocSupervisorTest do
Task.await(client1)
Task.await(client2)
end

describe "override option" do
defmodule TestPersistence do
@behaviour Yex.Managed.SharedDoc.PersistenceBehaviour

def bind(state, _doc_name, doc) do
Doc.get_array(doc, "array")
|> Array.insert(0, "initial_data")

state
end

def unbind(state, doc_name, doc) do
case Yex.encode_state_as_update(doc) do
{:ok, update} ->
File.write!(Path.join(state.out_dir, doc_name), update, [:write, :binary])

_ ->
[]
end

:ok
end

def update_v1(state, _update, _doc_name, _doc) do
state
end
end

@tag :tmp_dir
test "persistence and arg", %{tmp_dir: tmp_dir} do
docname = random_docname()

{:ok, remote_shared_doc} =
SharedDocSupervisor.start_child(docname,
persistence: {TestPersistence, %{out_dir: tmp_dir}},
idle_timeout: 1
)

remote_shared_doc = GenServer.whereis(remote_shared_doc)
Process.monitor(remote_shared_doc)

assert_receive {:DOWN, _, :process, ^remote_shared_doc, _}

# see TestPersistence.unbind/3
persistence_data_path = Path.join(tmp_dir, docname)
data = File.read!(persistence_data_path)
assert byte_size(data) > 0
end
end
end
8 changes: 2 additions & 6 deletions test/managed/shared_doc_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ defmodule Yex.Managed.SharedDocTest do
defmodule PersistenceTest do
@behaviour Yex.Managed.SharedDoc.PersistenceBehaviour

def bind(_doc_name, doc) do
def bind(_arg, _doc_name, doc) do
Doc.get_array(doc, "array")
|> Array.insert(0, "initial_data")

Expand Down Expand Up @@ -203,7 +203,7 @@ defmodule Yex.Managed.SharedDocTest do
defmodule PersistenceFileWriteTest do
@behaviour Yex.Managed.SharedDoc.PersistenceBehaviour

def bind(_doc_name, doc) do
def bind(_state, _doc_name, doc) do
Doc.get_array(doc, "array")
|> Array.insert(0, "initial_data")

Expand All @@ -221,10 +221,6 @@ defmodule Yex.Managed.SharedDocTest do

:ok
end

def update_v1(state, update, _doc_name, _doc) do
[update | state]
end
end

docname = random_docname()
Expand Down

0 comments on commit 8176e57

Please sign in to comment.