Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove :client-events and :transcoder, working directly with CompletableFutures. #67

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,32 @@
## This library follows [Semantic Versioning](https://semver.org).
## This CHANGELOG follows [keepachangelog](https://keepachangelog.com/en/1.0.0/).

### VERSION [4.0.0] - 2023-08-08

### Removed

* Remove the `funcool/promesa` dependency.
* This doesn't affect the return values, they remain `java.util.concurrent.CompletableFuture` as before.
* This reduces the libraries dependencies without sacrificing its functionality.
* Remove the `:client-events` from the options map per client and per operation.
The return values are `java.util.concurrent.CompletableFuture`, and the client is feel to compose additional
functionally on top of these futures.
* The previous `on-success` can be achieved with `CompletableFuture.thenApply` or `CompletableFuture.thenCompose`.
* The previous `on-failure` can be achieved with `CompletableFuture.exceptionally`
or `CompletableFuture.exceptionallyCompose`.
* Remove the `:transcoder` from the options map per client and per operation.
* Mapping the response can be achieved with `CompletableFuture.thenApply` or `CompletableFuture.thenCompose`.
* To map the request's payload, supply a different payload.
* Remove the `advanced-async-hooks.md`.

### Changed

* Update the [README.md](README.md) and the [tutorial.md](doc/tutorial.md) to reflect the above changes.
* Bump `clj-kondo`: `2022.04.25` -> `2023.07.13`.
* Move `batch-record->map` from `aerospike-clj.client` to `aerospike-clj.aerospike-record`.
* All async continuation functions are now reified and assigned as a `def` at compile-time.
* This should reduce the number of allocations due to the removal of `(p/then' (fn [...))`.

