Support message batches #7

Merged
merged 2 commits on Oct 15, 2024

Changes from all commits
18 changes: 16 additions & 2 deletions README.md
@@ -9,7 +9,9 @@
Anthropix is an open-source Elixir client for the Anthropic API, providing a simple and convenient way to integrate Claude, Anthropic's powerful language model, into your applications.

- ✅ API client fully implementing the [Anthropic API](https://docs.anthropic.com/claude/reference/getting-started-with-the-api)
- 🛠️ Tool use (function calling)
- 🧰 Tool use (function calling)
- ⚡ Prompt caching
- 📦 Message batching
- 🛜 Streaming API requests
- Stream to an Enumerable
- Or stream messages to any Elixir process
@@ -21,13 +23,25 @@ The package can be installed by adding `anthropix` to your list of dependencies
```elixir
def deps do
[
{:anthropix, "~> 0.3"}
{:anthropix, "~> 0.4"}
]
end
```

## Quickstart

> [!NOTE]
> #### Beta features
>
> Anthropic frequently ship new features under a beta flag, requiring headers
to be added to your requests to take advantage of the feature. This library
currently enables the following beta headers by default:
>
> - `prompt-caching-2024-07-31`
> - `message-batches-2024-09-24`
>
> If required, beta headers can be customised with `init/2`.
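
For example, a client can be created with only the beta headers you need. This is a sketch and assumes `init/2` accepts a `:beta` option listing the header values; check the `init/2` documentation for the exact option:

```elixir
# Sketch: override the default beta headers (assumes a :beta option on init/2)
client = Anthropix.init(api_key, beta: ["message-batches-2024-09-24"])
```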

For more examples, refer to the [Anthropix documentation](https://hexdocs.pm/anthropix).

### Initiate a client.
21 changes: 18 additions & 3 deletions lib/anthropix.ex
@@ -10,7 +10,9 @@ defmodule Anthropix do
model, into your applications.

- ✅ API client fully implementing the [Anthropic API](https://docs.anthropic.com/claude/reference/getting-started-with-the-api)
- 🛠️ Tool use (function calling)
- 🧰 Tool use (function calling)
- ⚡ Prompt caching
- 📦 Message batching (`Anthropix.Batch`)
- 🛜 Streaming API requests
- Stream to an Enumerable
- Or stream messages to any Elixir process
@@ -30,7 +32,16 @@ defmodule Anthropix do

## Quickstart

For more examples, refer to the [Anthropix documentation](https://hexdocs.pm/anthropix).
> #### Beta features {: .info}
>
> Anthropic frequently ship new features under a beta flag, requiring headers
to be added to your requests to take advantage of the feature. This library
currently enables the following beta headers by default:
>
> - `prompt-caching-2024-07-31`
> - `message-batches-2024-09-24`
>
> If required, beta headers can be customised with `init/2`.

### Initiate a client.

@@ -271,7 +282,8 @@ defmodule Anthropix do
{:ok, map() | Enumerable.t() | Task.t()} |
{:error, term()}

@typep req_response() ::
@typedoc false
@type req_response() ::
{:ok, Req.Response.t() | Task.t() | Enum.t()} |
{:error, term()}

@@ -573,4 +585,7 @@ defmodule Anthropix do
# Tidy up when the streaming request is finished
defp stream_end(%Task{ref: ref}), do: Process.demonitor(ref, [:flush])

@doc false
def chat_schema, do: schema(:chat).schema

end
295 changes: 295 additions & 0 deletions lib/anthropix/batch.ex
@@ -0,0 +1,295 @@
defmodule Anthropix.Batch do
@moduledoc """
The [message batches API](https://docs.anthropic.com/en/docs/build-with-claude/message-batches)
is a powerful, cost-effective way to asynchronously process large volumes of
Messages requests. This approach is well-suited to tasks that do not require
immediate responses, reducing costs by 50% while increasing throughput.
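
A typical workflow is sketched below, assuming `client` was created with
`Anthropix.init/2` and that requests are passed via the `:requests` option
described in `create/2`: create a batch, poll `show/2` until its
`"processing_status"` is `"ended"`, then fetch the results.

```elixir
client = Anthropix.init(api_key)

# Queue a batch of message requests
{:ok, %{"id" => batch_id}} = Anthropix.Batch.create(client, requests: [
  %{custom_id: "foo", params: %{model: "claude-3-haiku-20240307", messages: [%{role: "user", content: "Why is the sky blue?"}]}}
])

# Poll show/2 until it reports "processing_status" => "ended" ...
{:ok, batch} = Anthropix.Batch.show(client, batch_id)

# ... then fetch the decoded results
{:ok, results} = Anthropix.Batch.results(client, batch)
```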
"""
use Anthropix.Schemas
alias Anthropix.APIError

schema :list_params, [
before_id: [
type: :string,
doc: "Returns the page of results immediately before this object.",
],
after_id: [
type: :string,
doc: "Returns the page of results immediately after this object.",
],
limit: [
type: :integer,
doc: "Number of items to return per page. Between 1-100 (default 20)."
],
]

@doc """
List all message batches. Most recently created batches are returned first.

## Options

#{doc(:list_params)}

## Examples

```elixir
iex> Anthropix.Batch.list(client)
{:ok, %{"data" => [...]}}

# With pagination options
iex> Anthropix.Batch.list(client, after_id: "msgbatch_01DJuZbTFXpGRhqTdqFH1P2R", limit: 50)
{:ok, %{"data" => [...]}}
```
"""
@spec list(Anthropix.client(), keyword()) :: any
def list(%Anthropix{} = client, params \\ []) do
with {:ok, params} <- NimbleOptions.validate(params, schema(:list_params)) do
client
|> req(:get, "/messages/batches", params: params)
|> res()
end
end

schema :batch_params, [
custom_id: [
type: :string,
required: true,
doc: "Developer-provided ID created for each request in a Message Batch."
],
params: [
type: :map,
keys: Anthropix.chat_schema(),
required: true,
doc: "Messages API creation parameters for the individual request."
]
]

schema :create_params, [
requests: [
type: {:list, {:map, schema(:batch_params).schema}},
required: true,
doc: "List of requests for prompt completion."
]
]

@doc """
Send a batch of message creation requests. Used to process multiple message
requests in a single batch. Processing begins immediately.

## Options

#{doc(:create_params)}

## Examples

```elixir
iex> Anthropix.Batch.create(client, requests: [
...> %{custom_id: "foo", params: %{model: "claude-3-haiku-20240307", messages: [%{role: "user", content: "Why is the sky blue?"}]}},
...> %{custom_id: "bar", params: %{model: "claude-3-haiku-20240307", messages: [%{role: "user", content: "Why is the sea blue?"}]}},
...> ])
{:ok, %{
"type" => "message_batch",
"id" => "msgbatch_01DJuZbTFXpGRhqTdqFH1P2R"
"processing_status" => "in_progress",
...
}}
```
"""
@spec create(Anthropix.client(), keyword()) :: any
def create(%Anthropix{} = client, params \\ []) do
with {:ok, params} <- NimbleOptions.validate(params, schema(:create_params)) do
client
|> req(:post, "/messages/batches", json: Enum.into(params, %{}))
|> res()
end
end

@doc """
Cancels a message batch before processing ends. Depending on when cancellation
is initiated, the number of cancelled message requests will vary.

## Examples

```elixir
iex> Anthropix.Batch.cancel(client, "msgbatch_01DJuZbTFXpGRhqTdqFH1P2R")
{:ok, %{
"type" => "message_batch",
"id" => "msgbatch_01DJuZbTFXpGRhqTdqFH1P2R"
"processing_status" => "canceling",
...
}}
```
"""
@spec cancel(Anthropix.client(), String.t()) :: any
def cancel(%Anthropix{} = client, batch_id) when is_binary(batch_id) do
client
|> req(:post, "/messages/batches/#{batch_id}/cancel", [])
|> res()
end

@doc """
Retrieve the status of the batch. Can be used to poll for message batch
completion.

When the batch is ready, the `"processing_status"` will be `"ended"` and the
response will include a `"results_url"` value.

## Examples

```elixir
iex> Anthropix.Batch.show(client, "msgbatch_01DJuZbTFXpGRhqTdqFH1P2R")
{:ok, %{
"type" => "message_batch",
"id" => "msgbatch_01DJuZbTFXpGRhqTdqFH1P2R"
"processing_status" => "ended",
"results_url" => "https://api.anthropic.com/v1/messages/batches/msgbatch_01DJuZbTFXpGRhqTdqFH1P2R/results",
...
}}
```
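
A minimal polling sketch (the wrapping module, the 5-second interval and the
absence of a timeout are assumptions, not library behaviour):

```elixir
# In your own module: poll show/2 until the batch has finished processing
def wait_until_ended(client, batch_id) do
  case Anthropix.Batch.show(client, batch_id) do
    {:ok, %{"processing_status" => "ended"} = batch} ->
      {:ok, batch}

    {:ok, _still_processing} ->
      Process.sleep(5_000)
      wait_until_ended(client, batch_id)

    {:error, reason} ->
      {:error, reason}
  end
end
```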
"""
@spec show(Anthropix.client(), String.t()) :: any
def show(%Anthropix{} = client, batch_id) when is_binary(batch_id) do
client
|> req(:get, "/messages/batches/#{batch_id}", [])
|> res()
end

schema :results_params, [
stream: [
type: {:or, [:boolean, :pid]},
default: false,
doc: "Whether to stream the batch results.",
],
]

@doc """
Retrieve the results of the batch. Responds with, or streams, a list of chat
message completions.

Prefer passing the result of `show/2` directly; a `results_url` or `batch_id`
string is also accepted.

Results are not guaranteed to be in the same order as requests. Use the
`custom_id` field to match results to requests.

## Options

#{doc(:results_params)}

## Examples

```elixir
iex> Anthropix.Batch.results(client, "msgbatch_01DJuZbTFXpGRhqTdqFH1P2R")
{:ok, [
%{"custom_id" => "foo", "result" => %{...}},
%{"custom_id" => "bar", "result" => %{...}},
]}

# Passing true to the :stream option initiates an async streaming request.
iex> Anthropix.Batch.results(client, "msgbatch_01DJuZbTFXpGRhqTdqFH1P2R", stream: true)
{:ok, #Function<52.53678557/2 in Stream.resource/3>}
```
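
Because ordering is not guaranteed, results can be re-keyed by `custom_id`
(a sketch; the "foo" ID follows the `create/2` example):

```elixir
{:ok, results} = Anthropix.Batch.results(client, "msgbatch_01DJuZbTFXpGRhqTdqFH1P2R")

# Index each decoded result by its custom_id for easy lookup
by_id = Map.new(results, fn %{"custom_id" => id} = result -> {id, result} end)

by_id["foo"]["result"]
```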
"""
@spec results(Anthropix.client(), String.t() | map(), keyword()) :: any
def results(client, map_or_batch_id, params \\ [])

def results(%Anthropix{} = client, %{"results_url" => url}, params)
when is_binary(url),
do: results(client, url, params)

def results(%Anthropix{} = client, batch_id, params) when is_binary(batch_id) do
url = case String.match?(batch_id, ~r/^https?:\/\//) do
true -> batch_id
false -> "/messages/batches/#{batch_id}/results"
end

with {:ok, params} <- NimbleOptions.validate(params, schema(:results_params)) do
client
|> req(:get, url, params)
|> res()
end
end

# Builds the request from the given params
@spec req(Anthropix.client(), atom(), Req.url(), keyword()) :: Anthropix.req_response()
defp req(%Anthropix{req: req}, method, url, opts) do
opts = Keyword.merge(opts, method: method, url: url)
{stream_opt, opts} = Keyword.pop(opts, :stream, false)
dest = if is_pid(stream_opt), do: stream_opt, else: self()

if stream_opt do
opts = Keyword.put(opts, :into, stream_handler(dest))
task = Task.async(fn -> Req.request(req, opts) |> res() end)

case stream_opt do
true -> {:ok, Stream.resource(fn -> task end, &stream_next/1, &stream_end/1)}
_ -> {:ok, task}
end
else
Req.request(req, opts)
end
end

# Normalizes the response returned from the request
@spec res(Anthropix.req_response()) :: Anthropix.response()
defp res({:ok, %Task{} = task}), do: {:ok, task}
defp res({:ok, enum}) when is_function(enum), do: {:ok, enum}

defp res({:ok, %{status: status, body: body} = res}) when status in 200..299 do
with [header] <- Req.Response.get_header(res, "content-disposition"),
true <- String.match?(header, ~r/\.jsonl/)
do
results = body
|> String.split("\n")
|> Enum.map(&Jason.decode!/1)
{:ok, results}
else
_ -> {:ok, body}
end
end

defp res({:ok, %{body: body}}) do
{:error, APIError.exception(body)}
end

defp res({:error, error}), do: {:error, error}

# Returns a callback to handle streaming responses
@spec stream_handler(pid()) :: fun()
defp stream_handler(pid) do
fn {:data, data}, {req, res} ->
data
|> String.split("\n")
|> Enum.map(&Jason.decode!/1)
|> Enum.each(& Process.send(pid, {self(), {:data, &1}}, []))

{:cont, {req, update_in(res.body, & &1 <> data)}}
end
end

# Receive messages into a stream
defp stream_next(%Task{pid: pid, ref: ref} = task) do
receive do
{^pid, {:data, data}} ->
{[data], task}

{^ref, {:ok, %Req.Response{status: status}}} when status in 200..299 ->
{:halt, task}

{^ref, {:ok, %Req.Response{body: body}}} ->
raise APIError.exception(body)

{^ref, {:error, error}} ->
raise error

{:DOWN, _ref, _, _pid, _reason} ->
{:halt, task}
after
30_000 -> {:halt, task}
end
end

# Tidy up when the streaming request is finished
defp stream_end(%Task{ref: ref}), do: Process.demonitor(ref, [:flush])

end