Archive store; app event updates
tnavatar committed Jan 10, 2024
1 parent 1c2117a commit 33775f4
Showing 7 changed files with 256 additions and 7 deletions.
4 changes: 4 additions & 0 deletions deps.edn
@@ -37,6 +37,10 @@
;; Kafka
org.apache.kafka/kafka-clients {:mvn/version "3.6.1"}

;; Compression for storage artifacts
org.apache.commons/commons-compress {:mvn/version "1.24.0"}
org.lz4/lz4-java {:mvn/version "1.8.0"}

;; Datastores

;; Rocks
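Not part of the commit, but a quick sketch of what the new lz4-java dependency provides: a framed LZ4 stream that wraps ordinary java.io streams (commons-compress supplies the tar layer used in storage.clj below). The /tmp path is hypothetical.

(require '[clojure.java.io :as io])
(import '[net.jpountz.lz4 LZ4FrameOutputStream LZ4FrameInputStream])

;; Round-trip a string through an LZ4-framed file.
(with-open [os (LZ4FrameOutputStream. (io/output-stream "/tmp/example.lz4"))]
  (.write os (.getBytes "hello, lz4")))

(with-open [is (LZ4FrameInputStream. (io/input-stream "/tmp/example.lz4"))]
  (slurp is)) ;; => "hello, lz4"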
131 changes: 131 additions & 0 deletions example/app_lifecycle_example.clj
@@ -0,0 +1,131 @@
(ns genegraph.app-lifecycle-example
(:require [genegraph.framework.app :as app]
[genegraph.framework.protocol :as p]
[genegraph.framework.event :as event]
[genegraph.framework.storage :as storage]
[genegraph.framework.kafka.admin :as kafka-admin]
[io.pedestal.log :as log]
[io.pedestal.interceptor :as interceptor]
[clojure.data.json :as json]))

(def publish-interceptor
(interceptor/interceptor
{:name ::publish-interceptor
:enter (fn [e]
(log/info :fn :publish-interceptor)
(event/publish e (:payload e)))}))

(def cg-interceptor
(interceptor/interceptor
{:name ::cg-interceptor
:enter (fn [e]
(log/info :fn :cg-interceptor
:payload (::event/data e)
:key (::event/key e)
:stored-value (storage/read (get-in e [::storage/storage :test-rocksdb])
(::event/key e)))
e)}))

(def base-interceptor
(interceptor/interceptor
{:name ::base-interceptor
:enter (fn [e]
(log/info :fn :base-interceptor
:payload (::event/data e)
:key (::event/key e))
(event/store e :test-rocksdb (::event/key e) (::event/data e)))}))


(def ccloud-example-app-def
{:type :genegraph-app
:kafka-clusters {:ccloud
{:type :kafka-cluster
:kafka-user "User:2189780"
:common-config {"ssl.endpoint.identification.algorithm" "https"
"sasl.mechanism" "PLAIN"
"request.timeout.ms" "20000"
"bootstrap.servers" "pkc-4yyd6.us-east1.gcp.confluent.cloud:9092"
"retry.backoff.ms" "500"
"security.protocol" "SASL_SSL"
"sasl.jaas.config" (System/getenv "DX_JAAS_CONFIG_DEV")}
:consumer-config {"key.deserializer"
"org.apache.kafka.common.serialization.StringDeserializer"
"value.deserializer"
"org.apache.kafka.common.serialization.StringDeserializer"}
:producer-config {"key.serializer"
"org.apache.kafka.common.serialization.StringSerializer"
"value.serializer"
"org.apache.kafka.common.serialization.StringSerializer"}}}
:storage {:test-rocksdb
{:type :rocksdb
:name :test-rocksdb
:path "/users/tristan/data/genegraph-neo/test-rocks"}}
:topics {:test-topic
{:name :test-topic
:type :kafka-consumer-group-topic
:kafka-consumer-group "testcg9"
:kafka-cluster :ccloud
:serialization :json
:kafka-topic "genegraph-test"}
:test-base
{:name :test-base
:type :kafka-reader-topic
:kafka-cluster :ccloud
:serialization :json
:kafka-topic "genegraph-test-base"}
:publish-to-test
{:name :publish-to-test
:type :simple-queue-topic}}
:processors {:test-processor
{:subscribe :test-topic
:name :genegraph-test-processor
:type :processor
:kafka-cluster :ccloud
:interceptors [cg-interceptor]}
:test-publisher
{:name :genegraph-test-publisher
:subscribe :publish-to-test
:kafka-cluster :ccloud
:type :processor
:interceptors [publish-interceptor]}
:test-base-processor
{:name :genegraph-test-base-processor
:subscribe :test-base
:kafka-cluster :ccloud
:type :processor
:backing-store :test-rocksdb
:interceptors [base-interceptor]}}})

(comment
(def ccloud-example-app (p/init ccloud-example-app-def))
(kafka-admin/admin-actions-by-cluster ccloud-example-app)
(kafka-admin/configure-kafka-for-app! ccloud-example-app)
(p/start ccloud-example-app)
(p/stop ccloud-example-app)

(p/publish (get-in ccloud-example-app [:topics :publish-to-test])
{::event/key "k2"
::event/data {:b :b}
:payload {::event/topic :test-base
::event/key "k4"
::event/data {:d :d}}})

(p/publish (get-in ccloud-example-app [:topics :publish-to-test])
{::event/key "k2"
::event/data {:b :b}
:payload {::event/topic :test-topic
::event/key "k4"
::event/data {:c :c}}})

(-> ccloud-example-app
:storage
:test-rocksdb
:instance
deref
(storage/read "k3"))

(-> ccloud-example-app
:topics
:test-topic
genegraph.framework.kafka/topic-up-to-date?)
)
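For a credential-free smoke test, the same lifecycle can be sketched with only a :simple-queue-topic. This variant is hypothetical, but it uses only the API already required in this namespace.

(def log-interceptor
  (interceptor/interceptor
   {:name ::log-interceptor
    :enter (fn [e]
             (log/info :fn :log-interceptor
                       :key (::event/key e)
                       :data (::event/data e))
             e)}))

(def local-example-app-def
  {:type :genegraph-app
   :topics {:local-topic {:name :local-topic
                          :type :simple-queue-topic}}
   :processors {:local-processor {:name :local-processor
                                  :type :processor
                                  :subscribe :local-topic
                                  :interceptors [log-interceptor]}}})

(comment
  (def local-app (p/init local-example-app-def))
  (p/start local-app)
  (p/publish (get-in local-app [:topics :local-topic])
             {::event/key "k1"
              ::event/data {:a 1}})
  (p/stop local-app))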
5 changes: 5 additions & 0 deletions src/genegraph/framework/app.clj
@@ -169,6 +169,11 @@
(log/info :source (:source e)
:type (:type e)))

(defmethod p/log-event :component-state-update [e]
(log/info :source (:source e)
:type (:type e)
:state (:state e)))
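
;; Illustration only (not in the commit): a hypothetical event map the
;; new method would log; keys are inferred from the log call above.
(p/log-event {:type :component-state-update
              :source :test-rocksdb
              :state :running})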

;;;;;
;; Stuff for testing
;;;;;
13 changes: 11 additions & 2 deletions src/genegraph/framework/kafka.clj
@@ -84,6 +84,12 @@
local-offset @(:initial-local-offset topic)
cg-offset @(:initial-consumer-group-offset topic)
last-offset @(:end-offset-at-start topic)]
(log/info :fn :topic-up-to-date?
:topic (:name topic)
:last-completed-offset last-completed-offset
:local-offset local-offset
:cg-offset cg-offset
:last-offset last-offset)
(or (and last-completed-offset (<= last-offset last-completed-offset))
(and (if local-offset (<= last-offset local-offset) true)
(if cg-offset (<= last-offset cg-offset) true)))))
@@ -100,6 +106,7 @@
(defn handle-event-status-updates [topic event]
(let [status @(::event/completion-promise event)]
(swap! (:state topic) assoc :last-completed-offset (::event/offset event))
(deliver-up-to-date-event-if-needed topic)
(log/info :fn ::handle-event-status-updates
:offset (::event/offset event)
:status status)))
@@ -121,8 +128,10 @@
;; Per Kafka docs, end offset reported by the consumer is
;; always the next offset *past* the last record in the
;; topic partition, we want the offset of the last record,
;; so decrement by one.
(-> (.endOffsets consumer (.assignment consumer)) first val dec))
;; so decrement by two: with a transactional producer, each committed
;; transaction also writes a commit marker (control record), which
;; occupies an offset of its own, putting the last data record two
;; behind the reported end offset.
(-> (.endOffsets consumer (.assignment consumer)) first val (- 2)))
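
;; A worked illustration of that arithmetic, with hypothetical offsets:
;;   offset 8  -> data record (the one we want)
;;   offset 9  -> commit marker (control record, skipped by consumers)
;;   offset 10 -> reported by .endOffsets, next offset past the marker
;; so the last data record is 10 - 2 = 8.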

(defn poll-kafka-consumer
"default method to poll a Kafka consumer for new records.
4 changes: 2 additions & 2 deletions src/genegraph/framework/processor.clj
@@ -164,12 +164,12 @@
nil))


(defn init-kafka-producer! [{:keys [producer kafka-cluster name]}]
(defn init-kafka-producer! [{:keys [producer kafka-cluster] :as p}]
(if kafka-cluster
(deliver producer
(kafka/create-producer
kafka-cluster
{"transactional.id" name
{"transactional.id" (name (:name p))
"max.request.size" (int (* 1024 1024 10))}))
(deliver producer nil)))
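
;; Why the fix matters: Kafka config values must be strings, while the
;; processor's :name is a keyword, so clojure.core/name converts it.
;; (The processor name here is illustrative.)
(name :genegraph-test-processor) ;; => "genegraph-test-processor"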

71 changes: 70 additions & 1 deletion src/genegraph/framework/storage.clj
@@ -1,6 +1,19 @@
(ns genegraph.framework.storage
(:require [clojure.java.io :as io])
(:import [java.io ByteArrayInputStream])
(:import [java.io
ByteArrayInputStream BufferedOutputStream BufferedInputStream]
[java.nio.file
Path Files StandardCopyOption LinkOption]
[java.nio.file.attribute FileAttribute]
[org.apache.commons.compress.archivers.tar
TarArchiveEntry TarArchiveOutputStream TarArchiveInputStream]
[org.apache.commons.compress.archivers ArchiveEntry]
[org.apache.commons.compress.compressors.lz4
BlockLZ4CompressorOutputStream BlockLZ4CompressorInputStream]
[org.apache.commons.compress.compressors.gzip
GzipCompressorOutputStream GzipCompressorInputStream]
[net.jpountz.lz4
LZ4FrameInputStream LZ4FrameOutputStream LZ4Factory])
(:refer-clojure :exclude [read]))

(defprotocol HasInstance
@@ -43,6 +56,62 @@
(string? source) (ByteArrayInputStream. (.getBytes source))
:else (io/input-stream source)))

;; Loosely modeled on https://mkyong.com/java/how-to-create-tar-gz-in-java/
;; using lz4 instead of gzip

(defn store-archive [local-path storage-handle]
(let [local-path-obj (Path/of local-path (make-array String 0))]
(with-open [os (-> storage-handle
as-handle
io/output-stream
BufferedOutputStream.
LZ4FrameOutputStream.
TarArchiveOutputStream.)]
(run! (fn [f]
(.putArchiveEntry
os
(TarArchiveEntry.
f
(str
(.relativize local-path-obj
(Path/of (.getPath f)
(make-array String 0))))))
(io/copy f os)
(.closeArchiveEntry os))
(filter #(.isFile %) (file-seq (io/file local-path)))))))

(comment
(store-archive "/Users/tristan/data/genegraph-neo/test-rocks-snapshot"
{:type :gcs
:bucket "genegraph-framework-dev"
:path "test-tarball.tar.gz"})
)

(defn restore-archive [target-path storage-handle]
(let [base-path (Path/of target-path (make-array String 0))]
(with-open [is (-> storage-handle
as-handle
io/input-stream
BufferedInputStream.
LZ4FrameInputStream.
TarArchiveInputStream.)]
(loop [entry (.getNextEntry is)]
(when entry
(let [file-path (.resolve base-path (.getName entry))
parent-path (.getParent file-path)]
(when (Files/notExists parent-path (make-array LinkOption 0))
(Files/createDirectories parent-path (make-array FileAttribute 0)))
(io/copy is (.toFile file-path)))
(recur (.getNextEntry is)))))))

(comment
(restore-archive
"/Users/tristan/data/genegraph-neo/test-rocks-restore"
{:type :file
:base "/users/tristan/data/genegraph-neo"
:path "test-tarball.tar.lz4"})
)

(comment
(as-handle {:type :file
:base "/users/tristan/data"
35 changes: 33 additions & 2 deletions src/genegraph/framework/storage/rocksdb.clj
@@ -4,7 +4,7 @@
[taoensso.nippy :as nippy]
[clojure.java.io :as io]
[digest])
(:import (org.rocksdb Options ReadOptions RocksDB Slice CompressionType)
(:import (org.rocksdb Options ReadOptions RocksDB Slice CompressionType Checkpoint)
java.util.Arrays
java.nio.ByteBuffer
java.io.ByteArrayOutputStream))
@@ -17,6 +17,28 @@
(.setCreateIfMissing true)
(.setCompressionType CompressionType/LZ4_COMPRESSION)) path))

(defn create-checkpoint [rocksdb path]
(doto (Checkpoint/create rocksdb)
(.createCheckpoint path)
.close))

(comment
(def rtest
(p/init {:name :rocks-test
:type :rocksdb
:path "/Users/tristan/data/genegraph-neo/test-rocks"}))

(p/start rtest)
@(:instance rtest)
(create-checkpoint @(:instance rtest)
"/Users/tristan/data/genegraph-neo/test-rocks-snapshot")

(def c (Checkpoint/create @(:instance rtest)))
(.createCheckpoint c "/Users/tristan/data/genegraph-neo/test-rocks-snapshot-2")
(.close c)

)

(defrecord RocksDBInstance [name
type
path
@@ -34,7 +56,16 @@
this)
(stop [this]
(.close @instance)
this))
this)

  storage/Snapshot
  (store-snapshot [this storage-handle]
    ;; A sketch only; the commit leaves this method as a stub.
    ;; Checkpoint RocksDB into a fresh temp directory (createCheckpoint
    ;; requires that the target directory not exist yet), then archive
    ;; the checkpoint to the given storage handle.
    (let [checkpoint-path (str (System/getProperty "java.io.tmpdir")
                               "/rocksdb-checkpoint-"
                               (System/currentTimeMillis))]
      (create-checkpoint @instance checkpoint-path)
      (storage/store-archive checkpoint-path storage-handle)))
  (restore-snapshot [this storage-handle]
    ;; A sketch only; restores the archived checkpoint into this
    ;; record's path, assuming the database has not yet been started.
    (storage/restore-archive path storage-handle)))

(defmethod p/init :rocksdb [db-def]
(map->RocksDBInstance
