Skip to content

Commit

Permalink
Merge branch '1570_1572_1577_nested_queries_core' into 1574_subQuery_…
Browse files Browse the repository at this point in the history
…go_interface
  • Loading branch information
reindexer-bot committed Dec 5, 2023
1 parent 8fe18b8 commit fd258ad
Show file tree
Hide file tree
Showing 162 changed files with 6,079 additions and 3,862 deletions.
5 changes: 2 additions & 3 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ AlignAfterOpenBracket: Align
AlignConsecutiveAssignments: false
AlignConsecutiveDeclarations: false
AlignEscapedNewlinesLeft: true
AlignOperands: true
AlignOperands: true
AlignTrailingComments: true
AllowAllParametersOfDeclarationOnNextLine: true
AllowShortBlocksOnASingleLine: false
Expand All @@ -20,7 +20,7 @@ AlwaysBreakBeforeMultilineStrings: true
AlwaysBreakTemplateDeclarations: true
BinPackArguments: true
BinPackParameters: true
BraceWrapping:
BraceWrapping:
AfterClass: false
AfterControlStatement: false
AfterEnum: false
Expand All @@ -45,7 +45,6 @@ ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
Cpp11BracedListStyle: true
DerivePointerAlignment: true
PointerAlignment: Left
DisableFormat: false
ExperimentalAutoDetectBinPacking: false
ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ]
Expand Down
3 changes: 3 additions & 0 deletions bindings/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ const (
QueryUpdateFieldV2 = 25
QueryBetweenFieldsCondition = 26
QueryAlwaysFalseCondition = 27
QueryAlwaysTrueCondition = 28
QuerySubQueryCondition = 29
QueryFieldSubQueryCondition = 30

LeftJoin = 0
InnerJoin = 1
Expand Down
27 changes: 15 additions & 12 deletions cpp_src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,28 @@ include (TargetArch)
target_architecture(COMPILER_TARGET_ARCH)

# Configure compile options
string(REPLACE "-DNDEBUG" "" CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO}")
string(REPLACE "-O2" "-O3" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
string(REPLACE "-O2" "-O3" CMAKE_C_FLAGS "${CMAKE_C_FLAGS}")
if (NOT ${COMPILER_TARGET_ARCH} STREQUAL "e2k")
string(REPLACE "-g" "-g1" CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO}")
else()
string(REPLACE "-g" "-g0" CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO}")
endif()

if (MSVC)
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O2 -g1")
set(CMAKE_C_FLAGS_RELWITHDEBINFO "-O2 -g1")
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -DNDEBUG")
set(CMAKE_C_FLAGS_RELEASE "-O2 -DNDEBUG")
else ()
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O3 -g1")
set(CMAKE_C_FLAGS_RELWITHDEBINFO "-O3 -g1")
set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG")
set(CMAKE_C_FLAGS_RELEASE "-O3 -DNDEBUG")
endif ()
if (${COMPILER_TARGET_ARCH} STREQUAL "e2k")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O3 -g0")
add_definitions(-D__E2K__)
add_definitions(-D__LCC__)
endif()
endif ()

if (NOT MSVC AND NOT APPLE)
check_linker_flag (-gz cxx_linker_supports_gz)
if (cxx_linker_supports_gz)
set (CMAKE_EXE_LINKER_FLAGS_RELWITHDEBINFO "${CMAKE_EXE_LINKER_FLAGS_RELWITHDEBINFO} -gz")
endif ()
set (CMAKE_EXE_LINKER_FLAGS_RELWITHDEBINFO "${CMAKE_EXE_LINKER_FLAGS_RELWITHDEBINFO} -gz")
endif ()
endif ()

if (MSVC)
Expand Down
3 changes: 2 additions & 1 deletion cpp_src/client/coroqueryresults.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,10 @@ Item CoroQueryResults::Iterator::GetItem() {
if (err.ok()) {
return item;
}
return Item();
} catch (const Error &) {
return Item();
}
return Item();
}

int64_t CoroQueryResults::Iterator::GetLSN() {
Expand Down
14 changes: 7 additions & 7 deletions cpp_src/client/cororpcclient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ Error CoroRPCClient::modifyItem(std::string_view nsName, Item& item, int mode, s
}
CoroQueryResults qr;
InternalRdxContext ctxCompl = ctx.WithCompletion(nullptr);
auto ret = selectImpl(Query(std::string(nsName)).Limit(0), qr, netTimeout, ctxCompl);
auto ret = selectImpl(Query(nsName).Limit(0), qr, netTimeout, ctxCompl);
if (ret.code() == errTimeout) {
return Error(errTimeout, "Request timeout");
}
Expand Down Expand Up @@ -242,7 +242,7 @@ Error CoroRPCClient::Delete(const Query& query, CoroQueryResults& result, const
query.Serialize(ser);

NsArray nsArray;
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });
query.WalkNested(true, true, false, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });

result = CoroQueryResults(&conn_, std::move(nsArray), 0, config_.FetchAmount, config_.RequestTimeout);

