Skip to content

Commit

Permalink
rdf/tx bug fix, allow init-fn for http endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
tnavatar committed Aug 15, 2024
1 parent f6d394d commit b658903
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 17 deletions.
39 changes: 35 additions & 4 deletions example/app_lifecycle_example.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,29 @@
[genegraph.framework.kafka.admin :as kafka-admin]
[io.pedestal.log :as log]
[io.pedestal.interceptor :as interceptor]
[clojure.data.json :as json]))
[clojure.data.json :as json]
[portal.api :as portal]))

;; Portal
(comment
(def p (portal/open))
(add-tap #'portal/submit)
(portal/close)
(portal/clear)
)

(def publish-interceptor
(interceptor/interceptor
{:name ::publish-interceptor
:enter (fn [e]
(log/info :fn :publish-interceptor)
(event/publish e (:payload e)))}))
(-> e
(event/publish {::event/key "k1"
::event/data {:d1 "hi"}
::event/topic :test-out-1})
(event/publish {::event/key "k2"
::event/data {:d2 "hi"}
::event/topic :test-out-2})))}))

(def cg-interceptor
(interceptor/interceptor
Expand Down Expand Up @@ -69,13 +84,29 @@
:kafka-consumer-group "testcg9"
:kafka-cluster :ccloud
:serialization :json
:kafka-topic "genegraph-test"}
:kafka-topic "genegraph-test"
:kafka-topic-config {}}
:test-base
{:name :test-base
:type :kafka-reader-topic
:kafka-cluster :ccloud
:serialization :json
:kafka-topic-config {}
:kafka-topic "genegraph-test-base"}
:test-out-1
{:name :test-base
:type :kafka-producer-topic
:kafka-cluster :ccloud
:serialization :json
:kafka-topic-config {}
:kafka-topic "genegraph-test-out-1"}
:test-out-2
{:name :test-base
:type :kafka-producer-topic
:kafka-cluster :ccloud
:serialization :json
:kafka-topic-config {}
:kafka-topic "genegraph-test-out-2"}
:publish-to-test
{:name :publish-to-test
:type :simple-queue-topic}}
Expand All @@ -101,7 +132,7 @@

(comment
(def ccloud-example-app (p/init ccloud-example-app-def))
(kafka-admin/admin-actions-by-cluster ccloud-example-app)
(tap> (kafka-admin/admin-actions-by-cluster ccloud-example-app))
(kafka-admin/configure-kafka-for-app! ccloud-example-app)
(p/start ccloud-example-app)
(p/stop ccloud-example-app)
Expand Down
63 changes: 63 additions & 0 deletions example/effect_completion_example.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
(ns genegraph.effect-completion-example
(:require [genegraph.framework.app :as app]
[genegraph.framework.event :as event]
[genegraph.framework.storage :as storage]
[genegraph.framework.protocol :as p]
[io.pedestal.interceptor :as interceptor]
[io.pedestal.log :as log]))

(def test-publisher
(interceptor/interceptor
{:name ::test-publisher
:enter (fn [e] #_(event/publish e
{::event/topic :out
::event/key :k
::event/data :v})
e)}))

(def test-reader
(interceptor/interceptor
{:name ::test-reader
:enter (fn [e]
(log/info :fn ::test-reader :msg (::event/data e))
e)}))

(def test-app-def
{:type :genegraph-app
:storage
{:db
{:name :db
:type :rocksdb
:path "/users/tristan/data/genegraph-neo/effect-rocks/"}}
:topics
{:in
{:name :in
:type :simple-queue-topic}
:out
{:name :out
:type :simple-queue-topic}}
:processors
{:test-processor
{:type :processor
:name :test-processor
:subscribe :in
:interceptors [test-publisher]}
:test-reader
{:type :processor
:name :test-reader
:subscribe :out
:interceptors [test-reader]}}})

(comment
(def test-app (p/init test-app-def))
(p/start test-app)
(p/stop test-app)

(let [p (promise)]
(p/publish (get-in test-app [:topics :in])
{::event/key :k
::event/data {:a :a}
::event/completion-promise p})
(deref p 100 :timeout))

)
81 changes: 81 additions & 0 deletions example/http_init_example.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
(ns http-init-example
(:require [genegraph.framework.app :as app]
[genegraph.framework.protocol :as p]
[genegraph.framework.event :as event]
[io.pedestal.interceptor :as interceptor]
[io.pedestal.http :as http]
[io.pedestal.log :as log]))


(def ready-server
{:ready-server
{:type :http-server
:name :ready-server
:init-fn (fn [svr]
(log/info :ready-server :init)
svr)
::http/host "0.0.0.0"
::http/allowed-origins {:allowed-origins (constantly true)
:creds true}
::http/routes
[["/ready"
:get (fn [_] {:status 200 :body "server is ready"})
:route-name ::readiness]
["/live"
:get (fn [_] {:status 200 :body "server is live"})
:route-name ::liveness]]
::http/type :jetty
::http/port 8888
::http/join? false
::http/secure-headers nil}})

