Elasticsearch new low-level REST client wrapper
This is a Signal AI fork of mpenet/spandex.
Changes from master:
- Add wrapper to bulk-chan (9daa4091bb36fcc16d76853a2276699c35ea3419)
To release, change the version in project.clj and push to master.
Versioning should match the pattern 0.7.7-1[-SNAPSHOT], where 0.7.7 is the upstream version of spandex, 1 is our fork version, and the optional -SNAPSHOT suffix indicates a development release.
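For instance, a development build of fork revision 1 on top of upstream 0.7.7 would look roughly like this in project.clj; dropping the -SNAPSHOT suffix turns it into a release version. The cc.qbits/spandex coordinate below is illustrative only, use whatever coordinate this fork is actually published under.

;; illustrative project.clj header; the coordinate is an assumption
(defproject cc.qbits/spandex "0.7.7-1-SNAPSHOT"
  :description "Signal AI fork of mpenet/spandex")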
To quote "State of the official Elasticsearch Java clients":
"The Java REST client is the future for Java users of Elasticsearch."
The legacy native (transport) client is a bit of a nightmare to deal with (for many reasons), while the new REST client is quite capable and fast too; see "Benchmarking REST client and transport client".
Not to mention it supports some interesting features:
- compatibility with any Elasticsearch version
- load balancing across all available nodes
- failover in case of node failures and upon specific response codes
- failed connection penalization
- persistent connections
- trace logging of requests and responses
- optional automatic discovery of cluster nodes (also known as sniffing)
Spandex design goals:
- Be minimal & performant
- RING inspired
- All "exotic" features should be optional
- Not a giant DSL over another DSL, just maps everywhere. Read the Elasticsearch docs -> done, no extra layer of indirection
- Provide minimal (and optional) utils to do the boring stuff (bulk, scroll queries, composing URLs)
- Can do async via a simple callback-based API or core.async
- Provide specs
Basic usage looks like this:
(require '[qbits.spandex :as s])
(def c (s/client {:hosts ["http://127.0.0.1:9200" "https://foo2:3838"]}))
;; add optional sniffer
(def s (s/sniffer c {... options ...}))
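When you are done with them, both the sniffer and the client should be shut down. A minimal sketch, assuming the close! function exposed by qbits.spandex:

;; release the underlying resources (assumes qbits.spandex/close!)
(s/close! s)
(s/close! c)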
First, set up and make sure that you have appropriate access to the host via tunneling. For example, add/edit your ~/.ssh/config to look something like:
# Example of tunneling in ~/.ssh/config
# .. more config
Host my-aws-elasticsearch-host
HostName 10.123.345.456
User ec2-user
IdentitiesOnly yes
IdentityFile ~/.ssh/my-aws-elasticsearch.pem
LocalForward 9200 vpc-my-aws-elasticsearch-host-lb43i.us-east-1.es.amazonaws.com:443
ServerAliveInterval 240
# .. more config
You can then start ssh tunneling with
# see manpage of `ssh` for more details
ssh -oStrictHostKeyChecking=no my-aws-elasticsearch-host -N
Then you can create your client using :http-client options like the following:
;; if you are using tunnelling to host in AWS e.g.
(def client (s/client {:hosts ["https://localhost:9200"]
:http-client {:ssl-context (client/ssl-context-trust-all)
:ssl-noop-hostname-verifier? true}}))
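The client/ssl-context-trust-all helper above is referenced without a corresponding require in this README; if you need to provide your own, a minimal sketch using plain javax.net.ssl interop could look like this (trusting every certificate is only acceptable for local tunnels):

(import '(javax.net.ssl SSLContext TrustManager X509TrustManager)
        '(java.security SecureRandom)
        '(java.security.cert X509Certificate))

;; hypothetical helper: builds an SSLContext that accepts any certificate
(defn ssl-context-trust-all []
  (let [trust-all (reify X509TrustManager
                    (checkClientTrusted [_ _ _])
                    (checkServerTrusted [_ _ _])
                    (getAcceptedIssuers [_] (make-array X509Certificate 0)))]
    (doto (SSLContext/getInstance "TLS")
      (.init nil (into-array TrustManager [trust-all]) (SecureRandom.)))))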
Most of spandex's request functions take a request map as a parameter. The :url key differs a bit from the original RING spec: it accepts a raw string, but also a (potentially nested) sequence of encodable things, i.e. keywords, objects with a sensible .toString, or nil (e.g. when the :url key is missing).
(s/request c {:url [:foo :bar :_search] ...})
(s/request c {:url [:foo [:bar :something "more"] :_search] ...})
(s/request c {:url :_search ...})
(s/request c {:url "/index/_search" ...})
(s/request c {:url (java.util.UUID/randomUUID) ...})
(s/request c {...}) ;; defaults to "/"
(s/request c {:url [:entries :entry]
              :method :post
              :body {:foo :bar}})
>> {:body {:_index "entries", :_type "entry", :_id "AVkDDJvdkd2OsNWu4oYk", :_version 1, :_shards {:total 2, :successful 1, :failed 0}, :created true}, :status 201, :headers {"Content-Type" "application/json; charset=UTF-8", "Content-Length" "141"}, :host #object[org.apache.http.HttpHost 0x62b90fad "http://127.0.0.1:9200"]}
(s/request-async c {:url "/urls/url/"
:method :get
:body {:query {:match {:message "this is a test"}}}
:success (fn [response-as-clj] ... )
:error (fn [ex] :boom)})
(async/<!! (s/request-chan c {:url "/urls/url/"
:method :get
:body {:query {:match {:message "this is a test"}}}}))
Scrolling via core.async (fully NIO internally); it is interruptible if you async/close! the returned chan.
(async/go
(let [ch (s/scroll-chan client {:url "/foo/_search" :body {:query {:match_all {}}}})]
(loop []
(when-let [page (async/<! ch)]
(do-something-with-page page)
(recur)))))
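For example, to collect every document source across all pages (assuming the default keywordized JSON decoding of response bodies, and a hypothetical all-hits helper name):

(defn all-hits
  "Drains a scroll-chan and returns the :_source of every hit."
  [client url]
  (let [ch (s/scroll-chan client {:url url :body {:query {:match_all {}}}})]
    (async/<!!
     (async/go-loop [acc []]
       (if-let [page (async/<! ch)]
         (recur (into acc (map :_source) (-> page :body :hits :hits)))
         acc)))))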
"Faux streaming" of _bulk requests (flushes bulk request after configurable interval or threshold. Uses request-chan internally, so it's quite cheap.
(let [{:keys [input-ch output-ch]} (s/bulk-chan client {:flush-threshold 100
                                                        :flush-interval 5000
                                                        :max-concurrent-requests 3})]
  ;; happily takes a sequence of actions or single fragments
  (async/put! input-ch [{:delete {:_index "foo" :_id "1234"}} {:_index :bar} {:create {...}}])
  (async/put! input-ch {"delete" {"_index" "website" "_type" "blog" "_id" "123"}})
  ;; set up a response consumer (we just want to make sure we don't clog this channel)
  (future (loop [] (async/<!! output-ch) (recur))))
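When you are done feeding actions, you can close the input channel; closing it is the usual core.async way to signal completion (check the bulk-chan docstring for the exact flush/termination semantics). With the map returned by bulk-chan bound to a hypothetical bulk-ch:

;; signal that no more actions will be submitted
(async/close! (:input-ch bulk-ch))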
clojure.spec definitions are also provided, if that's your thing.
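A hedged sketch of pulling them in, assuming the specs live in a qbits.spandex.spec namespace:

;; assumption: the spec definitions ship in qbits.spandex.spec
(require '[clojure.spec.alpha :as spec]
         '[qbits.spandex.spec])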
If you wish to support the work on this project, you can do so via my Patreon page.
Copyright © 2018 Max Penet
Distributed under the Eclipse Public License, the same as Clojure.