Skip to content

Commit

Permalink
Update to version v4.13.0
Browse files Browse the repository at this point in the history
  • Loading branch information
IISannikov committed Dec 22, 2023
1 parent d0afc09 commit 652d165
Show file tree
Hide file tree
Showing 451 changed files with 20,309 additions and 10,082 deletions.
5 changes: 2 additions & 3 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ AlignAfterOpenBracket: Align
AlignConsecutiveAssignments: false
AlignConsecutiveDeclarations: false
AlignEscapedNewlinesLeft: true
AlignOperands: true
AlignOperands: true
AlignTrailingComments: true
AllowAllParametersOfDeclarationOnNextLine: true
AllowShortBlocksOnASingleLine: false
Expand All @@ -20,7 +20,7 @@ AlwaysBreakBeforeMultilineStrings: true
AlwaysBreakTemplateDeclarations: true
BinPackArguments: true
BinPackParameters: true
BraceWrapping:
BraceWrapping:
AfterClass: false
AfterControlStatement: false
AfterEnum: false
Expand All @@ -45,7 +45,6 @@ ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
Cpp11BracedListStyle: true
DerivePointerAlignment: true
PointerAlignment: Left
DisableFormat: false
ExperimentalAutoDetectBinPacking: false
ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ]
Expand Down
73 changes: 40 additions & 33 deletions bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (db *reindexerImpl) modifyItem(ctx context.Context, namespace string, ns *r

rdSer := newSerializer(out.GetBuf())
rawQueryParams := rdSer.readRawQueryParams(func(nsid int) {
ns.cjsonState.ReadPayloadType(&rdSer.Serializer)
ns.cjsonState.ReadPayloadType(&rdSer.Serializer, db.binding, ns.name)
})

if rawQueryParams.count == 0 {
Expand All @@ -67,11 +67,9 @@ func (db *reindexerImpl) modifyItem(ctx context.Context, namespace string, ns *r

resultp := rdSer.readRawtItemParams(rawQueryParams.shardId)

ns.cacheItems.Remove(cacheKey{resultp.id, resultp.shardid})

if len(precepts) > 0 && (resultp.cptr != 0 || resultp.data != nil) && reflect.TypeOf(item).Kind() == reflect.Ptr {
nsArrEntry := nsArrayEntry{ns, ns.cjsonState.Copy()}
if _, err := unpackItem(&nsArrEntry, &resultp, false, true, item); err != nil {
if _, err := unpackItem(db.binding, &nsArrEntry, &rawQueryParams, &resultp, false, true, item); err != nil {
return 0, err
}
}
Expand Down Expand Up @@ -122,49 +120,58 @@ func (db *reindexerImpl) getNS(namespace string) (*reindexerNamespace, error) {
return ns, nil
}

func unpackItem(ns *nsArrayEntry, params *rawResultItemParams, allowUnsafe bool, nonCacheableData bool, item interface{}) (interface{}, error) {
func unpackItem(bin bindings.RawBinding, ns *nsArrayEntry, rqparams *rawResultQueryParams, params *rawResultItemParams, allowUnsafe bool, nonCacheableData bool, item interface{}) (interface{}, error) {
useCache := item == nil && (ns.deepCopyIface || allowUnsafe) && !nonCacheableData
needCopy := ns.deepCopyIface && !allowUnsafe
var err error
useCache = useCache && ns.cacheItems != nil && !params.version.IsEmpty()
var nsTag int64
if useCache {
if nsTagData, ok := rqparams.nsIncarnationTags[params.shardid]; ok && len(nsTagData) > params.nsid {
nsTag = nsTagData[params.nsid]
} else {
useCache = false
}
}

if useCache && ns.cacheItems != nil && !params.version.IsEmpty() {
if citem, ok := ns.cacheItems.Get(cacheKey{params.id, params.shardid}); ok && citem.version == params.version {
if useCache {
cacheKey := cacheKey{id: params.id, shardID: params.shardid, nsTag: nsTag}
citem, found := ns.cacheItems.Get(cacheKey)
if found && citem.itemVersion == params.version.Counter && citem.shardingVersion == rqparams.shardingConfigVersion {
item = citem.item
} else {
item = reflect.New(ns.rtype).Interface()
dec := ns.localCjsonState.NewDecoder(item, logger)
dec := ns.localCjsonState.NewDecoder(item, bin)
if params.cptr != 0 {
err = dec.DecodeCPtr(params.cptr, item)
} else if params.data != nil {
err = dec.Decode(params.data, item)
} else {
panic(fmt.Errorf("Internal error while decoding item id %d from ns %s: cptr and data are both null", params.id, ns.name))
panic(fmt.Errorf("rq: internal error while decoding item id %d from ns %s: cptr and data are both null", params.id, ns.name))
}
if err != nil {
return item, err
}

if citem, ok := ns.cacheItems.Get(cacheKey{params.id, params.shardid}); ok {
if citem.version == params.version {
item = citem.item
} else if !params.version.IsCompatibleWith(citem.version) || params.version.IsNewerThen(citem.version) {
ns.cacheItems.Add(cacheKey{params.id, params.shardid}, &cacheItem{item: item, version: params.version})
}
} else {
ns.cacheItems.Add(cacheKey{params.id, params.shardid}, &cacheItem{item: item, version: params.version})
if !found || params.version.Counter > citem.itemVersion || citem.shardingVersion != rqparams.shardingConfigVersion {
ns.cacheItems.Add(cacheKey, &cacheItem{
item: item,
itemVersion: params.version.Counter,
shardingVersion: rqparams.shardingConfigVersion,
})
}
}
} else {
if item == nil {
item = reflect.New(ns.rtype).Interface()
}
dec := ns.localCjsonState.NewDecoder(item, logger)
dec := ns.localCjsonState.NewDecoder(item, bin)
if params.cptr != 0 {
err = dec.DecodeCPtr(params.cptr, item)
} else if params.data != nil {
err = dec.Decode(params.data, item)
} else {
panic(fmt.Errorf("Internal error while decoding item id %d from ns %s: cptr and data are both null", params.id, ns.name))
panic(fmt.Errorf("rq: internal error while decoding item id %d from ns %s: cptr and data are both null", params.id, ns.name))
}
if err != nil {
return item, err
Expand All @@ -177,19 +184,19 @@ func unpackItem(ns *nsArrayEntry, params *rawResultItemParams, allowUnsafe bool,
if deepCopy, ok := item.(DeepCopy); ok {
item = deepCopy.DeepCopy()
} else {
panic(fmt.Errorf("Internal error %s must implement DeepCopy interface", reflect.TypeOf(item).Name()))
panic(fmt.Errorf("rq: internal error %s must implement DeepCopy interface", reflect.TypeOf(item).Name()))
}
}

return item, err
}

func (db *reindexerImpl) rawResultToJson(rawResult []byte, jsonName string, totalName string, initJson []byte, initOffsets []int) (json []byte, offsets []int, explain []byte, err error) {
func (db *reindexerImpl) rawResultToJson(rawResult []byte, jsonName string, totalName string, initJson []byte, initOffsets []int, namespace string) (json []byte, offsets []int, explain []byte, err error) {

ser := newSerializer(rawResult)
rawQueryParams := ser.readRawQueryParams(func(nsid int) {
var state cjson.State
state.ReadPayloadType(&ser.Serializer)
state.ReadPayloadType(&ser.Serializer, db.binding, namespace)
})
explain = rawQueryParams.explainResults

Expand Down Expand Up @@ -228,7 +235,7 @@ func (db *reindexerImpl) rawResultToJson(rawResult []byte, jsonName string, tota
jsonBuf.Write(item.data)

if (rawQueryParams.flags&bindings.ResultsWithJoined) != 0 && ser.GetVarUInt() != 0 {
panic("Sorry, not implemented: Can't return join query results as json")
panic("rq: sorry, not implemented: can not return join query results as json")
}
}
jsonBuf.WriteString("]}")
Expand Down Expand Up @@ -300,7 +307,7 @@ func (db *reindexerImpl) prepareQuery(ctx context.Context, q *Query, asJson bool
result, err = db.binding.SelectQuery(ctx, ser.Bytes(), asJson, q.ptVersions, fetchCount)

if err == nil && result.GetBuf() == nil {
panic(fmt.Errorf("result.Buffer is nil"))
panic(fmt.Errorf("rq: result.Buffer is nil"))
}
return
}
Expand Down Expand Up @@ -338,7 +345,7 @@ func (db *reindexerImpl) execToJsonQuery(ctx context.Context, q *Query, jsonRoot
}
defer result.Free()
var explain []byte
q.json, q.jsonOffsets, explain, err = db.rawResultToJson(result.GetBuf(), jsonRoot, q.totalName, q.json, q.jsonOffsets)
q.json, q.jsonOffsets, explain, err = db.rawResultToJson(result.GetBuf(), jsonRoot, q.totalName, q.json, q.jsonOffsets, q.Namespace)
if err != nil {
return errJSONIterator(err)
}
Expand Down Expand Up @@ -388,16 +395,14 @@ func (db *reindexerImpl) deleteQuery(ctx context.Context, q *Query) (int, error)
ser := newSerializer(result.GetBuf())
// skip total count
rawQueryParams := ser.readRawQueryParams(func(nsid int) {
ns.cjsonState.ReadPayloadType(&ser.Serializer)
ns.cjsonState.ReadPayloadType(&ser.Serializer, db.binding, ns.name)
})

for i := 0; i < rawQueryParams.count; i++ {
params := ser.readRawtItemParams(rawQueryParams.shardId)
_ = ser.readRawtItemParams(rawQueryParams.shardId)
if (rawQueryParams.flags&bindings.ResultsWithJoined) != 0 && ser.GetVarUInt() != 0 {
panic("Internal error: joined items in delete query result")
}
// Update cache
ns.cacheItems.Remove(cacheKey{params.id, params.shardid})
}
if !ser.Eof() {
panic("Internal error: data after end of delete query result")
Expand Down Expand Up @@ -429,16 +434,14 @@ func (db *reindexerImpl) updateQuery(ctx context.Context, q *Query) *Iterator {
ser := newSerializer(result.GetBuf())
// skip total count
rawQueryParams := ser.readRawQueryParams(func(nsid int) {
ns.cjsonState.ReadPayloadType(&ser.Serializer)
ns.cjsonState.ReadPayloadType(&ser.Serializer, db.binding, ns.name)
})

for i := 0; i < rawQueryParams.count; i++ {
params := ser.readRawtItemParams(rawQueryParams.shardId)
_ = ser.readRawtItemParams(rawQueryParams.shardId)
if (rawQueryParams.flags&bindings.ResultsWithJoined) != 0 && ser.GetVarUInt() != 0 {
panic("Internal error: joined items in update query result")
}
// Update cache
ns.cacheItems.Remove(cacheKey{params.id, params.shardid})
}

if !ser.Eof() {
Expand Down Expand Up @@ -547,6 +550,10 @@ func WithOpenTelemetry() interface{} {
return bindings.OptionOpenTelemetry{EnableTracing: true}
}

func WithStrictJoinHandlers() interface{} {
return bindings.OptionStrictJoinHandlers{EnableStrictJoinHandlers: true}
}

// WithReconnectionStrategy allows to configure the behavior during reconnect after error.
// Strategy used for reconnect to server on connection error
// AllowUnknownNodes allows to add dsn from cluster node, that was not set in client dsn list
Expand Down
31 changes: 15 additions & 16 deletions bindings/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ type Logger interface {
// Separate mutexes for logger object itself and for reindexer_enable_logger call:
// logMtx provides safe access to the logger
// logEnableMtx provides atomic logic for (enable + set) and (disable + reset) procedures
// This logger is global to easily export it into CGO (however it may lead to some confusion if there are multiple builtin instances in the app)
var logMtx sync.RWMutex
var logEnableMtx sync.Mutex
var logger Logger
var emptyLogger bindings.NullLogger

var enableDebug bool

Expand Down Expand Up @@ -235,7 +237,10 @@ func (binding *Builtin) Init(u []url.URL, options ...interface{}) error {
options: C.uint16_t(connectOptions.Opts),
}

caps := *bindings.DefaultBindingCapabilities().WithResultsWithShardIDs(true).WithQrIdleTimeouts(true)
caps := *bindings.DefaultBindingCapabilities().
WithResultsWithShardIDs(true).
WithQrIdleTimeouts(true).
WithIncarnationTags(true)
ccaps := C.BindingCapabilities{
caps: C.int64_t(caps.Value),
}
Expand Down Expand Up @@ -399,21 +404,6 @@ func (binding *Builtin) RenameNamespace(ctx context.Context, srcNs string, dstNs
return err2go(C.reindexer_rename_namespace(binding.rx, str2c(srcNs), str2c(dstNs), ctxInfo.cCtx))
}

func (binding *Builtin) EnableStorage(ctx context.Context, path string) error {
l := len(path)
if l > 0 && path[l-1] != '/' {
path += "/"
}

ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return err
}
defer binding.ctxWatcher.StopWatchOnCtx(ctxInfo)

return err2go(C.reindexer_enable_storage(binding.rx, str2c(path), ctxInfo.cCtx))
}

func (binding *Builtin) AddIndex(ctx context.Context, namespace string, indexDef bindings.IndexDef) error {
bIndexDef, err := json.Marshal(indexDef)
if err != nil {
Expand Down Expand Up @@ -630,6 +620,15 @@ func (binding *Builtin) DisableLogger() {
binding.setLogger(nil)
}

func (binding *Builtin) GetLogger() bindings.Logger {
logMtx.RLock()
defer logMtx.RUnlock()
if logger != nil {
return logger
}
return &emptyLogger
}

func (binding *Builtin) ReopenLogFiles() error {
fmt.Println("builtin binding ReopenLogFiles method is dummy")
return nil
Expand Down
9 changes: 9 additions & 0 deletions bindings/builtin/windows_config.go.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// +build windows
//go:generate cmd /c cd ..\.. && mkdir build & cd build && cmake -G "MinGW Makefiles" -DCMAKE_BUILD_TYPE=Release .. && cmake --build . --target reindexer -- -j4

package builtin

// #cgo CXXFLAGS: -std=c++17 -g -O2 -Wall -Wpedantic -Wextra -I../../cpp_src
// #cgo CFLAGS: -std=c99 -g -O2 -Wall -Wpedantic -Wno-unused-variable -I../../cpp_src
// #cgo LDFLAGS: -L${SRCDIR}/../../build/cpp_src/ @cgo_ld_flags@ -g -lshlwapi -ldbghelp -lws2_32
import "C"
8 changes: 4 additions & 4 deletions bindings/builtinserver/builtinserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,6 @@ func (server *BuiltinServer) RenameNamespace(ctx context.Context, srcNs string,
return server.builtin.RenameNamespace(ctx, srcNs, dstNs)
}

func (server *BuiltinServer) EnableStorage(ctx context.Context, namespace string) error {
return server.builtin.EnableStorage(ctx, namespace)
}

func (server *BuiltinServer) AddIndex(ctx context.Context, namespace string, indexDef bindings.IndexDef) error {
return server.builtin.AddIndex(ctx, namespace, indexDef)
}
Expand Down Expand Up @@ -249,6 +245,10 @@ func (server *BuiltinServer) DisableLogger() {
server.builtin.DisableLogger()
}

func (server *BuiltinServer) GetLogger() bindings.Logger {
return server.builtin.GetLogger()
}

func (server *BuiltinServer) ReopenLogFiles() error {
return err2go(C.reopen_log_files(server.svc))
}
Expand Down
25 changes: 18 additions & 7 deletions bindings/builtinserver/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,20 @@ type StorageConf struct {
Autorepair bool `yaml:"autorepair"`
}

const ServerThreadingDedicated = "dedicated"
const ServerThreadingShared = "shared"

type NetConf struct {
HTTPAddr string `yaml:"httpaddr"`
RPCAddr string `yaml:"rpcaddr"`
WebRoot string `yaml:"webroot"`
Security bool `yaml:"security"`
HTTPAddr string `yaml:"httpaddr"`
HTTPThreading string `yaml:"http_threading"` // "dedicated" or "shared"
RPCAddr string `yaml:"rpcaddr"`
RPCThreading string `yaml:"rpc_threading"` // "dedicated" or "shared"
UnixRPCAddr string `yaml:"urpcaddr"`
UnixRPCThreading string `yaml:"urpc_threading"` // "dedicated" or "shared"
WebRoot string `yaml:"webroot"`
Security bool `yaml:"security"`
HttpReadTimeoutSec int `yaml:"http_read_timeout,omitempty"`
HttpWriteTimeoutSec int `yaml:"http_write_timeout,omitempty"`
}

type LoggerConf struct {
Expand Down Expand Up @@ -70,9 +79,11 @@ func DefaultServerConfig() *ServerConfig {
Autorepair: false,
},
Net: NetConf{
HTTPAddr: "0.0.0.0:9088",
RPCAddr: "0.0.0.0:6534",
Security: false,
HTTPAddr: "0.0.0.0:9088",
HTTPThreading: "shared",
RPCAddr: "0.0.0.0:6534",
RPCThreading: "shared",
Security: false,
},
Logger: LoggerConf{
ServerLog: "stdout",
Expand Down
9 changes: 9 additions & 0 deletions bindings/builtinserver/windows_config.go.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// +build windows
//go:generate cmd /c cd ..\.. && mkdir build && cd build && cmake -G "MinGW Makefiles" -DLINK_RESOURCES=On -DCMAKE_BUILD_TYPE=Release .. && cmake --build . --target reindexer reindexer_server_library -- -j4 ${CMAKE_BUILD_ARGS}

package builtinserver

// #cgo CXXFLAGS: -std=c++17 -g -O2 -Wall -Wpedantic -Wextra -I../../cpp_src
// #cgo CFLAGS: -std=c99 -g -O2 -Wall -Wpedantic -Wno-unused-variable -I../../cpp_src
// #cgo LDFLAGS: -L${SRCDIR}/../../build/cpp_src/ -L${SRCDIR}/../../build/cpp_src/server/ @cgo_ld_flags@ -g -lstdc++ -lshlwapi -ldbghelp -lws2_32
import "C"
Loading

0 comments on commit 652d165

Please sign in to comment.