Skip to content

Commit

Permalink
Merge branch 'feature/security' into 'feature/clusterization'
Browse files Browse the repository at this point in the history
#1644 security features

See merge request itv-backend/reindexer!1731
  • Loading branch information
reindexer-bot committed Dec 13, 2024
1 parent d2d4679 commit 1d87bca
Show file tree
Hide file tree
Showing 245 changed files with 244,230 additions and 2,042 deletions.
15 changes: 10 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ jobs:
build:
strategy:
matrix:
os: [ubuntu-20.04, ubuntu-22.04, macos-13]
os: [ubuntu-22.04, macos-13]
include:
- os: ubuntu-24.04
cc: gcc-12
cxx: g++-12
- os: ubuntu-latest
sanitizer: ASAN
cc: gcc-10
Expand Down Expand Up @@ -111,7 +114,7 @@ jobs:
test:
strategy:
matrix:
os: [ubuntu-20.04, ubuntu-22.04, macos-13]
os: [ubuntu-22.04, ubuntu-24.04, macos-13]
test: ['C++', 'GO']
include:
- os: ubuntu-latest
Expand Down Expand Up @@ -192,7 +195,7 @@ jobs:
test-pyreindexer:
strategy:
matrix:
os: [ubuntu-20.04, ubuntu-22.04]
os: [ubuntu-22.04, ubuntu-24.04]
fail-fast: false
runs-on: ${{matrix.os}}
needs: build
Expand All @@ -210,7 +213,7 @@ jobs:
run: |
if [[ $OS == ubuntu* ]]; then
sudo ./dependencies.sh
python3 -m pip install setuptools
python3 -m pip install setuptools build
else
./dependencies.sh
fi
Expand Down Expand Up @@ -239,7 +242,9 @@ jobs:
with:
repository: restream/reindexer-py
- name: Install PyReindexer
run: sudo python3 setup.py install
run: |
python -m build
python -m pip install .
- name: Test PyReindexer
run: |
cd pyreindexer
Expand Down
10 changes: 5 additions & 5 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ environment:
TOOLCHAIN: x86_64-8.1.0-posix-seh-rt_v6-rev0
APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019

- BUILD_TYPE: Release
COMPILER: MinGW
PLATFORM: Win32
TOOLCHAIN: i686-8.1.0-posix-dwarf-rt_v6-rev0
APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019
# - BUILD_TYPE: Release
# COMPILER: MinGW
# PLATFORM: Win32
# TOOLCHAIN: i686-8.1.0-posix-dwarf-rt_v6-rev0
# APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019

build_script:
- git describe --tags
Expand Down
6 changes: 6 additions & 0 deletions bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reindexer

