diff --git a/deps.edn b/deps.edn
index 01ad835..8bfdbec 100644
--- a/deps.edn
+++ b/deps.edn
@@ -37,6 +37,10 @@
   ;; Kafka
   org.apache.kafka/kafka-clients {:mvn/version "3.6.1"}
 
+  ;; Compression for storage artifacts
+  org.apache.commons/commons-compress {:mvn/version "1.24.0"}
+  org.lz4/lz4-java {:mvn/version "1.8.0"}
+
   ;; Datastores
   ;; Rocks
diff --git a/example/app_lifecycle_example.clj b/example/app_lifecycle_example.clj
new file mode 100644
index 0000000..c72f68d
--- /dev/null
+++ b/example/app_lifecycle_example.clj
@@ -0,0 +1,131 @@
+(ns genegraph.app-lifecycle-example
+  (:require [genegraph.framework.app :as app]
+            [genegraph.framework.protocol :as p]
+            [genegraph.framework.event :as event]
+            [genegraph.framework.storage :as storage]
+            [genegraph.framework.kafka.admin :as kafka-admin]
+            [io.pedestal.log :as log]
+            [io.pedestal.interceptor :as interceptor]
+            [clojure.data.json :as json]))
+
+;; Publishes the event's :payload to the topic named in the payload.
+(def publish-interceptor
+  (interceptor/interceptor
+   {:name ::publish-interceptor
+    :enter (fn [e]
+             (log/info :fn :publish-interceptor)
+             (event/publish e (:payload e)))}))
+
+;; Logs the incoming event together with the value previously stored
+;; under its key in :test-rocksdb.
+(def cg-interceptor
+  (interceptor/interceptor
+   {:name ::cg-interceptor
+    :enter (fn [e]
+             (log/info :fn :cg-interceptor
+                       :payload (::event/data e)
+                       :key (::event/key e)
+                       :stored-value (storage/read (get-in e [::storage/storage :test-rocksdb])
+                                                   (::event/key e)))
+             e)}))
+
+;; Writes the event's data to :test-rocksdb under the event's key.
+(def base-interceptor
+  (interceptor/interceptor
+   {:name ::base-interceptor
+    :enter (fn [e]
+             (log/info :fn :base-interceptor
+                       :payload (::event/data e)
+                       :key (::event/key e))
+             (event/store e :test-rocksdb (::event/key e) (::event/data e)))}))
+
+(def ccloud-example-app-def
+  {:type :genegraph-app
+   :kafka-clusters {:ccloud
+                    {:type :kafka-cluster
+                     :kafka-user "User:2189780"
+                     :common-config {"ssl.endpoint.identification.algorithm" "https"
+                                     "sasl.mechanism" "PLAIN"
+                                     "request.timeout.ms" "20000"
+                                     "bootstrap.servers" "pkc-4yyd6.us-east1.gcp.confluent.cloud:9092"
+                                     "retry.backoff.ms" "500"
+                                     "security.protocol" "SASL_SSL"
+                                     "sasl.jaas.config" (System/getenv "DX_JAAS_CONFIG_DEV")}
+                     :consumer-config {"key.deserializer"
+                                       "org.apache.kafka.common.serialization.StringDeserializer"
+                                       "value.deserializer"
+                                       "org.apache.kafka.common.serialization.StringDeserializer"}
+                     :producer-config {"key.serializer"
+                                       "org.apache.kafka.common.serialization.StringSerializer"
+                                       "value.serializer"
+                                       "org.apache.kafka.common.serialization.StringSerializer"}}}
+   :storage {:test-rocksdb
+             {:type :rocksdb
+              :name :test-rocksdb
+              :path "/users/tristan/data/genegraph-neo/test-rocks"}}
+   :topics {:test-topic
+            {:name :test-topic
+             :type :kafka-consumer-group-topic
+             :kafka-consumer-group "testcg9"
+             :kafka-cluster :ccloud
+             :serialization :json
+             :kafka-topic "genegraph-test"}
+            :test-base
+            {:name :test-base
+             :type :kafka-reader-topic
+             :kafka-cluster :ccloud
+             :serialization :json
+             :kafka-topic "genegraph-test-base"}
+            :publish-to-test
+            {:name :publish-to-test
+             :type :simple-queue-topic}}
+   :processors {:test-processor
+                {:subscribe :test-topic
+                 :name :genegraph-test-processor
+                 :type :processor
+                 :kafka-cluster :ccloud
+                 :interceptors [cg-interceptor]}
+                :test-publisher
+                {:name :genegraph-test-publisher
+                 :subscribe :publish-to-test
+                 :kafka-cluster :ccloud
+                 :type :processor
+                 :interceptors [publish-interceptor]}
+                :test-base-processor
+                {:name :genegraph-test-base-processor
+                 :subscribe :test-base
+                 :kafka-cluster :ccloud
+                 :type :processor
+                 :backing-store :test-rocksdb
+                 :interceptors [base-interceptor]}}})
+
+(comment
+  (def ccloud-example-app (p/init ccloud-example-app-def))
+  (kafka-admin/admin-actions-by-cluster ccloud-example-app)
+  (kafka-admin/configure-kafka-for-app! ccloud-example-app)
+  (p/start ccloud-example-app)
+  (p/stop ccloud-example-app)
+
+  (p/publish (get-in ccloud-example-app [:topics :publish-to-test])
+             {::event/key "k2"
+              ::event/data {:b :b}
+              :payload {::event/topic :test-base
+                        ::event/key "k4"
+                        ::event/data {:d :d}}})
+
+  (p/publish (get-in ccloud-example-app [:topics :publish-to-test])
+             {::event/key "k2"
+              ::event/data {:b :b}
+              :payload {::event/topic :test-topic
+                        ::event/key "k4"
+                        ::event/data {:c :c}}})
+
+  ;; Read back the value stored by test-base-processor under key "k4"
+  ;; (published to :test-base above).
+  (-> ccloud-example-app
+      :storage
+      :test-rocksdb
+      :instance
+      deref
+      (storage/read "k4"))
+
+  (-> ccloud-example-app
+      :topics
+      :test-topic
+      genegraph.framework.kafka/topic-up-to-date?)
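+
+  ;; A sketch using only what is defined above: block the REPL until the
+  ;; consumer group topic has caught up with the end offset captured at
+  ;; startup, polling topic-up-to-date? once per second.
+  (let [topic (get-in ccloud-example-app [:topics :test-topic])]
+    (while (not (genegraph.framework.kafka/topic-up-to-date? topic))
+      (Thread/sleep 1000)))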
+  )
diff --git a/src/genegraph/framework/app.clj b/src/genegraph/framework/app.clj
index ca414a8..49eb7ab 100644
--- a/src/genegraph/framework/app.clj
+++ b/src/genegraph/framework/app.clj
@@ -169,6 +169,11 @@
   (log/info :source (:source e)
             :type (:type e)))
 
+(defmethod p/log-event :component-state-update [e]
+  (log/info :source (:source e)
+            :type (:type e)
+            :state (:state e)))
+
 ;;;;;
 ;; Stuff for testing
 ;;;;;
diff --git a/src/genegraph/framework/kafka.clj b/src/genegraph/framework/kafka.clj
index e8ccd68..bb4afe7 100644
--- a/src/genegraph/framework/kafka.clj
+++ b/src/genegraph/framework/kafka.clj
@@ -84,6 +84,12 @@
         local-offset @(:initial-local-offset topic)
         cg-offset @(:initial-consumer-group-offset topic)
         last-offset @(:end-offset-at-start topic)]
+    (log/info :fn :topic-up-to-date?
+              :topic (:name topic)
+              :last-completed-offset last-completed-offset
+              :local-offset local-offset
+              :cg-offset cg-offset
+              :last-offset last-offset)
     (or (and last-completed-offset (<= last-offset last-completed-offset))
         (and (if local-offset (<= last-offset local-offset) true)
              (if cg-offset (<= last-offset cg-offset) true))))
@@ -100,6 +106,7 @@
 (defn handle-event-status-updates [topic event]
   (let [status @(::event/completion-promise event)]
     (swap! (:state topic) assoc :last-completed-offset (::event/offset event))
+    (deliver-up-to-date-event-if-needed topic)
     (log/info :fn ::handle-event-status-updates
               :offset (::event/offset event)
              :status status)))
@@ -121,8 +128,10 @@
   ;; Per Kafka docs, end offset reported by the consumer is
   ;; always the next offset *past* the last record in the
   ;; topic partition, we want the offset of the last record,
-  ;; so decrement by one.
-  (-> (.endOffsets consumer (.assignment consumer)) first val dec))
+  ;; so decrement by two: these topics are written by a transactional
+  ;; producer, and each commit appends a control record (a transaction
+  ;; marker) that occupies an offset of its own.
+  (-> (.endOffsets consumer (.assignment consumer)) first val (- 2)))
 
 (defn poll-kafka-consumer
   "default method to poll a Kafka consumer for new records.
diff --git a/src/genegraph/framework/processor.clj b/src/genegraph/framework/processor.clj
index d6b3f86..00adc78 100644
--- a/src/genegraph/framework/processor.clj
+++ b/src/genegraph/framework/processor.clj
@@ -164,12 +164,12 @@
     nil))
 
-(defn init-kafka-producer! [{:keys [producer kafka-cluster name]}]
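+;; A processor's :name is a keyword (e.g. :genegraph-test-processor), but
+;; Kafka requires "transactional.id" to be a String; destructuring name
+;; directly would also shadow clojure.core/name, so take the whole map and
+;; stringify: (name :genegraph-test-processor) => "genegraph-test-processor"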
+(defn init-kafka-producer! [{:keys [producer kafka-cluster] :as p}]
   (if kafka-cluster
     (deliver producer
              (kafka/create-producer
               kafka-cluster
-              {"transactional.id" name
+              {"transactional.id" (name (:name p))
                "max.request.size" (int (* 1024 1024 10))}))
     (deliver producer nil)))
diff --git a/src/genegraph/framework/storage.clj b/src/genegraph/framework/storage.clj
index 4e1e457..fa2ac9f 100644
--- a/src/genegraph/framework/storage.clj
+++ b/src/genegraph/framework/storage.clj
@@ -1,6 +1,19 @@
 (ns genegraph.framework.storage
   (:require [clojure.java.io :as io])
-  (:import [java.io ByteArrayInputStream])
+  (:import [java.io
+            ByteArrayInputStream BufferedOutputStream BufferedInputStream]
+           [java.nio.file
+            Path Files StandardCopyOption LinkOption]
+           [java.nio.file.attribute FileAttribute]
+           [org.apache.commons.compress.archivers.tar
+            TarArchiveEntry TarArchiveOutputStream TarArchiveInputStream]
+           [org.apache.commons.compress.archivers ArchiveEntry]
+           [org.apache.commons.compress.compressors.lz4
+            BlockLZ4CompressorOutputStream BlockLZ4CompressorInputStream]
+           [org.apache.commons.compress.compressors.gzip
+            GzipCompressorOutputStream GzipCompressorInputStream]
+           [net.jpountz.lz4
+            LZ4FrameInputStream LZ4FrameOutputStream LZ4Factory])
   (:refer-clojure :exclude [read]))
 
 (defprotocol HasInstance
@@ -43,6 +56,62 @@
     (string? source) (ByteArrayInputStream. (.getBytes source))
     :else (io/input-stream source)))
 
+;; Loosely modeled on https://mkyong.com/java/how-to-create-tar-gz-in-java/
+;; using lz4 instead of gzip
+
+(defn store-archive [local-path storage-handle]
+  (let [local-path-obj (Path/of local-path (make-array String 0))]
+    (with-open [os (-> storage-handle
+                       as-handle
+                       io/output-stream
+                       BufferedOutputStream.
+                       LZ4FrameOutputStream.
+                       TarArchiveOutputStream.)]
+      (run! (fn [f]
+              (.putArchiveEntry
+               os
+               (TarArchiveEntry.
+                f
+                (str
+                 (.relativize local-path-obj
+                              (Path/of (.getPath f)
+                                       (make-array String 0))))))
+              (io/copy f os)
+              (.closeArchiveEntry os))
+            (filter #(.isFile %) (file-seq (io/file local-path)))))))
+
+(comment
+  (store-archive "/Users/tristan/data/genegraph-neo/test-rocks-snapshot"
+                 {:type :gcs
+                  :bucket "genegraph-framework-dev"
+                  :path "test-tarball.tar.lz4"})
+  )
+
+(defn restore-archive [target-path storage-handle]
+  (let [base-path (Path/of target-path (make-array String 0))]
+    (with-open [is (-> storage-handle
+                       as-handle
+                       io/input-stream
+                       BufferedInputStream.
+                       LZ4FrameInputStream.
+                       TarArchiveInputStream.)]
+      (loop [entry (.getNextEntry is)]
+        (when entry
+          (let [file-path (.resolve base-path (.getName entry))
+                parent-path (.getParent file-path)]
+            (when (Files/notExists parent-path (make-array LinkOption 0))
+              (Files/createDirectories parent-path (make-array FileAttribute 0)))
+            (io/copy is (.toFile file-path)))
+          (recur (.getNextEntry is)))))))
+
+(comment
+  (restore-archive
+   "/Users/tristan/data/genegraph-neo/test-rocks-restore"
+   {:type :file
+    :base "/users/tristan/data/genegraph-neo"
+    :path "test-tarball.tar.lz4"})
+  )
+
 (comment
   (as-handle {:type :file
               :base "/users/tristan/data"
diff --git a/src/genegraph/framework/storage/rocksdb.clj b/src/genegraph/framework/storage/rocksdb.clj
index 8749059..035c35c 100644
--- a/src/genegraph/framework/storage/rocksdb.clj
+++ b/src/genegraph/framework/storage/rocksdb.clj
@@ -4,7 +4,7 @@
    [taoensso.nippy :as nippy]
    [clojure.java.io :as io]
    [digest])
-  (:import (org.rocksdb Options ReadOptions RocksDB Slice CompressionType)
+  (:import (org.rocksdb Options ReadOptions RocksDB Slice CompressionType Checkpoint)
           java.util.Arrays
           java.nio.ByteBuffer
           java.io.ByteArrayOutputStream))
@@ -17,6 +17,28 @@
      (.setCreateIfMissing true)
      (.setCompressionType CompressionType/LZ4_COMPRESSION))
    path))
 
+;; Write a point-in-time snapshot of rocksdb to path using the RocksDB
+;; Checkpoint mechanism.
+(defn create-checkpoint [rocksdb path]
+  (doto (Checkpoint/create rocksdb)
+    (.createCheckpoint path)
+    .close))
+
+(comment
+  (def rtest
+    (p/init {:name :rocks-test
+             :type :rocksdb
+             :path "/Users/tristan/data/genegraph-neo/test-rocks"}))
+
+  (p/start rtest)
+  @(:instance rtest)
+  (create-checkpoint @(:instance rtest)
+                     "/Users/tristan/data/genegraph-neo/test-rocks-snapshot")
+
+  (def c (Checkpoint/create @(:instance rtest)))
+  (.createCheckpoint c "/Users/tristan/data/genegraph-neo/test-rocks-snapshot-2")
+  (.close c)
+
+  )
+
 (defrecord RocksDBInstance [name
                             type
                             path
@@ -34,7 +56,16 @@
     this)
   (stop [this]
     (.close @instance)
-    this))
+    this)
+
+  storage/Snapshot
+  (store-snapshot [this storage-handle]
+
+    )
+  (restore-snapshot [this storage-handle])
+  )
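+
+;; A sketch of one way store-snapshot above might be filled in (an
+;; assumption, not yet wired into the record): write a RocksDB checkpoint
+;; to a scratch directory, then ship it with storage/store-archive. The
+;; scratch path and storage handle here are illustrative only.
+(comment
+  (let [tmp (str (System/getProperty "java.io.tmpdir") "/rocks-snapshot")]
+    (create-checkpoint @(:instance rtest) tmp)
+    (storage/store-archive tmp {:type :gcs
+                                :bucket "genegraph-framework-dev"
+                                :path "rocks-snapshot.tar.lz4"}))
+  )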
+
 
 (defmethod p/init :rocksdb [db-def]
   (map->RocksDBInstance