Expand All @@ -263,7 +263,7 @@ Error CoroRPCClient::Update(const Query& query, CoroQueryResults& result, const
query.Serialize(ser);

NsArray nsArray;
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });
query.WalkNested(true, true, false, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });

result = CoroQueryResults(&conn_, std::move(nsArray), 0, config_.FetchAmount, config_.RequestTimeout);

Expand Down Expand Up @@ -313,10 +313,10 @@ Error CoroRPCClient::selectImpl(const Query& query, CoroQueryResults& result, se
WrSerializer qser, pser;
int flags = result.fetchFlags_ ? result.fetchFlags_ : (kResultsWithPayloadTypes | kResultsCJson);
flags |= kResultsSupportIdleTimeout;
bool hasJoins = !query.joinQueries_.empty();
bool hasJoins = !query.GetJoinQueries().empty();
if (!hasJoins) {
for (auto& mq : query.mergeQueries_) {
if (!mq.joinQueries_.empty()) {
for (auto& mq : query.GetMergeQueries()) {
if (!mq.GetJoinQueries().empty()) {
hasJoins = true;
break;
}
Expand All @@ -328,7 +328,7 @@ Error CoroRPCClient::selectImpl(const Query& query, CoroQueryResults& result, se
}
NsArray nsArray;
query.Serialize(qser);
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });
query.WalkNested(true, true, false, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });
h_vector<int32_t, 4> vers;
for (auto& ns : nsArray) {
vers.push_back(ns->tagsMatcher_.version() ^ ns->tagsMatcher_.stateToken());
Expand Down
3 changes: 2 additions & 1 deletion cpp_src/client/queryresults.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,10 @@ Item QueryResults::Iterator::GetItem() {
if (err.ok()) {
return item;
}
return Item();
} catch (const Error &) {
return Item();
}
return Item();
}

int64_t QueryResults::Iterator::GetLSN() {
Expand Down
2 changes: 2 additions & 0 deletions cpp_src/client/resultserializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ void ResultSerializer::GetRawQueryParams(ResultSerializer::QueryParams& ret, con
case QueryResultExplain:
ret.explainResults = std::string(data);
break;
default:
throw Error(errLogic, "Unexpected Query tag: %d", tag);
}
}
}
Expand Down
60 changes: 45 additions & 15 deletions cpp_src/client/rpcclient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <functional>
#include "client/itemimpl.h"
#include "core/namespacedef.h"
#include "core/schema.h"
#include "gason/gason.h"
#include "tools/cpucheck.h"
#include "tools/errors.h"
Expand Down Expand Up @@ -98,6 +99,7 @@ void RPCClient::run(size_t thIdx) {

workers_[thIdx].stop_.start();
delayedUpdates_.clear();
serialDelays_ = 0;

for (size_t i = thIdx; int(i) < config_.ConnPoolSize; i += config_.WorkerThreads) {
connections_[i].reset(new cproto::ClientConnection(workers_[thIdx].loop_, &connectData_,
Expand Down Expand Up @@ -381,7 +383,7 @@ Error RPCClient::Delete(const Query& query, QueryResults& result, const Internal
auto conn = getConn();

NsArray nsArray;
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });
query.WalkNested(true, true, false, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });

result = QueryResults(conn, std::move(nsArray), nullptr, 0, config_.FetchAmount, config_.RequestTimeout);

Expand All @@ -408,7 +410,7 @@ Error RPCClient::Update(const Query& query, QueryResults& result, const Internal
auto conn = getConn();

NsArray nsArray;
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });
query.WalkNested(true, true, false, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });

result = QueryResults(conn, std::move(nsArray), nullptr, 0, config_.FetchAmount, config_.RequestTimeout);

Expand Down Expand Up @@ -469,10 +471,10 @@ Error RPCClient::selectImpl(const Query& query, QueryResults& result, cproto::Cl
const InternalRdxContext& ctx) {
WrSerializer qser, pser;
int flags = result.fetchFlags_ ? result.fetchFlags_ : (kResultsWithPayloadTypes | kResultsCJson);
bool hasJoins = !query.joinQueries_.empty();
bool hasJoins = !query.GetJoinQueries().empty();
if (!hasJoins) {
for (auto& mq : query.mergeQueries_) {
if (!mq.joinQueries_.empty()) {
for (auto& mq : query.GetMergeQueries()) {
if (!mq.GetJoinQueries().empty()) {
hasJoins = true;
break;
}
Expand All @@ -484,7 +486,7 @@ Error RPCClient::selectImpl(const Query& query, QueryResults& result, cproto::Cl
}
NsArray nsArray;
query.Serialize(qser);
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });
query.WalkNested(true, true, false, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });
h_vector<int32_t, 4> vers;
for (auto& ns : nsArray) {
shared_lock<shared_timed_mutex> lck(ns->lck_);
Expand Down Expand Up @@ -717,21 +719,39 @@ void RPCClient::onUpdates(net::cproto::RPCAnswer& ans, cproto::ClientConnection*
// If tagsMatcher has been updated but there is no bundled tagsMatcher in cjson
// then we need to ask server to send tagsMatcher.

++serialDelays_;
// Delay this update and all the further updates until we get responce from server.
ans.EnsureHold();
delayedUpdates_.emplace_back(std::move(ans));

QueryResults* qr = new QueryResults;
Select(Query(std::string(nsName)).Limit(0), *qr,
InternalRdxContext(nullptr,
[=](const Error& err) {
delete qr;
// If there are delayed updates then send them to client
auto uq = std::move(delayedUpdates_);
delayedUpdates_.clear();
if (err.ok())
for (auto& a1 : uq) onUpdates(a1, conn);
}),
InternalRdxContext(
nullptr,
[this, qr, conn](const Error& err) {
delete qr;
// If there are delayed updates then send them to client
auto uq = std::move(delayedUpdates_);
delayedUpdates_.clear();

if (!err.ok() || serialDelays_ > 1) {
// This update was already dealyed, but was not able to synchronize tagsmatcher.
// Such situation usually means, that master's namespace was recreated and must be synchronized via force
// sync.
// Current fix is suboptimal and in some cases even incorrect (but still better, than previous
// implementation) - proper fix requires some versioning info about namespaces, which exists
// in v4 only
std::string_view nsName(std::string_view(uq.front().GetArgs(1)[1]));
logPrintf(
LogWarning,
"[repl:%s] Unable to sync tags matcher via online-replication (err: '%s'). Calling UpdatesLost fallback",
nsName, err.what());
serialDelays_ = 0;
observers_.OnUpdatesLost(nsName);
} else {
for (auto& a1 : uq) onUpdates(a1, conn);
}
}),
conn);
return;
} else {
Expand All @@ -748,7 +768,17 @@ void RPCClient::onUpdates(net::cproto::RPCAnswer& ans, cproto::ClientConnection*
ns->tagsMatcher_.deserialize(rdser, wrec.itemModify.tmVersion, ns->tagsMatcher_.stateToken());
}
}
} else if (wrec.type == WalTagsMatcher) {
TagsMatcher tm;
Serializer ser(wrec.data.data(), wrec.data.size());
const auto version = ser.GetVarint();
const auto stateToken = ser.GetVarint();
tm.deserialize(ser, version, stateToken);
auto ns = getNamespace(nsName);
std::lock_guard lck(ns->lck_);
ns->tagsMatcher_ = std::move(tm);
}
serialDelays_ = 0;
observers_.OnWALUpdate(LSNPair(lsn, originLSN), nsName, wrec);
}

Expand Down
2 changes: 2 additions & 0 deletions cpp_src/client/rpcclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class RPCClient {
ev::async stop_;
std::atomic_bool running;
};

Error selectImpl(std::string_view query, QueryResults &result, cproto::ClientConnection *, seconds netTimeout,
const InternalRdxContext &ctx);
Error selectImpl(const Query &query, QueryResults &result, cproto::ClientConnection *, seconds netTimeout,
Expand Down Expand Up @@ -122,6 +123,7 @@ class RPCClient {
UpdatesObservers observers_;
std::atomic<net::cproto::ClientConnection *> updatesConn_;
std::vector<net::cproto::RPCAnswer> delayedUpdates_;
uint64_t serialDelays_ = 0;
cproto::ClientConnection::ConnectData connectData_;
};

Expand Down
12 changes: 6 additions & 6 deletions cpp_src/client/rpcclientmock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Error RPCClientMock::Delete(const Query& query, QueryResults& result, const Inte
auto conn = getConn();

NsArray nsArray;
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });
query.WalkNested(true, true, false, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });

result = QueryResults(conn, std::move(nsArray), nullptr, 0, config_.FetchAmount, config_.RequestTimeout);

Expand Down Expand Up @@ -69,7 +69,7 @@ Error RPCClientMock::Update(const Query& query, QueryResults& result, const Inte
auto conn = getConn();

NsArray nsArray;
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });
query.WalkNested(true, true, false, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });

result = QueryResults(conn, std::move(nsArray), nullptr, 0, config_.FetchAmount, config_.RequestTimeout);

Expand Down Expand Up @@ -289,10 +289,10 @@ Error RPCClientMock::selectImpl(const Query& query, QueryResults& result, cproto
flags = result.fetchFlags_ ? result.fetchFlags_ : (kResultsWithPayloadTypes | kResultsCJson);
}

bool hasJoins = !query.joinQueries_.empty();
bool hasJoins = !query.GetJoinQueries().empty();
if (!hasJoins) {
for (auto& mq : query.mergeQueries_) {
if (!mq.joinQueries_.empty()) {
for (auto& mq : query.GetMergeQueries()) {
if (!mq.GetJoinQueries().empty()) {
hasJoins = true;
break;
}
Expand All @@ -308,7 +308,7 @@ Error RPCClientMock::selectImpl(const Query& query, QueryResults& result, cproto

NsArray nsArray;
query.Serialize(qser);
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });
query.WalkNested(true, true, false, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });
h_vector<int32_t, 4> vers;
for (auto& ns : nsArray) {
shared_lock<shared_timed_mutex> lck(ns->lck_);
Expand Down
Loading

0 comments on commit fd258ad

Please sign in to comment.