Skip to content

Commit

Permalink
Accept Streams as attachments
Browse files Browse the repository at this point in the history
Add support for uploading streams to S3.
  • Loading branch information
fastjames committed Nov 5, 2024
1 parent 8b058e5 commit 3bd1c57
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 9 deletions.
7 changes: 5 additions & 2 deletions lib/waffle/actions/store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Waffle.Actions.Store do
* A map with a filename and path keys (eg, a `%Plug.Upload{}`)
* A map with a filename and binary keys (eg, `%{filename: "image.png", binary: <<255,255,255,...>>}`)
* A map with a filename and stream keys (eg, `%{filename: "image.png", stream: %Stream{...}}`)
* A two-element tuple consisting of one of the above file formats as well as a scope map
Expand Down Expand Up @@ -50,11 +51,13 @@ defmodule Waffle.Actions.Store do
end
end

def store(definition, {file, scope}) when is_binary(file) or is_map(file) do
def store(definition, {file, scope})
when is_binary(file) or is_map(file) do
put(definition, {Waffle.File.new(file, definition), scope})
end

def store(definition, filepath) when is_binary(filepath) or is_map(filepath) do
def store(definition, filepath)
when is_binary(filepath) or is_map(filepath) do
store(definition, {filepath, nil})
end

Expand Down
9 changes: 8 additions & 1 deletion lib/waffle/file.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Waffle.File do
@moduledoc false

defstruct [:path, :file_name, :binary, :is_tempfile?]
defstruct [:path, :file_name, :binary, :is_tempfile?, :stream]

def generate_temporary_path(item \\ nil) do
do_generate_temporary_path(item)
Expand Down Expand Up @@ -92,6 +92,13 @@ defmodule Waffle.File do
end
end

#
# Handle a stream
#
def new(%{filename: filename, stream: stream}, _definition) when is_struct(stream) do
%Waffle.File{stream: stream, file_name: Path.basename(filename)}
end

#
# Support functions
#
Expand Down
49 changes: 43 additions & 6 deletions lib/waffle/storage/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -192,22 +192,41 @@ defmodule Waffle.Storage.S3 do
end
end

# If the file is a stream, send it to AWS as a multi-part upload
defp do_put(file = %Waffle.File{stream: file_stream}, {s3_bucket, s3_key, s3_options})
when is_struct(file_stream) do
file_stream
|> chunk_stream()
|> S3.upload(s3_bucket, s3_key, s3_options)
|> ExAws.request()
|> case do
{:ok, %{status_code: 200}} -> {:ok, file.file_name}
{:ok, :done} -> {:ok, file.file_name}
{:error, error} -> {:error, error}
end
rescue
e in ExAws.Error ->
Logger.error(inspect(e))
Logger.error(e.message)
{:error, :invalid_bucket}
end

# Stream the file and upload to AWS as a multi-part upload
defp do_put(file, {s3_bucket, s3_key, s3_options}) do
file.path
|> Upload.stream_file()
|> S3.upload(s3_bucket, s3_key, s3_options)
|> ExAws.request()
|> case do
{:ok, %{status_code: 200}} -> {:ok, file.file_name}
{:ok, :done} -> {:ok, file.file_name}
{:error, error} -> {:error, error}
end
{:ok, %{status_code: 200}} -> {:ok, file.file_name}
{:ok, :done} -> {:ok, file.file_name}
{:error, error} -> {:error, error}
end
rescue
e in ExAws.Error ->
Logger.error(inspect(e))
Logger.error(e.message)
{:error, :invalid_bucket}
Logger.error(e.message)
{:error, :invalid_bucket}
end

defp build_url(definition, version, file_and_scope, _options) do
Expand Down Expand Up @@ -264,4 +283,22 @@ defmodule Waffle.Storage.S3 do

defp parse_bucket({:system, env_var}) when is_binary(env_var), do: System.get_env(env_var)
defp parse_bucket(name), do: name

defp chunk_stream(stream, chunk_size \\ 5 * 1024 * 1024) do
Stream.chunk_while(
stream,
"",
fn element, acc ->
if String.length(acc) >= chunk_size do
{:cont, acc, element}
else
{:cont, acc <> element}
end
end,
fn
[] -> {:cont, []}
acc -> {:cont, acc, []}
end
)
end
end
9 changes: 9 additions & 0 deletions test/storage/s3_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,15 @@ defmodule WaffleTest.Storage.S3 do
delete_and_assert_not_found(DummyDefinition, "image.png")
end

@tag :s3
@tag timeout: 15_000
test "public put stream" do
img_map = %{filename: "image.png", stream: File.stream!(@img)}
assert {:ok, "image.png"} == DummyDefinition.store(img_map)
assert_public(DummyDefinition, "image.png")
delete_and_assert_not_found(DummyDefinition, "image.png")
end

@tag :s3
@tag timeout: 15_000
test "private put and signed get" do
Expand Down

0 comments on commit 3bd1c57

Please sign in to comment.