import (
"context"
"crypto/tls"
"fmt"
"reflect"
"strconv"
Expand Down Expand Up @@ -555,6 +556,11 @@ func WithStrictJoinHandlers() interface{} {
return bindings.OptionStrictJoinHandlers{EnableStrictJoinHandlers: true}
}

// Enables connection to Reindexer using TLS. If tls.Config is nil TLS is disabled
func WithTLSConfig(config *tls.Config) interface{} {
return bindings.OptionTLS{Config: config}
}

// WithReconnectionStrategy allows to configure the behavior during reconnect after error.
// Strategy used for reconnect to server on connection error
// AllowUnknownNodes allows to add dsn from cluster node, that was not set in client dsn list
Expand Down
1 change: 0 additions & 1 deletion bindings/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,5 +200,4 @@ const (
const (
ShardingNotSet = -1
ShardingProxyOff = -2
NotSharded = -3
)
21 changes: 16 additions & 5 deletions bindings/cproto/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -142,6 +143,8 @@ type connectionImpl struct {
requestDedicatedThread bool
loggerOwner LoggerOwner
eventsHandler bindings.EventsHandler

configTLS *tls.Config
}

type newConnParams struct {
Expand All @@ -154,6 +157,7 @@ type newConnParams struct {
requestDedicatedThread bool
caps bindings.BindingCapabilities
envetsHandler bindings.EventsHandler
tls bindings.OptionTLS
}

func newConnection(
Expand All @@ -175,6 +179,7 @@ func newConnection(
requestDedicatedThread: params.requestDedicatedThread,
loggerOwner: loggerOwner,
eventsHandler: eventsHandler,
configTLS: params.tls.Config,
}
for i := 0; i < queueSize; i++ {
c.seqs <- uint32(i)
Expand Down Expand Up @@ -259,15 +264,21 @@ func (c *connectionImpl) deadlineTicker() {
}

func (c *connectionImpl) connect(ctx context.Context, dsn *url.URL) (err error) {
var d net.Dialer
if dsn.Scheme == "cproto" {
if c.conn, err = d.DialContext(ctx, "tcp", dsn.Host); err != nil {
var netDialer net.Dialer
if dsn.Scheme == "cprotos" {
tlsDialer := tls.Dialer{Config: c.configTLS, NetDialer: &netDialer}
if c.conn, err = tlsDialer.DialContext(ctx, "tcp", dsn.Host); err != nil {
return err
}
c.conn.(*tls.Conn).NetConn().(*net.TCPConn).SetNoDelay(true)
} else if dsn.Scheme == "cproto" {
if c.conn, err = netDialer.DialContext(ctx, "tcp", dsn.Host); err != nil {
return err
}
c.conn.(*net.TCPConn).SetNoDelay(true)
} else {
d.LocalAddr = nil
if c.conn, err = d.DialContext(ctx, "unix", dsn.Host); err != nil {
netDialer.LocalAddr = nil
if c.conn, err = netDialer.DialContext(ctx, "unix", dsn.Host); err != nil {
return err
}
}
Expand Down
7 changes: 6 additions & 1 deletion bindings/cproto/cproto.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var emptyLogger bindings.NullLogger
func init() {
rand.Seed(time.Now().UnixNano())
bindings.RegisterBinding("cproto", new(NetCProto))
bindings.RegisterBinding("cprotos", new(NetCProto))
if runtime.GOOS != "windows" {
bindings.RegisterBinding("ucproto", new(NetCProto))
}
Expand Down Expand Up @@ -88,6 +89,7 @@ type NetCProto struct {
lock sync.RWMutex
logger bindings.Logger
logMtx sync.RWMutex
tls bindings.OptionTLS
}

type dsn struct {
Expand Down Expand Up @@ -310,6 +312,8 @@ func (binding *NetCProto) Init(u []url.URL, eh bindings.EventsHandler, options .
case bindings.OptionReconnectionStrategy:
binding.dsn.reconnectionStrategy = v.Strategy
binding.dsn.allowUnknownNodes = v.AllowUnknownNodes
case bindings.OptionTLS:
binding.tls = v

default:
fmt.Printf("Unknown cproto option: %#v\n", option)
Expand Down Expand Up @@ -396,7 +400,8 @@ func (binding *NetCProto) createConnParams() newConnParams {
appName: binding.appName,
enableCompression: binding.compression.EnableCompression,
requestDedicatedThread: binding.dedicatedThreads.DedicatedThreads,
caps: binding.caps}
caps: binding.caps,
tls: binding.tls}
}

func (binding *NetCProto) createEventConn(ctx context.Context, connParams newConnParams, eventsSubOptsJSON []byte) error {
Expand Down
5 changes: 5 additions & 0 deletions bindings/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bindings

import (
"context"
"crypto/tls"
"net/url"
"time"

Expand Down Expand Up @@ -384,6 +385,10 @@ type OptionReconnectionStrategy struct {
AllowUnknownNodes bool
}

type OptionTLS struct {
Config *tls.Config
}

type Status struct {
Err error
CProto StatusCProto
Expand Down
26 changes: 25 additions & 1 deletion cpp_src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ option(ENABLE_SSE "Enable SSE instructions" ON)
option(ENABLE_SERVER_AS_PROCESS_IN_TEST "Run reindexer servers as separate processes in tests" OFF)
option(ENABLE_V3_FOLLOWERS "Enable compatibility mode with reindexer v3 followers. This is temporary flag and will be removed in further releases" OFF)

if(APPLE)
option(ENABLE_OPENSSL "Enable OpenSSL" OFF)
else()
option(ENABLE_OPENSSL "Enable OpenSSL" ON)
endif()

if(NOT GRPC_PACKAGE_PROVIDER)
set(GRPC_PACKAGE_PROVIDER "CONFIG")
Expand Down Expand Up @@ -308,6 +313,7 @@ add_library(${TARGET} STATIC ${HDRS} ${SRCS} ${VENDORS})
add_definitions(-DREINDEX_CORE_BUILD=1)
add_definitions(-DFMT_HEADER_ONLY=1)
add_definitions(-DSPDLOG_FMT_EXTERNAL=1)
add_definitions(-DFMT_USE_FULL_CACHE_DRAGONBOX=1)

# add_definitions(-DREINDEX_FT_EXTRA_DEBUG=1)
if (ENABLE_SERVER_AS_PROCESS_IN_TEST)
Expand Down Expand Up @@ -473,6 +479,13 @@ set(THREADS_PREFER_PTHREAD_FLAG TRUE)
find_package(Threads REQUIRED ON)
list(APPEND REINDEXER_LIBRARIES ${CMAKE_THREAD_LIBS_INIT} )

include(CMakeRC)

file(GLOB CHINA_DICT LIST_DIRECTORIES false ${REINDEXER_SOURCE_PATH}/resource/china_dict/*.lex)
cmrc_add_resource_library(friso_dict_resources WHENCE ${REINDEXER_SOURCE_PATH}/resource ${CHINA_DICT})
list(APPEND REINDEXER_LIBRARIES friso_dict_resources)
add_dependencies(${TARGET} friso_dict_resources)

if(WITH_CPPTRACE)
ExternalProject_Add(
cpptrace_lib
Expand Down Expand Up @@ -559,6 +572,18 @@ if(WIN32)
list(APPEND REINDEXER_LIBRARIES shlwapi dbghelp ws2_32)
endif()

if (MSVC)
add_definitions(-DNOMINMAX)
endif()

if(ENABLE_OPENSSL)
find_package(OpenSSL)
if(OPENSSL_FOUND)
include_directories(SYSTEM ${OPENSSL_INCLUDE_DIR})
add_definitions(-DWITH_OPENSSL)
endif()
endif()

set(REINDEXER_LIBRARIES_GLOBAL ${REINDEXER_LIBRARIES} PARENT_SCOPE)
set(REINDEXER_LINK_DIRECTORIES_GLOBAL ${REINDEXER_LINK_DIRECTORIES} PARENT_SCOPE)

Expand All @@ -580,7 +605,6 @@ endif()
if(NOT REINDEXER_VERSION_CUR_H STREQUAL REINDEXER_VERSION_H)
file(WRITE ${PROJECT_BINARY_DIR}/reindexer_version.h ${REINDEXER_VERSION_H})
endif()

include_directories(${PROJECT_BINARY_DIR})

string ( REGEX REPLACE "v([0-9]+)\\.([0-9]+)\\.([0-9]+)(.*)" "\\1.\\2.\\3" REINDEXER_VERSION ${REINDEXER_VERSION_FULL})
Expand Down
5 changes: 1 addition & 4 deletions cpp_src/client/connectopts.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
namespace reindexer {
namespace client {

enum ConnectOpt {
kConnectOptCreateIfMissing = 1 << 0,
kConnectOptCheckClusterID = 1 << 1,
};
enum ConnectOpt { kConnectOptCreateIfMissing = 1 << 0, kConnectOptCheckClusterID = 1 << 1 };

struct ConnectOpts {
bool IsCreateDBIfMissing() const { return options & kConnectOptCreateIfMissing; }
Expand Down
8 changes: 6 additions & 2 deletions cpp_src/client/cororeindexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ CoroReindexer& CoroReindexer::operator=(CoroReindexer&& rdx) noexcept {
}

Error CoroReindexer::Connect(const std::string& dsn, net::ev::dynamic_loop& loop, const ConnectOpts& opts) {
return Connect(DSN(dsn), loop, opts);
}
Error CoroReindexer::Connect(const DSN& dsn, net::ev::dynamic_loop& loop, const ConnectOpts& opts) {
return impl_->Connect(dsn, loop, opts);
}
void CoroReindexer::Stop() { impl_->Stop(); }
Expand Down Expand Up @@ -110,8 +113,9 @@ int64_t CoroReindexer::AddConnectionStateObserver(CoroReindexer::ConnectionState
}
Error CoroReindexer::RemoveConnectionStateObserver(int64_t id) { return impl_->RemoveConnectionStateObserver(id); }

[[nodiscard]] Error CoroReindexer::ShardingControlRequest(const sharding::ShardingControlRequestData& request) noexcept {
return impl_->ShardingControlRequest(request, ctx_);
Error CoroReindexer::ShardingControlRequest(const sharding::ShardingControlRequestData& request,
sharding::ShardingControlResponseData& response) noexcept {
return impl_->ShardingControlRequest(request, response, ctx_);
}

} // namespace client
Expand Down
13 changes: 10 additions & 3 deletions cpp_src/client/cororeindexer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once

#include <chrono>
#include "client/coroqueryresults.h"
#include "client/corotransaction.h"
#include "client/internalrdxcontext.h"
Expand All @@ -16,10 +15,12 @@ class SnapshotChunk;
struct SnapshotOpts;
struct ReplicationStateV2;
struct ClusterizationStatus;
class DSN;

namespace sharding {
struct ShardingControlRequestData;
}
struct ShardingControlResponseData;
} // namespace sharding

namespace client {

Expand Down Expand Up @@ -50,6 +51,7 @@ class CoroReindexer {
/// @param loop - event loop for connections and coroutines handling
/// @param opts - Connect options. May contain any of <br>
Error Connect(const std::string& dsn, net::ev::dynamic_loop& loop, const client::ConnectOpts& opts = client::ConnectOpts());
Error Connect(const DSN& dsn, net::ev::dynamic_loop& loop, const client::ConnectOpts& opts = client::ConnectOpts());
/// Stop - shutdown connector
void Stop();
/// Open or create namespace
Expand Down Expand Up @@ -246,7 +248,9 @@ class CoroReindexer {

/// Execute sharding control request during the sharding config change
/// @param request - control params
[[nodiscard]] Error ShardingControlRequest(const sharding::ShardingControlRequestData& request) noexcept;
/// @param response - control response
Error ShardingControlRequest(const sharding::ShardingControlRequestData& request,
sharding::ShardingControlResponseData& response) noexcept;

/// Add cancelable context
/// @param cancelCtx - context pointer
Expand All @@ -260,6 +264,9 @@ class CoroReindexer {
/// Add shard info
/// @param id - shard id
CoroReindexer WithShardId(int id, bool parallel) { return {impl_, ctx_.WithShardId(id, parallel)}; }
/// Add emmiter server id
/// @param serverId - emmiter server id
CoroReindexer WithEmmiterServerId(int serverId) { return {impl_, ctx_.WithEmmiterServerId(serverId)}; }

typedef CoroQueryResults QueryResultsT;
typedef Item ItemT;
Expand Down
5 changes: 3 additions & 2 deletions cpp_src/client/corotransaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,14 @@ Item CoroTransaction::NewItem(ClientT* client) {
template Item CoroTransaction::NewItem<ReindexerImpl>(ReindexerImpl* client);

CoroTransaction::Impl::Impl(RPCClient* rpcClient, int64_t txId, std::chrono::milliseconds requestTimeout,
std::chrono::milliseconds execTimeout, Namespace* ns) noexcept
std::chrono::milliseconds execTimeout, Namespace* ns, int emmiterServerId) noexcept
: txId_(txId),
rpcClient_(rpcClient),
requestTimeout_(requestTimeout),
execTimeout_(execTimeout),
localTm_(std::make_unique<TagsMatcher>(ns->GetTagsMatcher())),
ns_(ns) {
ns_(ns),
emmiterServerId_(emmiterServerId) {
assert(rpcClient_);
assert(ns_);
const auto sessinTsOpt = rpcClient_->conn_.LoginTs();
Expand Down
7 changes: 4 additions & 3 deletions cpp_src/client/corotransaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ class CoroTransaction {
friend class Transaction;
explicit CoroTransaction(Error status) noexcept : i_(std::move(status)) {}
CoroTransaction(RPCClient* rpcClient, int64_t txId, std::chrono::milliseconds requestTimeout, std::chrono::milliseconds execTimeout,
Namespace* ns) noexcept
: i_(rpcClient, txId, requestTimeout, execTimeout, ns) {}
Namespace* ns, int emmiterServerId) noexcept
: i_(rpcClient, txId, requestTimeout, execTimeout, ns, emmiterServerId) {}

Error addTxItem(Item&& item, ItemModifyMode mode, lsn_t lsn);
Error addTxItemRaw(std::string_view cjson, ItemModifyMode mode, lsn_t lsn);
Expand All @@ -74,7 +74,7 @@ class CoroTransaction {

struct Impl {
Impl(RPCClient* rpcClient, int64_t txId, std::chrono::milliseconds requestTimeout, std::chrono::milliseconds execTimeout,
Namespace* ns) noexcept;
Namespace* ns, int emmiterServerId) noexcept;
Impl(Error&& status) noexcept;
Impl(Impl&&) noexcept;
Impl& operator=(Impl&&) noexcept;
Expand All @@ -88,6 +88,7 @@ class CoroTransaction {
std::unique_ptr<TagsMatcher> localTm_;
Namespace* ns_{nullptr};
steady_clock_w::time_point sessionTs_;
int emmiterServerId_ = -1;
};

Impl i_;
Expand Down
Loading

0 comments on commit 1d87bca

Please sign in to comment.