Skip to content

Commit

Permalink
Forgoing live storage snapshots for now; opting for simpler approach
Browse files Browse the repository at this point in the history
  • Loading branch information
tnavatar committed Jan 11, 2024
1 parent 33775f4 commit d583b1b
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 39 deletions.
9 changes: 8 additions & 1 deletion example/app_lifecycle_example.clj
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@
:storage {:test-rocksdb
{:type :rocksdb
:name :test-rocksdb
:path "/users/tristan/data/genegraph-neo/test-rocks"}}
:path "/Users/tristan/data/genegraph-neo/test-rocks"
:snapshot-handle {:type :file
:base "/Users/tristan/data/genegraph-neo"
:path "test-rocks-snapshot.tar.lz4"}
:load-snapshot true}}
:topics {:test-topic
{:name :test-topic
:type :kafka-consumer-group-topic
Expand Down Expand Up @@ -103,6 +107,9 @@
(p/start ccloud-example-app)
(p/stop ccloud-example-app)

(storage/store-snapshot (get-in ccloud-example-app-def [:storage :test-rocksdb]))
(storage/restore-snapshot (get-in ccloud-example-app-def [:storage :test-rocksdb]))

(p/publish (get-in ccloud-example-app [:topics :publish-to-test])
{::event/key "k2"
::event/data {:b :b}
Expand Down
17 changes: 13 additions & 4 deletions src/genegraph/framework/storage.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
[org.apache.commons.compress.compressors.gzip
GzipCompressorOutputStream GzipCompressorInputStream]
[net.jpountz.lz4
LZ4FrameInputStream LZ4FrameOutputStream LZ4Factory])
LZ4FrameInputStream LZ4FrameOutputStream LZ4Factory]
[java.time Instant])
(:refer-clojure :exclude [read]))

(defprotocol HasInstance
Expand Down Expand Up @@ -42,9 +43,9 @@
(store-offset [this topic offset] [this topic offset commit-promise])
(retrieve-offset [this topic]))

(defprotocol Snapshot
(store-snapshot [this storage-handle])
(restore-snapshot [this storage-handle]))
#_(defprotocol Snapshot
(store-snapshot [this])
(restore-snapshot [this]))

(defmulti as-handle :type)

Expand Down Expand Up @@ -104,6 +105,7 @@
(io/copy is (.toFile file-path)))
(recur (.getNextEntry is)))))))


(comment
(restore-archive
"/Users/tristan/data/genegraph-neo/test-rocks-restore"
Expand All @@ -112,6 +114,13 @@
:path "test-tarball.tar.lz4"})
)

(defn store-snapshot [this]
(store-archive (:path this) (:snapshot-handle this)))

(defn restore-snapshot [this]
(when-not (-> this :path io/as-file .exists)
(restore-archive (:path this) (:snapshot-handle this))))

(comment
(as-handle {:type :file
:base "/users/tristan/data"
Expand Down
8 changes: 7 additions & 1 deletion src/genegraph/framework/storage/rdf.clj
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@
s/HasInstance
(instance [_] @instance)

s/Snapshot

;; This would be a little slow for production use.
;; should adapt later on for creating a snapshot on a live system

#_#_#_s/Snapshot
(store-snapshot [this storage-handle]
(with-open [os (-> storage-handle
(assoc :path (clojure.core/name name))
Expand All @@ -69,6 +73,8 @@

p/Lifecycle
(start [this]
(when (:load-snapshot this)
(s/restore-snapshot this))
(reset! instance (i/start-dataset (dissoc this :instance))))
(stop [this]
(.close @instance)
Expand Down
36 changes: 3 additions & 33 deletions src/genegraph/framework/storage/rocksdb.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,12 @@
java.nio.ByteBuffer
java.io.ByteArrayOutputStream))



(defn open [path]
(io/make-parents path)
(RocksDB/open (doto (Options.)
(.setCreateIfMissing true)
(.setCompressionType CompressionType/LZ4_COMPRESSION)) path))

(defn create-checkpoint [rocksdb path]
(doto (Checkpoint/create rocksdb)
(.createCheckpoint path)
.close))

(comment
(def rtest
(p/init {:name :rocks-test
:type :rocksdb
:path "/Users/tristan/data/genegraph-neo/test-rocks"}))

(p/start rtest)
@(:instance rtest)
(create-checkpoint @(:instance rtest)
"/Users/tristan/data/genegraph-neo/test-rocks-snapshot")

(def c (Checkpoint/create @(:instance rtest)))
(.createCheckpoint c "/Users/tristan/data/genegraph-neo/test-rocks-snapshot-2")
(.close c)

)

(defrecord RocksDBInstance [name
type
path
Expand All @@ -51,22 +27,16 @@
p/Lifecycle
(start [this]
(io/make-parents path)
(reset! instance
(open path))
(when (:load-snapshot this)
(storage/restore-snapshot this))
(reset! instance (open path))
this)
(stop [this]
(.close @instance)
this)

storage/Snapshot
(store-snapshot [this storage-handle]

)
(restore-snapshot [this storage-handle])
)

(System/getProperty "java.io.tmpdir")

(defmethod p/init :rocksdb [db-def]
(map->RocksDBInstance
(assoc db-def
Expand Down

0 comments on commit d583b1b

Please sign in to comment.