### VERSION [3.0.0]: https://github.com/AppsFlyer/aerospike-clj/pull/62
#### Changed
* use aerospike client version 6.1.10
Expand Down
106 changes: 64 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,129 +5,150 @@ An opinionated Clojure library wrapping Aerospike Java Client.
[![Clojars Project](https://img.shields.io/clojars/v/com.appsflyer/aerospike-clj.svg)](https://clojars.org/com.appsflyer/aerospike-clj)

[![Build Status](https://img.shields.io/github/workflow/status/AppsFlyer/aerospike-clj/Push%20CI%20-%20master?event=push&branch=master&label=build%20%26%20test)](https://github.com/AppsFlyer/aerospike-clj/actions)

# Docs:

[Generated docs](https://appsflyer.github.io/aerospike-clj/)

## Tutorial

[here.](https://appsflyer.github.io/aerospike-clj/tutorial.html)

## More advanced docs

* [Advanced asynchronous hooks.](https://appsflyer.github.io/aerospike-clj/advanced-async-hooks.html)

# Requirements

- Java 8
- Clojure 1.8
- Aerospike server version >= `4.9.0`
- Clojure version >= `1.11.0`

# Features

- Converts Java client's callback model into Java(8) `CompletableFuture` based API.
- Expose passing functional (asynchronous) transcoders over payloads (both put/get).
- Health-check utility.
- Functions return Clojure records.

# Maturity

- Feature completeness: ~~mostly~~ near complete.
- Stability: production ready. Actively and widely used in production.

# Opinionated
- Non blocking only: Expose only the non-blocking API. Block with `deref` if you like.

- Non-blocking only: Expose only the non-blocking API. Block with `deref` if you like.
- Futures instead of callbacks. Futures (and functional chaining) are more composable and less cluttered.
If synchronous behaviour is still desired, the calling code can still `deref` (`@`) the returned future object.
For a more sophisticated coordination, a variety of control mechanisms can be used by directly using Java's
`CompletableFuture` API or the more Clojure friendly [promesa](https://github.com/funcool/promesa) (which is also used internally),
or via the library using [transcoders](https://appsflyer.github.io/aerospike-clj/index.html) or
[hooks](https://appsflyer.github.io/aerospike-clj/advanced-async-hooks.html).
If synchronous behaviour is still desired, the calling code can still `deref` (`@`) the returned future object.
For a more sophisticated coordination, a variety of control mechanisms can be used by directly using Java's
`CompletableFuture` API.
- Follows the method names of the underlying Java APIs.
- TTLs should be explicit, and developers should think about them. Forces passing a TTL and not use the cluster default
(This can be still achieved by passing the [special values](https://www.aerospike.com/apidocs/java/com/aerospike/client/policy/WritePolicy.html#expiration) -2,-1 or 0).
(This can be still achieved by passing
the [special values](https://www.aerospike.com/apidocs/java/com/aerospike/client/policy/WritePolicy.html#expiration)
-2,-1 or 0).
- Minimal dependencies.
- Single client per Aerospike namespace. Namespaces in Aerospike usually indicate different cluster configurations.
In order to reduce overhead for clusters with more than a single namespace create 2 client instances and share an event
loop between them.
In order to reduce overhead for clusters with more than a single namespace create 2 client instances and share an
event loop between them.

## Usage

```clojure
user=> (require '[aerospike-clj.client :as aero])
nil
user=> (def c (aero/init-simple-aerospike-client
#_=> ["aerospike-001.com", "aerospik-002.com"] "my-ns" {:enable-logging true}))
#_=> ["aerospike-001.com", "aerospik-002.com"] "my-ns" {:enable-logging true}))
```

It is possible to inject additional asynchronous user-defined behaviour. To do that add an implementation of the
`ClientEvents` protocol during client initialization or per operation.
Some useful info is passed in-order to support metering and to read client configuration. `op-start-time` is
`(System/nanoTime)`.
see more [here](https://appsflyer.github.io/aerospike-clj/advanced-async-hooks.html).

```clojure
user=> (import java.util.concurrent.CompletableFuture)
=> java.util.concurrent.CompletableFuture
user=> (import java.util.function.Function)
=> java.util.function.Function

(let [c (aero/init-simple-aerospike-client
["localhost"]
"test"
{:client-events (reify ClientEvents
(on-success [_ op-name op-result index op-start-time]
(println op-name "success!")))
(on-failure [_ op-name op-ex index op-start-time]
(println "oh-no" op-name "failed on index" index)))})]

(get-single c "index" "set-name"))
; for better performance, a `deftype` might be preferred over `reify`, if possible.
"test")]

(-> ^CompletableFuture (get-single c "index" "set-name")
(.thenApply (reify Function
(apply [_ m]
(println "`get-single` success!")
m)))
(.exceptionally (reify Function
(apply [_ ex]
(println "oh-no `get-single` failed on index `index`")
(throw ex))))))
```

### Query/Put

For demo purposes we will use a docker based local DB:

```shell
$ sudo docker run -d --name aerospike -p 3000:3000 -p 3001:3001 -p 3002:3002 -p 3003:3003 aerospike
```

And connect to it:

```clojure
user=> (def c (aero/init-simple-aerospike-client ["localhost"] "test"))
#'user/db
```

```clojure
user=> (require '[promesa.core :as p])
nil
user=> (import java.util.concurrent.CompletableFuture)
=> java.util.concurrent.CompletableFuture
user=> (import java.util.function.Function)
=> java.util.function.Function
user=> (aero/put c "index" "set-name" 42 1000)
#object[java.util.concurrent.CompletableFuture 0x6264b083 "pending"]
user=> (def f (aero/get-single c "index" "set-name"))
#'user/f
user=> (p/chain (aero/get-single c "index" "set-name")
#_=> :ttl
#_=> aero/expiry-unix
#_=> #(java.time.Instant/ofEpochSecond %)
#_=> str
#_=> println)
user=> (-> ^CompletableFuture (aero/get-single c "index" "set-name")
(.thenApply (reify Function
(apply [_ res]
(println (str (java.time.Instant/ofEpochSecond (aero/expiry-unix (:ttl res)))))))))
2020-08-13T09:52:49Z
#object[java.util.concurrent.CompletableFuture 0x654830f5 "pending"]
```

We actually get back a record with the payload, the DB generation and the TTL (in an Aerospike style EPOCH format).

```clojure
user=> @(aero/get-single c "index" "set-name")
#aerospike_clj.client.AerospikeRecord{:payload 42, :gen 1, :ttl 285167713}
```

#### Unix EPOCH TTL
Aerospike returns a TTL on the queried records that is epoch style, but with a different "beginning of time" which is "2010-01-01T00:00:00Z".

Aerospike returns a TTL on the queried records that is epoch style, but with a different "beginning of time" which is "
2010-01-01T00:00:00Z".
Call `expiry-unix` with the returned TTL to get a TTL relative to the UNIX epoch.

## Testing

### Unit tests

Executed via running `lein test`.

### Integration tests

Testing is performed against a local Aerospike docker container.

#### Mocking in application unit tests

For unit tests purposes you can use a mock client that implements the client protocols: `MockClient`.

Usage:

```clojure
(ns com-example.app
(ns com-example.app
(:require [clojure.test :refer [deftest use-fixtures]]
[aerospike-clj.protocols :as pt]
[aerospike-clj.mock-client :as mock])
[aerospike-clj.protocols :as pt]
[aerospike-clj.mock-client :as mock])
(:import [aerospike_clj.client SimpleAerospikeClient]))

(def ^:dynamic ^SimpleAerospikeClient client nil)
Expand All @@ -147,15 +168,16 @@ so you can just invoke all client protocol methods on it.
Note: If the production client is initiated using a state management framework,
you would also need to stop and restart the state on each test run.


## Contributing

PRs are welcome with these rules:

* A PR should increment the project's version in [`project.clj`](project.clj) according
to Semantic Versioning.
to Semantic Versioning.
* A PR should have its above version set to `SNAPSHOT`, e.g. `1.0.2-SNAPSHOT`.
Once it will be merged into `master` this suffix would be trimmed before release.
Once it will be merged into `master` this suffix would be trimmed before release.
* All PRs would be linted and tested. Passing lint and tests is a reuirement for
maintainers to review the PR.
maintainers to review the PR.

## License

Expand Down
55 changes: 0 additions & 55 deletions doc/advanced-async-hooks.md

This file was deleted.

34 changes: 0 additions & 34 deletions doc/tutorial.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,37 +204,3 @@ user=> @(p/then (aero/get-multiple c (map str (range 5)) (repeat "set-name"))
##### Sync Querying
Since the returned future objects can be easily `deref`ed, simply adding a `@`
before queries makes them synchronous.

#### Using Transcoders
The library takes advantage of futures' ability to compose and allows you to configure
a `:transcoder` to conveniently set this logic:
* `get` Transcoders are functions of the **AerospikeRecord instance**, not the
`deferred` value of it.
* `put` Transcoders are functions on the passed **payload**. They are called _before_
the request is even put on the event-loop.

##### On get:
```clojure
user=> (pt/put c "index" "set-name" 42 1000)
#object[java.util.concurrent.CompletableFuture 0x4a9620a9 "pending"]
user=> (defn inc-transcoder [rec] (when rec
#_=> (update rec :payload inc)))
#'user/inc-transcoder
user=> (p/chain (pt/get-single c "index" "set-name" {:transcoder inc-transcoder})
#_=> :payload
#_=> println)
#object[java.util.concurrent.CompletableFuture 0x4a9620af "pending"]
43
```

##### On put:

The transcoder here is a function on the _payload_ itself
```clojure
user=> (pt/put c "17" "set-name" 1 1000 {:transcoder str})
#object[java.util.concurrent.CompletableFuture 0x4d025d9b "pending"]
user=> @(pt/get-single c "17" "set-name" {:transcoder #(:payload %1)})
"1"
```
The transcoder option saves some boilerplate and can be easily used to do more
useful stuff, like de-serializing or (de)compressing data on the client side.
7 changes: 3 additions & 4 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject com.appsflyer/aerospike-clj "3.0.0-SNAPSHOT"
(defproject com.appsflyer/aerospike-clj "4.0.0-SNAPSHOT"
:description "An Aerospike Clojure client."
:url "https://github.com/AppsFlyer/aerospike-clj"
:license {:name "Eclipse Public License"
Expand All @@ -14,16 +14,15 @@
:password :env/clojars_password
:sign-releases false}]]
:dependencies [[org.clojure/tools.logging "1.2.4"]
[com.aerospike/aerospike-client "6.1.10"]
[funcool/promesa "8.0.450"]]
[com.aerospike/aerospike-client "6.1.10"]]
:profiles {:dev {:plugins [[lein-eftest "0.5.9"]]
:dependencies [[org.clojure/clojure "1.11.1"]
[clj-test-containers "0.7.4"]
[criterium "0.4.6"]
[cheshire "5.11.0"]
[tortue/spy "2.14.0"]
[com.fasterxml.jackson.core/jackson-databind "2.11.2"]
[clj-kondo "2022.04.25"]]
[clj-kondo "2023.07.13"]]
:eftest {:multithread? false
:report eftest.report.junit/report
:report-to-file "target/junit.xml"}
Expand Down
9 changes: 8 additions & 1 deletion src/main/clojure/aerospike_clj/aerospike_record.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns aerospike-clj.aerospike-record
(:require [aerospike-clj.utils :as utils])
(:import (com.aerospike.client Record)
(:import (com.aerospike.client BatchRecord Record)
(java.util Map)))

(defrecord AerospikeRecord [payload ^Integer gen ^Integer ttl])
Expand Down Expand Up @@ -28,3 +28,10 @@
payload
^Integer (.generation ^Record record)
^Integer (.expiration ^Record record)))))

(defn batch-record->map [^BatchRecord batch-record]
(let [k (.key batch-record)]
(-> (record->map (.record batch-record))
(assoc :index (.toString (.userKey k)))
(assoc :set (.setName k))
(assoc :result-code (.resultCode batch-record)))))
Loading