diff --git a/deps.edn b/deps.edn index 844418b..b2ef645 100644 --- a/deps.edn +++ b/deps.edn @@ -2,7 +2,8 @@ :deps {} :aliases {:test {:extra-paths ["test"] - :extra-deps {com.xtdb/xtdb-core {:mvn/version "1.23.2"} + :extra-deps {clojure.java-time/clojure.java-time {:mvn/version "1.3.0"} + com.xtdb/xtdb-core {:mvn/version "1.23.2"} lambdaisland/kaocha {:mvn/version "1.70.1086"} org.slf4j/slf4j-nop {:mvn/version "1.7.36"}} :main-opts ["-m" "kaocha.runner"]} diff --git a/src/com/cohesic/xtdb.clj b/src/com/cohesic/xtdb.clj index 7209be4..d94a455 100644 --- a/src/com/cohesic/xtdb.clj +++ b/src/com/cohesic/xtdb.clj @@ -153,13 +153,60 @@ This version of the function is memoized." iterator-seq (reverse-history-before-time entity-id as-of-time opts)))) +(defn submit-matching-entities + "Submit a transaction containing the entities in input atomically. + + Atomicity is implemented this way: if the new entity coincides with + the one on disk, no transaction is sent over and `nil` is returned. + + This function also returns `nil` if entities is empty. + + Finally, it uses `xt/await-tx` for making sure we block until a + transaction is indexed. + + Otherwise this function returns the transaction returned by + `xt/submit-tx`. + + Shout out @emccue: + https://clojurians.slack.com/archives/CG3AM2F7V/p1638606871304400?thread_ts=1638590188.303400&cid=CG3AM2F7V + + Opts are: + {:start-valid-time java.util.Date + :end-valid-time java.util.Date} + + Note that time options are NOT applied to each entity, but to the + transaction as whole." + [xtdb-node entities & {:keys [start-valid-time] :as opts}] + (when (seq entities) + (let [entity-ids (mapv :xt/id entities) + results (-> (xt/db xtdb-node start-valid-time) + (xt/q '{:find [(pull ?e [*])] :where [[?e :xt/id ?id]] :in [[?id ...]]} + entity-ids)) + current-entity-by-id (->> results + (map first) + (group-by :xt/id)) + opts (assoc opts :atomic? true) + tx-ops (into [] + (mapcat #(idempotent-put-ops (-> current-entity-by-id + (get (:xt/id %)) + first) + % + opts)) + entities)] + (clojure.pprint/pprint tx-ops) + (when (seq tx-ops) + (let [tx (xt/submit-tx xtdb-node tx-ops)] + (xt/await-tx xtdb-node tx) + tx))))) + (defn submit-matching-entity "Submit a transaction with the new entity atomically. - If the new entity coincides with the one on disk, no transaction is - sent over and `nil` is return. + Atomicity is implemented this way: if the new entity coincides with + the one on disk, no transaction is sent over and `nil` is returned. - Otherwise what xt/submit-tx return is returned. + Otherwise this function returns the transaction returned by + `xt/submit-tx`. Shout out @emccue: https://clojurians.slack.com/archives/CG3AM2F7V/p1638606871304400?thread_ts=1638590188.303400&cid=CG3AM2F7V @@ -167,15 +214,5 @@ This version of the function is memoized." Opts are: {:start-valid-time java.util.Date :end-valid-time java.util.Date}" - [xtdb-node new-entity & {:keys [start-valid-time] :as opts}] - (let [entity-id (:xt/id new-entity) - current-entity (-> (xt/db xtdb-node start-valid-time) - (xt/q '{:find [(pull ?e [*])] :where [[?e :xt/id ?id]] :in [?id]} - entity-id) - ffirst) - opts (assoc opts :atomic? true) - tx-ops (idempotent-put-ops current-entity new-entity opts)] - (when (seq tx-ops) - (as-> (xt/submit-tx xtdb-node tx-ops) tx - (xt/await-tx xtdb-node tx) - tx)))) + [xtdb-node new-entity & opts] + (submit-matching-entities xtdb-node [new-entity] opts)) diff --git a/test/com/cohesic/kv_store_test.clj b/test/com/cohesic/kv_store_test.clj new file mode 100644 index 0000000..3d5463c --- /dev/null +++ b/test/com/cohesic/kv_store_test.clj @@ -0,0 +1,128 @@ +(ns com.cohesic.kv-store-test + (:require + [clojure.test :as test :refer [deftest is testing use-fixtures]] + [com.cohesic.xtdb :as sut] + [com.cohesic.xtdb.fixtures :as fixtures] + [com.cohesic.xtdb.test :as xtest] + [java-time.api :as jtime] + [xtdb.api :as xt])) + +(use-fixtures :each fixtures/with-kv-store) + +(deftest submit-matching-entity-sanity-check + (let [node xtest/*node* + p1 (xtest/random-person)] + (testing "submitting against empty database" + (xtest/is-committed? node (sut/submit-matching-entity node p1)) + (is (= (:xt/id p1) + (-> (xt/db node) + (xt/q {:find '[e] :where [['e :person/name (:person/name p1)]]}) + ffirst)))))) + +(deftest submit-matching-entities-empty-entities + (let [node xtest/*node* entities []] (is (nil? (sut/submit-matching-entities node entities))))) + +(deftest submit-matching-entities-sanity-check + (let [node xtest/*node* + p1 (xtest/random-person) + p2 (xtest/random-person) + entities [p1 p2] + entity-ids (into #{} (map :xt/id entities))] + (testing "submitting against empty database" + (xtest/is-committed? node (sut/submit-matching-entities node entities)) + (is (= entity-ids + (-> (xt/db node) + (xt/q '{:find [e] :in [[?id ...]] :where [[e :xt/id ?id]]} + entity-ids) + (xtest/result-set))))) + (testing "submitting against populated database" + (testing "changing a document" + (let [p1-last-name "Foo" + p1-bis (assoc p1 :person/last-name p1-last-name) + entities [p1-bis]] + (xtest/is-committed? node (sut/submit-matching-entities node entities)) + (is (= p1-last-name + (-> (xt/db node) + (xt/q {:find '[?last-name] + :where [['e :person/name (:person/name p1)] + '[e :person/last-name ?last-name]]}) + ffirst))))) + (testing "submitting a document unchanged" + (let [entities [p2]] + (is (nil? (sut/submit-matching-entities node entities)) + "the function returns `nil` when there is nothing to do")))))) + +(deftest submit-matching-entities-at-start-valid-time + (let [node xtest/*node* + p1 (xtest/random-person) + p2 (xtest/random-person) + entities [p1 p2] + entity-ids (into #{} (map :xt/id entities))] + (testing "submitting entities at :start-valid-time" + (let [start-valid-time (jtime/instant)] + (xtest/is-committed? node + (sut/submit-matching-entities + node + entities + :start-valid-time + (jtime/java-date start-valid-time))) + (testing "fetching before :start-valid-time" + (let [before-start-time (jtime/minus start-valid-time (jtime/days 1))] + (is (empty? + (-> (xt/db node (jtime/java-date before-start-time)) + (xt/q '{:find [e] :in [[?name ...]] :where [[e :person/name ?name]]} + (mapv :person/name entities)))) + "nothing shows up before :start-valid-time"))) + (testing "fetching after :start-valid-time" + (let [after-start-time (jtime/plus start-valid-time (jtime/days 1))] + (is (= entity-ids + (-> (xt/db node (jtime/java-date after-start-time)) + (xt/q '{:find [e] :in [[?name ...]] :where [[e :person/name ?name]]} + (mapv :person/name entities)) + (xtest/result-set)))))))))) + +(deftest submit-matching-entities-at-start+end-valid-time + (let [node xtest/*node* + p1 (xtest/random-person) + p2 (xtest/random-person) + entities [p1 p2] + entity-ids (into #{} (map :xt/id entities))] + (testing "submitting entities at :start-valid-time *and* :end-valid-time" + (let [start-valid-time (jtime/instant) + end-valid-time (jtime/plus (jtime/instant) (jtime/days 3))] + (println end-valid-time) + (xtest/is-committed? node + (sut/submit-matching-entities + node + entities + :start-valid-time (jtime/java-date start-valid-time) + :end-valid-time (jtime/java-date end-valid-time))) + (testing "fetching before :start-valid-time" + (let [before-start-time (jtime/minus start-valid-time (jtime/days 1))] + (is (empty? + (-> (xt/db node (jtime/java-date before-start-time)) + (xt/q '{:find [e] :in [[?name ...]] :where [[e :person/name ?name]]} + (mapv :person/name entities)))) + "nothing shows up after :end-valid-time"))) + (testing "fetching after :start-valid-time" + (let [after-start-time (jtime/plus start-valid-time (jtime/days 1))] + (is (= entity-ids + (-> (xt/db node (jtime/java-date after-start-time)) + (xt/q '{:find [e] :in [[?name ...]] :where [[e :person/name ?name]]} + (mapv :person/name entities)) + (xtest/result-set))) + "nothing shows up after :end-valid-time"))) + (testing "fetching before :end-valid-time" + (let [before-end-time (jtime/plus (jtime/instant) (jtime/days 2))] + (is (= entity-ids + (-> (xt/db node (jtime/java-date before-end-time)) + (xt/q '{:find [e] :in [[?name ...]] :where [[e :person/name ?name]]} + (mapv :person/name entities)) + (xtest/result-set)))))) + (testing "fetching after :end-valid-time" + (let [after-end-time (jtime/plus end-valid-time (jtime/days 1))] + (is (empty? + (-> (xt/db node (jtime/java-date after-end-time)) + (xt/q '{:find [e] :in [[?name ...]] :where [[e :person/name ?name]]} + (mapv :person/name entities)))) + "nothing shows up after :end-valid-time"))))))) diff --git a/test/com/cohesic/xtdb/fixtures.clj b/test/com/cohesic/xtdb/fixtures.clj new file mode 100644 index 0000000..b15526e --- /dev/null +++ b/test/com/cohesic/xtdb/fixtures.clj @@ -0,0 +1,28 @@ +(ns com.cohesic.xtdb.fixtures + (:require + [clojure.java.io :as io] + [com.cohesic.xtdb.test :as test] + [xtdb.api :as xt] + [xtdb.io :as xio] + [xtdb.mem-kv]) + (:import + java.nio.file.Files + java.nio.file.attribute.FileAttribute)) + +(def ^:dynamic *db-dir-prefix* "cohesic-xtdb-tests") +(def ^:dynamic *kv-opts* {}) + +(defn with-kv-store + [f] + (let [db-dir (.toFile (Files/createTempDirectory *db-dir-prefix* (make-array FileAttribute 0)))] + (try + (letfn [(kv-store [db-dir-suffix] + {:kv-store (merge {:xtdb/module 'xtdb.mem-kv/->kv-store + :sync? true + :db-dir (io/file db-dir db-dir-suffix)} + *kv-opts*)})] + (with-open [node (xt/start-node {:xtdb/tx-log (kv-store "tx-log") + :xtdb/document-store (kv-store "doc-store") + :xtdb/index-store (kv-store "index-store")})] + (binding [test/*node* node] (f)))) + (finally (xio/delete-dir db-dir))))) diff --git a/test/com/cohesic/xtdb/test.clj b/test/com/cohesic/xtdb/test.clj new file mode 100644 index 0000000..3eddffe --- /dev/null +++ b/test/com/cohesic/xtdb/test.clj @@ -0,0 +1,19 @@ +(ns com.cohesic.xtdb.test + (:require + [clojure.test :as test] + [xtdb.api :as xt]) + (:import + xtdb.api.IXtdb)) + +(def ^:dynamic ^IXtdb *node*) + +(defn random-person + [] + {:xt/id (random-uuid) + :person/name (rand-nth ["Giovanni" "Pietro" "Sergio" "Alessio" "Danilo"]) + :person/last-name (rand-nth ["Giovannone" "Pietrone" "Sergione" "Alessione" "Danilone"]) + :person/age (rand-int 100)}) + +(defn is-committed? [node tx] (test/is (xt/tx-committed? node tx))) + +(defn result-set [results] (into #{} (map first) results))