Skip to content

Commit

Permalink
#10: Bulk internal parsing using rapidjson instead of jsoncpp.
Browse files Browse the repository at this point in the history
  • Loading branch information
johniez committed Jul 12, 2019
1 parent d38b766 commit 0dd63ee
Showing 1 changed file with 28 additions and 23 deletions.
51 changes: 28 additions & 23 deletions src/bulk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <string>
#include <sstream>
#include <cpr/cpr.h>
#include <json/json.h>
#include <rapidjson/document.h>
#include "logging-impl.h"
#include "elasticlient/client.h"

Expand Down Expand Up @@ -163,11 +163,9 @@ std::size_t Bulk::perform(const IBulkData &bulk) {
void Bulk::Implementation::processResult(
const std::string &result, std::size_t size)
{
// TODO parse this using rapidjson and remove jsoncpp dependency.
Json::Value root;
Json::Reader reader;
// parse elastic json result without comments (false at the end)
if (!reader.parse(result, root, false)) {
rapidjson::Document root;
rapidjson::ParseResult ok = root.Parse(result.c_str());
if (!ok || !root.IsObject()) {
// probably whole bulk has failed
errCount += size;
return;
Expand All @@ -187,65 +185,72 @@ void Bulk::Implementation::processResult(
// ]}

// check errors flag, if it is false, do not parse single responses
if (root.isMember("errors")) {
const Json::Value &errors = root["errors"];
if (errors.isBool() && !errors.asBool()) {
if (root.HasMember("errors")) {
const rapidjson::Value &errors = root["errors"];
if (errors.IsBool() && !errors.GetBool()) {
// everything is alright, errors==false.
return;
}
}

// check presence of the bulk items results
if (!root.isMember("items")) {
if (!root.HasMember("items")) {
LOG(LogLevel::WARNING, "Bulk ran with errors, but no items are present "
"at the response! Err count is inaccurate now!");
return;
}
const Json::Value &items = root["items"];
const rapidjson::Value &items = root["items"];
// check correct type of the items
if (!items.isArray()) {
if (!items.IsArray()) {
LOG(LogLevel::WARNING, "Failed to read elastic response field 'items', because "
"it is not an array! Err count is inaccurate now!");
return;
}

// process items responses
for (const Json::Value &item: items) {
if (!item.isObject()) {
for (const rapidjson::Value &item: items.GetArray()) {
if (!item.IsObject()) {
LOG(LogLevel::WARNING, "Bulk items responses have to be objects!");
errCount++;
continue;
}

// check index action response
if (item.isMember("index")) {
const Json::Value &res = item["index"];
if (!res.isObject()) {
if (item.HasMember("index")) {
const rapidjson::Value &res = item["index"];
if (!res.IsObject()) {
LOG(LogLevel::WARNING, "Bulk response has unexpected format, "
"object was expected.");
errCount++;
continue;
}
// read status code
const Json::Value &status = res.get("status", Json::Int(500));
if (!status.isNumeric()) {
if (!res.HasMember("status")) {
LOG(LogLevel::WARNING, "Bulk response item with missing status.");
errCount++;
continue;
};
const rapidjson::Value &status = res["status"];
if (!status.IsInt()) {
LOG(LogLevel::WARNING, "Bulk response was expected to have numeric status. "
"Skipping this response checking.");
errCount++;
continue;
}

// if status code is not 2xx family, consider it as error
if (status.asInt() / 100 != 2) {
if (status.GetInt() / 100 != 2) {
errCount++;
}
} else {
LOG(LogLevel::WARNING, "Unsupported 'action' found at bulk response.");

}
}

// complain if not all items of the bulk were covered by responses
if (items.size() < size) {
if (items.Size() < size) {
LOG(LogLevel::INFO, "Bulk has more items than responses received. Cannot tell "
"whether %lu items succeeded...", size - items.size());
"whether %lu items succeeded...", size - items.Size());
}
}

Expand Down

0 comments on commit 0dd63ee

Please sign in to comment.