(def print-event-interceptor
(interceptor/interceptor
{:name :print-system-event
:enter (fn [e]
(log/info :system-event :recieved)
e)}))

(def publish-system-event-interceptor
(interceptor/interceptor
{:name :publish-system-event
:enter (fn [e]
(event/publish
e
{::event/topic :system
::event/key :d
::event/data {:g :g}}))}))


(def ready-app-def
{:type :genegraph-app
:http-servers ready-server
:topics
{:events-topic {:type :simple-queue-topic
:name :events-topic}}
:processors
{:event-processor
{:type :processor
:name :event-processor
:subscribe :events-topic
:interceptors [print-event-interceptor
publish-system-event-interceptor]}
:system-processor
(update app/default-system-processor
:interceptors
conj
print-event-interceptor)
:ready-api
{:type :processor
:name :ready-api
}}})


(comment
(def ready-app (p/init ready-app-def))
(p/start ready-app)
(p/publish (get-in ready-app [:topics :events-topic])
{::event/key :a ::event/data {:b :b}})
(p/stop ready-app)

)
12 changes: 12 additions & 0 deletions example/lucene_example.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
(ns genegraph.lucene-example
(:import [org.apache.lucene.analysis Analyzer]
[org.apache.lucene.analysis.standard StandardAnalyzer]
[org.apache.lucene.store Directory FSDirectory]
[org.apache.lucene.index IndexWriter IndexWriterConfig]
[java.nio.file Path]))


(with-open [iw (IndexWriter. (FSDirectory/open
(Path/of "/Users/tristan/data/genegraph-neo/lucene-test"
(make-array String 0)))
(IndexWriterConfig. (StandardAnalyzer.)))])
39 changes: 39 additions & 0 deletions example/transaction_example.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
(ns transaction-example
(:require [genegraph.framework.app :as app]
[genegraph.framework.event :as event]
[genegraph.framework.storage :as storage]
[genegraph.framework.protocol :as p]
[genegraph.framework.storage.rdf :as rdf])
(:import [org.apache.jena.sparql.core Transactional]))


(def transaction-app
{:type :genegraph-app
:storage {:test-tdb
{:type :rdf
:name :test-tdb
:path "/users/tristan/Desktop/test-tdb"}}})

(comment
(def tapp (p/init transaction-app))
(p/start tapp)
(p/stop tapp)

(time
(let [db @(get-in tapp [:storage :test-tdb :instance])]
(dotimes [n 100000]
(.begin db org.apache.jena.query.ReadWrite/READ)
(try (do (clojure.core/+ 1 1))
(.commit db)
(finally (.end db))))))

(time
(let [db @(get-in tapp [:storage :test-tdb :instance])]
(dotimes [n 1000000]
(rdf/tx db
(+ 1 1)))))

(macroexpand-1 `(rdf/tx db
(+ 1 1)))

)
9 changes: 9 additions & 0 deletions example/transaction_publish_example.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
(ns transaction-publish-example
(:require [genegraph.framework.app :as app]
[genegraph.framework.protocol :as p]
[genegraph.framework.event :as event]
[genegraph.framework.storage :as storage]
[genegraph.framework.kafka.admin :as kafka-admin]
[io.pedestal.log :as log]
[io.pedestal.interceptor :as interceptor]
[clojure.data.json :as json]))
2 changes: 1 addition & 1 deletion src/genegraph/framework/app.clj
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,4 @@


(defmethod p/log-event :default [e]
(log/log (assoc (::event/data e) :level :info)))
e)
24 changes: 13 additions & 11 deletions src/genegraph/framework/http_server.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@
[path http-method (p/as-interceptors processor) :route-name (:name processor)]))

(defmethod p/init :http-server [server-def]
(-> server-def
(update ::http/routes
#(->> (:endpoints server-def)
(map endpoint->route)
(concat %)
set
route/expand-routes))
(assoc :state (atom {:status :stopped})
:producer (promise))
http/create-server
map->Server))
(let [init-fn (:init-fn server-def identity)]
(-> server-def
init-fn
(update ::http/routes
#(->> (:endpoints server-def)
(map endpoint->route)
(concat %)
set
route/expand-routes))
(assoc :state (atom {:status :stopped})
:producer (promise))
http/create-server
map->Server)))

(comment

Expand Down
3 changes: 2 additions & 1 deletion src/genegraph/framework/storage/rdf.clj
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
(do ~@body)
(try
(.begin ~db ReadWrite/READ)
(do ~@body)
(do ~@body
(.commit ~db)) ; https://github.com/apache/jena/issues/2584
(finally (.end ~db)))))

(defrecord RDFStore [instance
Expand Down

0 comments on commit b658903

Please sign in to comment.