Skip to content

Commit

Permalink
Cleaning up comments in app.clj
Browse files Browse the repository at this point in the history
Mostly this stuff should exist in the 'examples' directory
  • Loading branch information
tnavatar committed Jan 22, 2024
1 parent 459c563 commit be5cc54
Showing 1 changed file with 0 additions and 235 deletions.
235 changes: 0 additions & 235 deletions src/genegraph/framework/app.clj
Original file line number Diff line number Diff line change
Expand Up @@ -175,238 +175,3 @@
(log/info :source (:source e)
:type (:type e)
:state (:state e)))

;;;;;
;; Stuff for testing
;;;;;



(defn consumer-group-msg-interceptor [event]
(Thread/startVirtualThread
#(let [r @(::event/completion-promise event)]
(log/info :msg "effect complete" :result r :offset (::event/offset event))))
(event/store event
:test-jena
"http://example.com/test-graph"
(rdf/statements->model
[[:sepio/GeneValidityEvidenceLevelAssertion
:rdf/type
:sepio/Assertion]])))

#_(p/publish (get-in a2 [:topics :publish-to-test])
{:payload
{::event/key "k18"
::event/value "v19"
::event/topic :test-topic}
#_#_#_#_::event/skip-local-effects true
::event/skip-publish-effects true})

#_(-> event
(event/publish {::event/key (str "new-" (::event/key event))
::event/data {:hgvs "NC_00000001:50000A>C"}
::event/topic :test-endpoint}))


(defn test-interceptor-fn [event]
(consumer-group-msg-interceptor event))

(defn test-publisher-fn [event]
(event/publish event (:payload event)))

(defn test-endpoint-fn [event]
(assoc event
:response
{:status 200
:body "Hello, flower"}))

(def app-def-2
{:type :genegraph-app
:kafka-clusters {:local
{:common-config {"bootstrap.servers" "localhost:9092"}
:producer-config {"key.serializer"
"org.apache.kafka.common.serialization.StringSerializer",
"value.serializer"
"org.apache.kafka.common.serialization.StringSerializer"}
:consumer-config {"key.deserializer"
"org.apache.kafka.common.serialization.StringDeserializer"
"value.deserializer"
"org.apache.kafka.common.serialization.StringDeserializer"}}}
:topics {:test-topic
{:name :test-topic
:type :kafka-consumer-group-topic
:kafka-consumer-group "testcg9"
:kafka-cluster :local
:kafka-topic "test"}
:test-reader
{:name :test-reader
:type :kafka-reader-topic
:kafka-cluster :local
:kafka-topic "test"}
:test-endpoint
{:name :test-endpoint
:type :kafka-producer-topic
:serialization :json
:kafka-topic "test-out"
:kafka-cluster :local}
:test-input
{:name :test-input
:type :kafka-producer-topic
:serialization :json
:kafka-topic "test"
:kafka-cluster :local}
:publish-to-test
{:name :publish-to-test
:type :simple-queue-topic}}
:storage {:test-rocksdb
{:type :rocksdb
:name :test-rocksdb
:path "/users/tristan/desktop/test-rocks"}
:test-jena
{:type :rdf
:name :test-jena
:path "/users/tristan/desktop/test-jena"}}
:processors {:test-processor
{:subscribe :test-topic
:name :test-processor
:type :parallel-processor
:kafka-cluster :local
:interceptors `[test-interceptor-fn]}
:test-publisher
{:name :test-publisher
:subscribe :publish-to-test
:kafka-cluster :local
:type :processor
:interceptors `[test-publisher-fn]}
:test-reader-processor
{:name :test-reader-processor
:subscribe :test-reader
:kafka-cluster :local
:type :processor
:backing-store :test-jena
:interceptors `[test-publisher-fn]}
:test-endpoint
{:name :test-endpoint
:type :processor
:interceptors `[test-endpoint-fn]}}
:http-servers {:test-server
{:type :http-server
:name :test-server
:endpoints [{:path "/hello"
:processor :test-endpoint}]
::http/type :jetty
::http/port 8888
::http/join? false}}})




(defn print-event [event]
(clojure.pprint/pprint event)
event)

(def app-def-3
{:type :genegraph-app
:topics {:test-topic
{:name :test-topic
:type :simple-queue-topic}}
:processors {:test-processor
{:subscribe :test-topic
:name :test-processor
:type :processor
:interceptors `[print-event]
:init-fn (fn [this]
(assoc this ::event/metadata {::local-conf (:storage this)}))}}})


(def gcs-handle
{:type :gcs
:bucket "genegraph-framework-dev"})

(def fs-handle
{:type :file
:base "/users/tristan/data/genegraph-neo/"})

(comment


(def a3 (p/init app-def-3))

(p/start a3)
(p/stop a3)

(p/publish (get-in a3 [:topics :test-topic])
{::event/key "akey"
::event/value "avalue"})

)

(comment

(def a2 (p/init app-def-2))
(p/start a2)
(p/stop a2)

(-> a2 :topics :test-topic :initial-consumer-group-offset)
(-> a2 :topics :test-topic :end-offset-at-start)
(-> a2 :topics :test-topic :state deref)
(-> a2 :topics :test-topic kafka/topic-up-to-date?)

(-> a2 :processors :default-system-processor)

(p/publish (get-in a2 [:topics :publish-to-test])
{:payload
{::event/key "k18"
::event/value "v19"
::event/topic :test-topic}
#_#_#_#_::event/skip-local-effects true
::event/skip-publish-effects true})



(p/publish (get-in a2 [:processors :test-processor :system-topic])
{:key :k :type :system-event})

(p/publish-system-update (get-in a2 [:topics :test-topic])
{:key :k :type :system-event})

(def lp1 (promise))

lp1

(p/publish-system-update (get-in a2 [:processors :test-processor])
{:type :register-listener
:predicate #(= :system-event (:type %))
:promise lp1})

(-> a2 :processors :test-processor :system-topic)
(-> a2 :topics :system)

(time
(s/store-snapshot (get-in a2 [:storage :test-jena])
gcs-handle))
(time
(s/restore-snapshot (get-in a2 [:storage :test-jena])
gcs-handle))

(s/store-offset @(get-in a2 [:storage :test-rocksdb :instance]) :test-topic 1)
(s/retrieve-offset @(get-in a2 [:storage :test-rocksdb :instance]) :test-topic)
(processor/starting-offset (get-in a2 [:processors :test-processor]))
(get-in a2 [:processors :test-processor :storage :test-rocksdb :instance])

(def lp2 (promise))

(-> {::listeners (atom {:promise lp2
:predicate #(= :system-event (:type %))})
:type :system-event
:key :boo}
::listeners
deref
vals)

#_(run! #(deliver (:promise %) event)
(filter #((:predicate %) event)
(:vals @(::listeners event))))
)


0 comments on commit be5cc54

Please sign in to comment.