Skip to content

Commit

Permalink
#10: RapidJSON instead of slow JsonCpp (in scroll api). Still WIP, as…
Browse files Browse the repository at this point in the history
… this must be resolved:

  * reconsider using std::unique_ptr on the interface, maybe raw pointer would be more ABI secure
  * CMake && doc update
  • Loading branch information
johniez committed Jul 11, 2019
1 parent 2b6fca2 commit d38b766
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 123 deletions.
1 change: 1 addition & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The library is based on [C++ Requests: Curl for People](https://github.com/whosh
## Dependencies
* [C++ Requests: Curl for People](https://github.com/whoshuu/cpr)
* [JsonCpp](https://github.com/open-source-parsers/jsoncpp)
* [RapidJSON](http://rapidjson.org/)
* [Google Test](https://github.com/google/googletest)
* Only for tests: [C++ HTTP mock server library](https://github.com/seznam/httpmockserver)

Expand Down
101 changes: 101 additions & 0 deletions include/elasticlient/scroll-parser.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/**
* \file
* \brief Scroll parser implemented using rapidjson library.
*
* The parsing of the JSON result is exposed here because rapidjson is a header-only library.
* Result parsing could be completely hidden inside the elasticlient implementation,
* but that would make it necessary to parse the result twice - once for error
* detection and scrollId retrieval, and again later by the client code to get the data.
* Instead it is exposed, so the client code compiles exactly the same
* rapidjson::Document class for result parsing and for processing of the result hits.
*/

#pragma once

#include <string>
#include <memory>
#include <rapidjson/document.h>
#include <elasticlient/scroll.h>


namespace elasticlient {


/**
* Parse scroll result and set it into \p parsedResult.
*
* \note Function implemented in public header to assure binary compatibility
* of parsed rapidjson document. Otherwise there would be a risk of using binary
* incompatible versions inside elasticlient.so and the client's side...
*
* \param result Scroll result (json).
* \param parsedResult Parsed json result.
* \param scrollId ScrollId read from the result.
* \return true on success.
*/
bool parseScrollResult(
const std::string &result, std::unique_ptr<JsonResult> &parsedResult,
std::string &scrollId)
{
parsedResult.reset(new elasticlient::JsonResult{});

rapidjson::ParseResult ok = parsedResult->document.Parse(result.c_str());
if (!ok || !parsedResult->document.IsObject()) {
// something is weird, only report error
return false;
}

if (parsedResult->document.HasMember("error")) {
const rapidjson::Value &err = parsedResult->document["error"];
if (!err.IsBool() || err.GetBool()) {
// an error was reported
return false;
}
}

if (parsedResult->document.HasMember("timed_out")) {
const rapidjson::Value &doc = parsedResult->document["timed_out"];
if (!doc.IsBool() || doc.GetBool()) {
// request timed out
return false;
}
}

if (parsedResult->document.HasMember("_shards")
&& parsedResult->document["_shards"].IsObject()) {
const rapidjson::Value &shards = parsedResult->document["_shards"];
if (shards.HasMember("failed") && shards["failed"].IsInt()) {
const int failed = shards["failed"].GetInt();
if (failed > 0) {
// failed shards, do not trust the data
return false;
}
} else {
// no information about failed shards, but it has to be here
return false;
}
} else {
// no information about shards, but it has to be here
return false;
}

if (parsedResult->document.HasMember("hits")) {
rapidjson::Value &hits = parsedResult->document["hits"];
// just make sure the hits array is present
if (hits.HasMember("hits") && hits["hits"].IsArray()) {
if (parsedResult->document.HasMember("_scroll_id")
&& parsedResult->document["_scroll_id"].IsString()) {
scrollId = parsedResult->document["_scroll_id"].GetString();
return true;
}
// missing _scroll_id, is that scroll response?
return false;
}
}

// scroll response is not ok
return false;
}


} // namespace elasticlient
22 changes: 13 additions & 9 deletions include/elasticlient/scroll.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@
#include <memory>
#include <vector>
#include <cstdint>


// Forward Json::Value existence.
namespace Json {
class Value;
}
#include <rapidjson/document.h>


/// The elasticlient namespace
namespace elasticlient {


/**
* Scroll result.
*
* Thin wrapper around the rapidjson document holding one parsed scroll
* response. Handed to the client code through std::unique_ptr<JsonResult>.
*/
struct JsonResult {
/// Parsed document (whole response). On successful scroll it contains ["hits"]["hits"] array.
rapidjson::Document document;
};


// Forward Client class existence.
class Client;

Expand Down Expand Up @@ -80,14 +84,14 @@ class Scroll {
* \return true if okay
* \return false on error
*/
bool next(Json::Value &parsedResult);
bool next(std::unique_ptr<JsonResult> &parsedResult);

/// Return Client class with current config.
const std::shared_ptr<Client> &getClient() const;

protected:
/// Creates new scroll - obtain scrollId and parsedResult
virtual bool createScroll(Json::Value &parsedResult);
virtual bool createScroll(std::unique_ptr<JsonResult> &parsedResult);
};


Expand Down Expand Up @@ -116,7 +120,7 @@ class ScrollByScan : public Scroll {
* make two Elasticsearch calls: one to obtain the scrollId and a second to obtain
* the first bulk of results.
*/
virtual bool createScroll(Json::Value &parsedResult) override;
virtual bool createScroll(std::unique_ptr<JsonResult> &parsedResult) override;
};


Expand Down
1 change: 1 addition & 0 deletions src/bulk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ std::size_t Bulk::perform(const IBulkData &bulk) {
void Bulk::Implementation::processResult(
const std::string &result, std::size_t size)
{
// TODO parse this using rapidjson and remove jsoncpp dependency.
Json::Value root;
Json::Reader reader;
// parse elastic json result without comments (false at the end)
Expand Down
6 changes: 1 addition & 5 deletions src/scroll-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include <string>
#include <vector>
#include <stdexcept>
#include <json/json.h>
#include "elasticlient/client.h"


Expand Down Expand Up @@ -71,10 +70,7 @@ class Scroll::Implementation {
}

/// Run request on Client
bool run(const std::string &commonUrlPart, const std::string &body, Json::Value &parsedResult);

///Parse Elasticsearch HTTP \p result into \p parsedResult
bool parseResult(const std::string &result, Json::Value &parsedResult);
bool run(const std::string &commonUrlPart, const std::string &body, std::unique_ptr<JsonResult> &parsedResult);
};


Expand Down
76 changes: 6 additions & 70 deletions src/scroll.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

#include <sstream>
#include <memory>
#include <json/json.h>
#include <cpr/cpr.h>
#include "logging-impl.h"
#include "elasticlient/scroll-parser.h"


namespace elasticlient {
Expand Down Expand Up @@ -57,78 +57,14 @@ void Scroll::Implementation::ScrollParams::clear() {
}


/**
 * Parse Elasticsearch HTTP \p result (JSON text) using JsonCpp.
 *
 * On success the root["hits"] object is copied into \p parsedResult and the
 * "_scroll_id" string is stored in scrollParameters.scrollId. Every rejected
 * response is logged with LogLevel::WARNING.
 *
 * \param result       Raw JSON body of the scroll response.
 * \param parsedResult Receives the "hits" object; assigned only on success.
 * \return true on success, false if the response is malformed, reports an
 *         error or a timeout, has failed shards, or lacks required members.
 */
bool Scroll::Implementation::parseResult(const std::string &result, Json::Value &parsedResult) {
Json::Value root;
Json::Reader reader;
// parse elastic json result without comments (false at the end)
if (!reader.parse(result, root, false)) {
// something is weird, only report error
LOG(LogLevel::WARNING, "Error when parsing JSON Elastic result.");
return false;
}

// "error" is tolerated only when it is the boolean false
if (root.isMember("error")) {
const Json::Value &error = root["error"];
if (!error.isBool() || error.asBool()) {
LOG(LogLevel::WARNING, "An Elastic error occured while getting scroll result. '%s'",
result.c_str());
return false;
}
}

// "timed_out" is tolerated only when it is the boolean false
if (root.isMember("timed_out")) {
if (!root["timed_out"].isBool() || root["timed_out"].asBool()) {
LOG(LogLevel::WARNING, "The Scroll has been timeouted. '%s'", result.c_str());
return false;
}
}

// "_shards" object with an integer "failed" member is mandatory
if (root.isMember("_shards") && root["_shards"].isObject()) {
const Json::Value &shards = root["_shards"];
if (shards.isMember("failed") && shards["failed"].isInt()) {
if (shards["failed"].asInt() > 0) {
LOG(LogLevel::WARNING, "Results not obtained from all shards (failed=%d). '%s'",
shards["failed"].asInt(), result.c_str());
return false;
}
} else {
LOG(LogLevel::WARNING, "In result is no information about failed shards. '%s'",
result.c_str());
return false;
}
} else {
LOG(LogLevel::WARNING, "In result is no information about shards.");
return false;
}

// everything is alright, errors==false, results from all shards obtained
if (root.isMember("hits") && root["hits"].isObject()) {
const Json::Value &hits = root["hits"];
if (hits.isMember("hits") && hits["hits"].isArray()) {
if (root.isMember("_scroll_id") && root["_scroll_id"].isString()) {
// propagate result and store scrollId in member variable
parsedResult = hits;
scrollParameters.scrollId = root["_scroll_id"].asString();
return true;
}
LOG(LogLevel::WARNING, "In result is no _scroll_id.");

return false;
}
}
// "hits" is missing, is not an object, or has no "hits" array
LOG(LogLevel::WARNING, "Scroll result is corrupted.");
return false;
}


bool Scroll::Implementation::run(
const std::string &commonUrlPart, const std::string &body, Json::Value &parsedResult)
const std::string &commonUrlPart, const std::string &body, std::unique_ptr<elasticlient::JsonResult> &parsedResult)
{
try {
const cpr::Response r = client->performRequest(Client::HTTPMethod::POST,
commonUrlPart, body);
if (r.status_code / 100 == 2 or r.status_code == 404) {
return parseResult(r.text, parsedResult);
return parseScrollResult(r.text, parsedResult, scrollParameters.scrollId);
}

} catch(const ConnectionException &ex) {
Expand All @@ -152,7 +88,7 @@ void Scroll::init(
}


bool Scroll::createScroll(Json::Value &parsedResult) {
bool Scroll::createScroll(std::unique_ptr<elasticlient::JsonResult> &parsedResult) {
Implementation::ScrollParams &scrollParameters = impl->scrollParameters;
std::ostringstream urlPart;
urlPart << scrollParameters.indexName << "/" << scrollParameters.docType << "/_search?scroll="
Expand All @@ -170,7 +106,7 @@ bool Scroll::createScroll(Json::Value &parsedResult) {
}


bool Scroll::next(Json::Value &parsedResult) {
bool Scroll::next(std::unique_ptr<elasticlient::JsonResult> &parsedResult) {
Implementation::ScrollParams &scrollParameters = impl->scrollParameters;

if (!impl->isInitialized()) {
Expand Down Expand Up @@ -249,7 +185,7 @@ ScrollByScan::ScrollByScan(const std::vector<std::string> &hostUrlList,
ScrollByScan::ScrollByScan(ScrollByScan &&) = default;


bool ScrollByScan::createScroll(Json::Value &parsedResult) {
bool ScrollByScan::createScroll(std::unique_ptr<elasticlient::JsonResult> &parsedResult) {
Implementation::ScrollParams &scrollParameters = impl->scrollParameters;

std::ostringstream urlPart;
Expand Down
Loading

0 comments on commit d38b766

Please sign in to comment.