Skip to content

Commit

Permalink
Performance: Tasks / Struct (#337)
Browse files Browse the repository at this point in the history
Added:
- `Equinox.Core`: `Category` base class, with `Decider` and `Stream` helper `module`s

Changed:
- Performance: Switch surface APIs to `struct` Tuples and Options where relevant, some due to `struct` changes in jet/FsCodec#82, and use `task` in hot paths
- `Equinox.Decider`: `log` is now supplied via `Equinox.Category`
- `Equinox.Decider`: `maxAttempts` with a default policy and an optional argument on `Transact*` APIs re #232 
- `Equinox`: push `Serilog` dependency from `Equinox` out to `Equinox.Core`
  • Loading branch information
bartelink authored Sep 2, 2022
1 parent 482bab8 commit 34b62ea
Show file tree
Hide file tree
Showing 74 changed files with 1,022 additions and 903 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,30 @@ The `Unreleased` section name is replaced by the expected version of next releas

- `Equinox`: `Decider.Transact`, `TransactAsync`, `TransactExAsync` overloads [#325](https://github.com/jet/equinox/pull/325)
- `Equinox.ISyncContext.StreamEventBytes`: Exposes stored size of events in the stream (initial impl provides it for `DynamoStore` only) [#326](https://github.com/jet/equinox/pull/326)
- `Equinox.Core`: `Category` base class, with `Decider` and `Stream` helper `module`s [#337](https://github.com/jet/equinox/pull/337)
- `CosmosStore.Prometheus`: Add `rut` tag to enable filtering/grouping by Read vs Write activity as per `DynamoStore` [#321](https://github.com/jet/equinox/pull/321)
- `DynamoStore`/`DynamoStore.Prometheus`: Implements the majority of the `CosmosStore` functionality via `FSharp.AWS.DynamoDB` [#321](https://github.com/jet/equinox/pull/321)
- `EventStoreDb`: As per `EventStore` module, but using the modern `EventStore.Client.Grpc.Streams` client [#196](https://github.com/jet/equinox/pull/196)
- `eqx dump`: `-s` flag is now optional

### Changed

- Performance: Switch surface APIs to `struct` Tuples and Options where relevant, some due to `struct` changes in [`FsCodec` #82](https://github.com/jet/FsCodec/pull/82), and use `task` in hot paths [#337](https://github.com/jet/equinox/pull/337)
- `Equinox`: Merge `ResolveOption` and `XXXStoreCategory.FromMemento` as `LoadOption` [#308](https://github.com/jet/equinox/pull/308)
- `Equinox`: Merge `XXXStoreCategory.Resolve(sn, ?ResolveOption)` and `XXXStoreCategory.FromMemento` as option `LoadOption` parameter on all `Transact` and `Query` methods [#308](https://github.com/jet/equinox/pull/308)
- `Equinox.Decider`: `log` is now supplied via `Equinox.Category` [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Decider`: `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337)
- `Equinox`: push `Serilog` dependency out to `Equinox.Core` [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Core`: push `FsCodec` dependency out to concrete stores [#337](https://github.com/jet/equinox/pull/337)
- `CosmosStore`: Require `Microsoft.Azure.Cosmos` v `3.27.0` [#310](https://github.com/jet/equinox/pull/310)
- `CosmosStore`: Switch to natively using `JsonElement` event bodies [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
- `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
- `CosmosStore`: Only log `bytes` when log level is `Debug` [#305](https://github.com/jet/equinox/pull/305)
- `EventStore`: Target `EventStore.Client` v `22.0.0-preview`; rename `Connector` -> `EventStoreConnector` [#317](https://github.com/jet/equinox/pull/317)
- `Equinox.Tool`/`samples/`: switched to use `Equinox.EventStoreDb` [#196](https://github.com/jet/equinox/pull/196)
- Update all non-Client dependencies except `FSharp.Core`, `FSharp.Control.AsyncSeq` [#310](https://github.com/jet/equinox/pull/310)
- Update all Stores to use `FsCodec` v `3.0.0`, with [`EventBody` types switching from `byte[]` to `ReadOnlyMemory<byte>`, see FsCodec#75](https://github.com/jet/FsCodec/pull/75) [#323](https://github.com/jet/equinox/pull/323)
- `FSharp.Core` requirement to `6.0.0` [#337](https://github.com/jet/equinox/pull/337)
- Update all Stores to use `FsCodec` v `3.0.0`, with [`EventBody` types switching from `byte[]` to `ReadOnlyMemory<byte>` and/or `JsonElement` see FsCodec#75](https://github.com/jet/FsCodec/pull/75) [#323](https://github.com/jet/equinox/pull/323)
- `CosmosStore.Core.Initialization.initAux`: Replace hard-coded manual 400 RU with `mode` parameter [#328](https://github.com/jet/equinox/pull/328) :pray: [@brihadish](https://github.com/brihadish)

### Removed
Expand Down
41 changes: 14 additions & 27 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ Stored Procedure | JavaScript code stored in a Container that (repeatedl

Term | Description
--------------------------|------------
Category | Group of Streams bearing a common prefix `{Category}-{StreamId}`
Category | Group of Streams bearing a common prefix `{category}-{streamId}`
Event | json or blob payload, together with an Event Type name representing an Event
EventStore | [Open source](https://eventstore.org) Event Sourcing-optimized data store server and programming model with powerful integrated projection facilities
Rolling Snapshot | Event written to an EventStore stream in order to ensure minimal store roundtrips when there is a Cache miss
Expand Down Expand Up @@ -308,7 +308,7 @@ module Aggregate
(* StreamName section *)
let [<Literal>] Category = "category"
let streamName id = FsCodec.StreamName.create Category (Id.toString id)
let streamName id = struct (Category, Id.toString id)
(* Optionally, Helpers/Types *)
Expand Down Expand Up @@ -372,11 +372,7 @@ type Service internal (resolve : Id -> Equinox.Decider<Events.Event, Fold.State)
let decider = resolve id
decider.Transact(decideX inputs)
let create resolveStream =
let resolve id =
let stream = resolveStream (streamName id)
Equinox.Decider(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 3)
Service(resolve)
let create resolve = Service(streamName >> resolve)
```

- `Service`'s constructor is `internal`; `create` is the main way in which one
Expand Down Expand Up @@ -543,10 +539,8 @@ brevity, that implements all the relevant functions above:
```fsharp
(* Event stream naming + schemas *)
let [<Literal>] Category =
"Favorites"
let streamName (id : ClientId) =
FsCodec.StreamName.create Category (ClientId.toString id)
let [<Literal>] Category = "Favorites"
let streamName (id : ClientId) = struct (Category, ClientId.toString id)
type Item = { id: int; name: string; added: DateTimeOffset }
type Event =
Expand Down Expand Up @@ -593,7 +587,7 @@ let toSnapshot state = [Event.Snapshotted (Array.ofList state)]
* The Service defines operations in business terms, neutral to any concrete
* store selection or implementation supplied only a `resolve` function that can
* be used to map from ids (as supplied to the `streamName` function) to an
* Equinox Stream typically the service should be a stateless Singleton
* Equinox.Decider; Typically the service should be a stateless Singleton
*)
type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.State>) =
Expand All @@ -615,10 +609,8 @@ type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.
member _.List clientId : Async<Events.Favorited []> =
read clientId
let create resolveStream : Service =
let resolve id =
Equinox.Decider(Serilog.Log.ForContext<Service>(), resolveStream (streamName id), maxAttempts = 3)
Service(resolve)
let create resolve : Service =
Service(streamName >> resolve)
```

<a name="api"></a>
Expand Down Expand Up @@ -697,13 +689,13 @@ Equinox’s Command Handling consists of < 200 lines including interfaces and
comments in https://github.com/jet/equinox/tree/master/src/Equinox - the
elements you'll touch in a normal application are:

- [`module Flow`](https://github.com/jet/equinox/blob/master/src/Equinox/Core.fs#L34) -
- [`module Impl`](https://github.com/jet/equinox/blob/master/src/Equinox/Core.fs#L33) -
internal implementation of Optimistic Concurrency Control / retry loop used
by `Decider`. It's recommended to at least scan this file as it defines the
Transaction semantics that are central to Equinox and the overall `Decider` concept.
- [`type Decider`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L11) -
- [`type Decider`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L7) -
surface API one uses to `Transact` or `Query` against a specific stream's state
- [`type LoadOption` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L59) -
- [`type LoadOption` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L110) -
used to specify optimization overrides to be applied when a `Decider`'s `Query` or `Transact` operations establishes the state of the stream

Its recommended to read the examples in conjunction with perusing the code in
Expand Down Expand Up @@ -846,11 +838,7 @@ type Service internal (resolve : string -> Equinox.Decider<Events.Event, Fold.St
let decider = resolve clientId
decider.Query id
let create resolve =
let resolve clientId =
let streamName = streamName clientId
Equinox.Decider(log, resolve streamName, maxAttempts = 3)
Service(resolve)
let create resolve = Service(streamName >> resolve)
```

`Read` above will do a roundtrip to the Store in order to fetch the most recent
Expand Down Expand Up @@ -921,9 +909,8 @@ result in you ending up with a model that's potentially both:

- the `resolve` parameter affords one a sufficient
[_seam_](http://www.informit.com/articles/article.aspx?p=359417) that
facilitates testing independently with a mocked or stubbed `IStream` (without
adding any references), or a `MemoryStore` (which does necessitate a
reference to a separate Assembly for clarity) as desired.
facilitates testing independently with `MemoryStore` (which does necessitate a
reference to a separate Assembly] as desired.

### Todo[Backend] walkthrough

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ The components within this repository are delivered as multi-targeted Nuget pack

## Core library

- `Equinox` [![NuGet](https://img.shields.io/nuget/v/Equinox.svg)](https://www.nuget.org/packages/Equinox/): Store-agnostic decision flow runner that manages the optimistic concurrency protocol. ([depends](https://www.fuget.org/packages/Equinox) on `FsCodec` (for the `StreamName` type-contract), `Serilog` (but no specific Serilog sinks, i.e. you configure to emit to `NLog` etc))
- `Equinox` [![NuGet](https://img.shields.io/nuget/v/Equinox.svg)](https://www.nuget.org/packages/Equinox/): Store-agnostic decision flow runner that manages the optimistic concurrency protocol and application-level API surface. ([depends](https://www.fuget.org/packages/Equinox) only on `FSharp.Core` v `6.0.0`

## Serialization support

Expand All @@ -150,7 +150,7 @@ The components within this repository are delivered as multi-targeted Nuget pack

## Data Store libraries

- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in the concrete Store implementations, together with the default [`System.Runtime.Caching.Cache`-based] `Cache` implementation. Hosts generic utility types frequently useful alongside Equinox: [`AsyncCacheCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncCacheCell.fs#L36), [`AsyncBatchingGate`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncBatchingGate.fs#L41). ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `System.Runtime.Caching`)
- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in the concrete Store implementations, together with the default [`System.Runtime.Caching.Cache`-based] `Cache` implementation. Hosts generic utility types frequently useful alongside Equinox: [`AsyncCacheCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncCacheCell.fs#L36), [`AsyncBatchingGate`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncBatchingGate.fs#L41). ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `System.Runtime.Caching`, `Serilog` (but not specific Serilog sinks, i.e. you configure to emit to `NLog` etc))
- `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox.Core`, `FsCodec`)
- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox.Core`, `Microsoft.Azure.Cosmos` >= `3.27`, `FsCodec`, `System.Text.Json`, `FSharp.Control.AsyncSeq` >= `2.0.23`)
- `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`)
Expand Down
3 changes: 2 additions & 1 deletion samples/Infrastructure/Infrastructure.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="FSharp.Core" Version="6.0.0" />

<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="FSharp.Core" Version="4.7.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.1" />
<PackageReference Include="Serilog.Sinks.Seq" Version="5.1.1" />
Expand Down
33 changes: 17 additions & 16 deletions samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
@@ -1,49 +1,50 @@
module Samples.Infrastructure.Services

open Domain
open Equinox
open FsCodec.SystemTextJson.Interop // use ToJsonElementCodec because we are doing an overkill example
open Microsoft.Extensions.DependencyInjection
open System

type StreamResolver(storage) =
member _.Resolve
( codec : FsCodec.IEventCodec<'event, ReadOnlyMemory<byte>, _>,
type Store(store) =
member _.Category
( codec : FsCodec.IEventCodec<'event, ReadOnlyMemory<byte>, unit>,
fold: 'state -> 'event seq -> 'state,
initial : 'state,
snapshot : ('event -> bool) * ('state -> 'event)) =
match storage with
snapshot : ('event -> bool) * ('state -> 'event)) : Category<'event, 'state, unit> =
match store with
| Storage.StorageConfig.Memory store ->
Equinox.MemoryStore.MemoryStoreCategory(store, codec, fold, initial).Resolve
Equinox.MemoryStore.MemoryStoreCategory(store, codec, fold, initial)
| Storage.StorageConfig.Cosmos (store, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.CosmosStore.AccessStrategy.Snapshot snapshot else Equinox.CosmosStore.AccessStrategy.Unoptimized
Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, codec.ToJsonElementCodec(), fold, initial, caching, accessStrategy).Resolve
Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, codec.ToJsonElementCodec(), fold, initial, caching, accessStrategy)
| Storage.StorageConfig.Dynamo (store, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.DynamoStore.AccessStrategy.Snapshot snapshot else Equinox.DynamoStore.AccessStrategy.Unoptimized
Equinox.DynamoStore.DynamoStoreCategory<'event,'state,_>(store, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, caching, accessStrategy).Resolve
Equinox.DynamoStore.DynamoStoreCategory<'event,'state,_>(store, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, caching, accessStrategy)
| Storage.StorageConfig.Es (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStoreDb.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStoreDb.EventStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
Equinox.EventStoreDb.EventStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy)
| Storage.StorageConfig.Sql (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.SqlStreamStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.SqlStreamStore.SqlStreamStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
Equinox.SqlStreamStore.SqlStreamStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy)

type ServiceBuilder(storageConfig, handlerLog) =
let cat = StreamResolver(storageConfig)
let store = Store storageConfig

member _.CreateFavoritesService() =
let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial
let snapshot = Favorites.Fold.isOrigin,Favorites.Fold.snapshot
Favorites.create handlerLog (cat.Resolve(Favorites.Events.codec,fold,initial,snapshot))
let snapshot = Favorites.Fold.isOrigin, Favorites.Fold.snapshot
Favorites.create <| store.Category(Favorites.Events.codec, fold, initial, snapshot).Resolve handlerLog

member _.CreateSaveForLaterService() =
let fold, initial = SavedForLater.Fold.fold, SavedForLater.Fold.initial
let snapshot = SavedForLater.Fold.isOrigin,SavedForLater.Fold.compact
SavedForLater.create 50 handlerLog (cat.Resolve(SavedForLater.Events.codec,fold,initial,snapshot))
let snapshot = SavedForLater.Fold.isOrigin, SavedForLater.Fold.compact
SavedForLater.create 50 <| store.Category(SavedForLater.Events.codec, fold, initial, snapshot).Resolve handlerLog

member _.CreateTodosService() =
let fold, initial = TodoBackend.Fold.fold, TodoBackend.Fold.initial
let snapshot = TodoBackend.Fold.isOrigin, TodoBackend.Fold.snapshot
TodoBackend.create handlerLog (cat.Resolve(TodoBackend.Events.codec,fold,initial,snapshot))
TodoBackend.create <| store.Category(TodoBackend.Events.codec, fold, initial, snapshot).Resolve handlerLog

let register (services : IServiceCollection, storageConfig, handlerLog) =
let regF (factory : IServiceProvider -> 'T) = services.AddSingleton<'T>(fun (sp: IServiceProvider) -> factory sp) |> ignore
Expand Down
9 changes: 3 additions & 6 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Domain.Cart

let streamName (id: CartId) = FsCodec.StreamName.create "Cart" (CartId.toString id)
let streamName (id: CartId) = struct ("Cart", CartId.toString id)

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
Expand Down Expand Up @@ -164,8 +164,5 @@ type Service internal (resolve : CartId -> Equinox.Decider<Events.Event, Fold.St
let decider = resolve cartId
decider.Query(id, Equinox.LoadOption.AllowStale)

let create log resolveStream =
let resolve id =
let stream = resolveStream (streamName id)
Equinox.Decider(log, stream, maxAttempts = 3)
Service(resolve)
let create resolve =
Service(streamName >> resolve)
7 changes: 3 additions & 4 deletions samples/Store/Domain/ContactPreferences.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Domain.ContactPreferences

type Id = Id of email: string
let streamName (Id email) = FsCodec.StreamName.create "ContactPreferences" email // TODO hash >> base64
let streamName (Id email) = struct ("ContactPreferences", email) // TODO hash >> base64

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand Down Expand Up @@ -56,6 +56,5 @@ type Service internal (resolve : Id -> Equinox.Decider<Events.Event, Fold.State>
let decider = resolve email
decider.Query(id, Equinox.AllowStale)

let create log resolveStream =
let resolve id = Equinox.Decider(log, resolveStream (streamName id), maxAttempts = 3)
Service(resolve)
let create resolve =
Service(streamName >> resolve)
Loading

0 comments on commit 34b62ea

Please sign in to comment.