diff --git a/src/genegraph/framework/app.clj b/src/genegraph/framework/app.clj index 3c70750..1f5c4d8 100644 --- a/src/genegraph/framework/app.clj +++ b/src/genegraph/framework/app.clj @@ -175,238 +175,3 @@ (log/info :source (:source e) :type (:type e) :state (:state e))) - -;;;;; -;; Stuff for testing -;;;;; - - - -(defn consumer-group-msg-interceptor [event] - (Thread/startVirtualThread - #(let [r @(::event/completion-promise event)] - (log/info :msg "effect complete" :result r :offset (::event/offset event)))) - (event/store event - :test-jena - "http://example.com/test-graph" - (rdf/statements->model - [[:sepio/GeneValidityEvidenceLevelAssertion - :rdf/type - :sepio/Assertion]]))) - -#_(p/publish (get-in a2 [:topics :publish-to-test]) - {:payload - {::event/key "k18" - ::event/value "v19" - ::event/topic :test-topic} - #_#_#_#_::event/skip-local-effects true - ::event/skip-publish-effects true}) - -#_(-> event - (event/publish {::event/key (str "new-" (::event/key event)) - ::event/data {:hgvs "NC_00000001:50000A>C"} - ::event/topic :test-endpoint})) - - -(defn test-interceptor-fn [event] - (consumer-group-msg-interceptor event)) - -(defn test-publisher-fn [event] - (event/publish event (:payload event))) - -(defn test-endpoint-fn [event] - (assoc event - :response - {:status 200 - :body "Hello, flower"})) - -(def app-def-2 - {:type :genegraph-app - :kafka-clusters {:local - {:common-config {"bootstrap.servers" "localhost:9092"} - :producer-config {"key.serializer" - "org.apache.kafka.common.serialization.StringSerializer", - "value.serializer" - "org.apache.kafka.common.serialization.StringSerializer"} - :consumer-config {"key.deserializer" - "org.apache.kafka.common.serialization.StringDeserializer" - "value.deserializer" - "org.apache.kafka.common.serialization.StringDeserializer"}}} - :topics {:test-topic - {:name :test-topic - :type :kafka-consumer-group-topic - :kafka-consumer-group "testcg9" - :kafka-cluster :local - :kafka-topic "test"} - :test-reader - {:name :test-reader - :type :kafka-reader-topic - :kafka-cluster :local - :kafka-topic "test"} - :test-endpoint - {:name :test-endpoint - :type :kafka-producer-topic - :serialization :json - :kafka-topic "test-out" - :kafka-cluster :local} - :test-input - {:name :test-input - :type :kafka-producer-topic - :serialization :json - :kafka-topic "test" - :kafka-cluster :local} - :publish-to-test - {:name :publish-to-test - :type :simple-queue-topic}} - :storage {:test-rocksdb - {:type :rocksdb - :name :test-rocksdb - :path "/users/tristan/desktop/test-rocks"} - :test-jena - {:type :rdf - :name :test-jena - :path "/users/tristan/desktop/test-jena"}} - :processors {:test-processor - {:subscribe :test-topic - :name :test-processor - :type :parallel-processor - :kafka-cluster :local - :interceptors `[test-interceptor-fn]} - :test-publisher - {:name :test-publisher - :subscribe :publish-to-test - :kafka-cluster :local - :type :processor - :interceptors `[test-publisher-fn]} - :test-reader-processor - {:name :test-reader-processor - :subscribe :test-reader - :kafka-cluster :local - :type :processor - :backing-store :test-jena - :interceptors `[test-publisher-fn]} - :test-endpoint - {:name :test-endpoint - :type :processor - :interceptors `[test-endpoint-fn]}} - :http-servers {:test-server - {:type :http-server - :name :test-server - :endpoints [{:path "/hello" - :processor :test-endpoint}] - ::http/type :jetty - ::http/port 8888 - ::http/join? false}}}) - - - - -(defn print-event [event] - (clojure.pprint/pprint event) - event) - -(def app-def-3 - {:type :genegraph-app - :topics {:test-topic - {:name :test-topic - :type :simple-queue-topic}} - :processors {:test-processor - {:subscribe :test-topic - :name :test-processor - :type :processor - :interceptors `[print-event] - :init-fn (fn [this] - (assoc this ::event/metadata {::local-conf (:storage this)}))}}}) - - -(def gcs-handle - {:type :gcs - :bucket "genegraph-framework-dev"}) - -(def fs-handle - {:type :file - :base "/users/tristan/data/genegraph-neo/"}) - -(comment - - - (def a3 (p/init app-def-3)) - - (p/start a3) - (p/stop a3) - - (p/publish (get-in a3 [:topics :test-topic]) - {::event/key "akey" - ::event/value "avalue"}) - - ) - -(comment - - (def a2 (p/init app-def-2)) - (p/start a2) - (p/stop a2) - - (-> a2 :topics :test-topic :initial-consumer-group-offset) - (-> a2 :topics :test-topic :end-offset-at-start) - (-> a2 :topics :test-topic :state deref) - (-> a2 :topics :test-topic kafka/topic-up-to-date?) - - (-> a2 :processors :default-system-processor) - - (p/publish (get-in a2 [:topics :publish-to-test]) - {:payload - {::event/key "k18" - ::event/value "v19" - ::event/topic :test-topic} - #_#_#_#_::event/skip-local-effects true - ::event/skip-publish-effects true}) - - - - (p/publish (get-in a2 [:processors :test-processor :system-topic]) - {:key :k :type :system-event}) - - (p/publish-system-update (get-in a2 [:topics :test-topic]) - {:key :k :type :system-event}) - - (def lp1 (promise)) - - lp1 - - (p/publish-system-update (get-in a2 [:processors :test-processor]) - {:type :register-listener - :predicate #(= :system-event (:type %)) - :promise lp1}) - - (-> a2 :processors :test-processor :system-topic) - (-> a2 :topics :system) - - (time - (s/store-snapshot (get-in a2 [:storage :test-jena]) - gcs-handle)) - (time - (s/restore-snapshot (get-in a2 [:storage :test-jena]) - gcs-handle)) - - (s/store-offset @(get-in a2 [:storage :test-rocksdb :instance]) :test-topic 1) - (s/retrieve-offset @(get-in a2 [:storage :test-rocksdb :instance]) :test-topic) - (processor/starting-offset (get-in a2 [:processors :test-processor])) - (get-in a2 [:processors :test-processor :storage :test-rocksdb :instance]) - - (def lp2 (promise)) - - (-> {::listeners (atom {:promise lp2 - :predicate #(= :system-event (:type %))}) - :type :system-event - :key :boo} - ::listeners - deref - vals) - - #_(run! #(deliver (:promise %) event) - (filter #((:predicate %) event) - (:vals @(::listeners event)))) - ) - -