Skip to content

Commit

Permalink
Merge branch 'main' into 1129-fix-panic
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Nov 29, 2024
2 parents 0c5126f + 03635a8 commit 20ee891
Show file tree
Hide file tree
Showing 55 changed files with 128,113 additions and 1,061 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Contents
MatrixOne is a hyper-converged cloud & edge native distributed database with a structure that separates storage, computation, and transactions to form a consolidated HSTAP data engine. This engine enables a single database system to accommodate diverse business loads such as OLTP, OLAP, and stream computing. It also supports deployment and utilization across public, private, and edge clouds, ensuring compatibility with diverse infrastructures.

<p align="center">
<img alt="MatrixOne" height="450" src="https://github.com/matrixorigin/artwork/blob/main/docs/overview/architecture/archi-en-1.png?raw=true">
<img alt="MatrixOne" height="450" src="https://github.com/matrixorigin/artwork/blob/main/docs/overview/architecture/architeture241113_en.png?raw=true">
</p>

## 🎯 <a id="key-features">Key Features</a>
Expand Down
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

MatrixOne 是一款超融合异构分布式数据库,通过云原生化和存储、计算、事务分离的架构构建 HSTAP 超融合数据引擎,实现单一数据库系统支持 OLTP、OLAP、流计算等多种业务负载,并且支持公有云、私有云、边缘云部署和使用,实现异构基础设施的兼容。
<p align="center">
<img alt="MatrixOne" height="500" src="https://community-shared-data-1308875761.cos.ap-beijing.myqcloud.com/artwork/docs/overview/mo-new-arch.png?raw=true">
<img alt="MatrixOne" height="500" src="https://community-shared-data-1308875761.cos.ap-beijing.myqcloud.com/artwork/docs/overview/architecture.png?raw=true">
</p>

## 🎯 <a id="key-features">核心特性</a>
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ require (
github.com/lni/dragonboat/v4 v4.0.0-20220815145555-6f622e8bcbef
github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4
github.com/lni/vfs v0.2.1-0.20220616104132-8852fd867376
github.com/matrixorigin/monlp v0.0.0-20240825091235-be436dc30e78
github.com/matrixorigin/mysql v1.8.2-0.20241106110439-6ac9ee94770d
github.com/matrixorigin/simdcsv v0.0.0-20230210060146-09b8e45209dd
github.com/minio/minio-go/v7 v7.0.78
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,6 @@ github.com/matrixorigin/goutils v1.3.1-0.20220604063047-388d67b4dbc4 h1:+SmZP2bG
github.com/matrixorigin/goutils v1.3.1-0.20220604063047-388d67b4dbc4/go.mod h1:LIHvF0fflR+zyXUQFQOiHPpKANf3UIr7DFIv5CBPOoU=
github.com/matrixorigin/memberlist v0.5.1-0.20230322082342-95015c95ee76 h1:MpmqMPooJ0Ea7W4ldIGbQV4D3z+sEiCu6C6aTibiwiQ=
github.com/matrixorigin/memberlist v0.5.1-0.20230322082342-95015c95ee76/go.mod h1:yvyXLpo0QaGE59Y7hDTsTzDD25JYBZ4mHgHUZ8lrOI0=
github.com/matrixorigin/monlp v0.0.0-20240825091235-be436dc30e78 h1:1NvZ4SBw0lH7h38VhCVxYEa61K8N+0DBv9JQhAwU48Q=
github.com/matrixorigin/monlp v0.0.0-20240825091235-be436dc30e78/go.mod h1:RQQhaM4xSocKuNi0ZvKZZAiErpINJgZrPB+vZDvBkeU=
github.com/matrixorigin/mysql v1.8.2-0.20241106110439-6ac9ee94770d h1:27vD3JGbrFmaQtDYQT/W1jFFr0xvipdwH5R4bZPGQdE=
github.com/matrixorigin/mysql v1.8.2-0.20241106110439-6ac9ee94770d/go.mod h1:RJNMd/LBgWRCpGanqXvqjVaoYXeYBS+i0MSeoN3hBMo=
github.com/matrixorigin/simdcsv v0.0.0-20230210060146-09b8e45209dd h1:DvqhuH3kOpsE6vXZA5WEaRNAUUUcf44S1p5VInbjdfU=
Expand Down
2 changes: 1 addition & 1 deletion pkg/catalog/secondary_index_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func fullTextIndexParamsToMap(def *tree.FullTextIndex) (map[string]string, error
// fulltext index here
if def.IndexOption != nil {
parsername := strings.ToLower(def.IndexOption.ParserName)
if parsername != "ngram" && parsername != "default" && parsername != "json" {
if parsername != "ngram" && parsername != "default" && parsername != "json" && parsername != "json_value" {
return nil, moerr.NewInternalErrorNoCtx(fmt.Sprintf("invalid parser %s", parsername))
}
res["parser"] = parsername
Expand Down
26 changes: 17 additions & 9 deletions pkg/container/bytejson/fttokenizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,39 @@ package bytejson
import (
"iter"
"strconv"
)

"github.com/matrixorigin/monlp/tokenizer"
const (
MAX_TOKEN_SIZE = 127
)

type Token struct {
TokenBytes [1 + MAX_TOKEN_SIZE]byte
TokenPos int32
BytePos int32
}

// TokenizeValue tokenizes the values of the ByteJson object
// note that we do not break word with space, do not normalize
// case, 3-gram, etc etc, only truncate the string to 23 bytes.
func (bj ByteJson) TokenizeValue(includeKey bool) iter.Seq[tokenizer.Token] {
return func(yield func(tokenizer.Token) bool) {
// case, 3-gram, etc etc, only truncate the string to 127 bytes.
func (bj ByteJson) TokenizeValue(includeKey bool) iter.Seq[Token] {
return func(yield func(Token) bool) {
tokenizeOne(bj, 1, includeKey, yield)
}
}

func fillToken(t *tokenizer.Token, s []byte, pos int32) {
func fillToken(t *Token, s []byte, pos int32) {
copy(t.TokenBytes[1:], s)
if len(s) > tokenizer.MAX_TOKEN_SIZE {
t.TokenBytes[0] = tokenizer.MAX_TOKEN_SIZE
if len(s) > MAX_TOKEN_SIZE {
t.TokenBytes[0] = MAX_TOKEN_SIZE
} else {
t.TokenBytes[0] = byte(len(s))
}
t.TokenPos = pos
}

func tokenizeOne(bj ByteJson, pos int32, includeKey bool, yield func(tokenizer.Token) bool) int32 {
var t tokenizer.Token
func tokenizeOne(bj ByteJson, pos int32, includeKey bool, yield func(Token) bool) int32 {
var t Token

switch bj.Type {
case TpCodeObject:
Expand Down
36 changes: 26 additions & 10 deletions pkg/container/bytejson/fttokenizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package bytejson

import (
"encoding/json"
"fmt"
"testing"

"github.com/matrixorigin/monlp/tokenizer"
"github.com/stretchr/testify/require"
)

type tokenTestCase struct {
Expand All @@ -27,15 +28,15 @@ type tokenTestCase struct {
tokensWithKey []string
}

func checkTokens(t *testing.T, tokens []tokenizer.Token, expected []string) {
func checkTokens(t *testing.T, tokens []Token, expected []string) {
if len(tokens) != len(expected) {
t.Fatalf("expected %d tokens, got %d", len(expected), len(tokens))
}

for i := range tokens {
var tk tokenizer.Token
if len(expected[i]) > tokenizer.MAX_TOKEN_SIZE {
tk.TokenBytes[0] = byte(tokenizer.MAX_TOKEN_SIZE)
var tk Token
if len(expected[i]) > MAX_TOKEN_SIZE {
tk.TokenBytes[0] = byte(MAX_TOKEN_SIZE)
} else {
tk.TokenBytes[0] = byte(len(expected[i]))
}
Expand All @@ -61,8 +62,8 @@ func TestByteJson(t *testing.T) {
},
{
input: `{"a": [1.2, 2.0], "b": [3, true, "hello"], "c": "abcdefghijklmnopqrstuvwxyz"}`,
tokens: []string{"1.2", "2", "3", "hello", "abcdefghijklmnopqrstuvw"},
tokensWithKey: []string{"a", "1.2", "2", "b", "3", "hello", "c", "abcdefghijklmnopqrstuvw"},
tokens: []string{"1.2", "2", "3", "hello", "abcdefghijklmnopqrstuvwxyz"},
tokensWithKey: []string{"a", "1.2", "2", "b", "3", "hello", "c", "abcdefghijklmnopqrstuvwxyz"},
},
{
input: `{"a": "相见时难别亦难", "b": "I come, I see, I 征服", "c": "相见时难别亦难,东风无力百花残。 春蚕到死丝方尽,蜡炬成灰泪始干。"}`,
Expand All @@ -72,7 +73,7 @@ func TestByteJson(t *testing.T) {
{
input: `{"a bcdefghijklmnopqrstuvwxyz": 1, "学而时习之,不亦说乎": "说什么说, 就你话多"}`,
tokens: []string{"1", "说什么说, 就你话多"},
tokensWithKey: []string{"a bcdefghijklmnopqrstuv", "1", "学而时习之,不亦说乎", "说什么说, 就你话多"},
tokensWithKey: []string{"a bcdefghijklmnopqrstuvwxyz", "1", "学而时习之,不亦说乎", "说什么说, 就你话多"},
},
}

Expand All @@ -82,16 +83,31 @@ func TestByteJson(t *testing.T) {
t.Fatal(err)
}

var tokens []tokenizer.Token
var tokens []Token
for tk := range bj.TokenizeValue(false) {
tokens = append(tokens, tk)
}
checkTokens(t, tokens, tc.tokens)

var tokensWithKey []tokenizer.Token
var tokensWithKey []Token
for tk := range bj.TokenizeValue(true) {
tokensWithKey = append(tokensWithKey, tk)
}
checkTokens(t, tokensWithKey, tc.tokensWithKey)
}
}

func TestFillToken(t *testing.T) {
var tok Token
lv := "1234567890"
fmt.Printf("%s %d\n", lv, len(lv))

fillToken(&tok, []byte(lv), 0)
require.Equal(t, 10, int(tok.TokenBytes[0]))

for i := 0; i < 20; i++ {
lv += lv
}
fillToken(&tok, []byte(lv), 0)
require.Equal(t, 127, int(tok.TokenBytes[0]))
}
2 changes: 1 addition & 1 deletion pkg/fulltext/fulltext.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"strings"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/monlp/tokenizer"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/matrixorigin/monlp/tokenizer"
)

/*
Expand Down
4 changes: 0 additions & 4 deletions pkg/lockservice/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,6 @@ func (l Lock) release() {

func (l Lock) closeWaiter(w *waiter, logger *log.MOLogger) bool {
canRemove := func() bool {
if !l.isLockRow() {
panic("BUG: range lock cannot call closeWaiter")
}

if l.holders.size() > 0 {
return false
}
Expand Down
40 changes: 39 additions & 1 deletion pkg/lockservice/lock_table_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type localLockTable struct {

options struct {
beforeCloseFirstWaiter func(c *lockContext)
beforeWait func(c *lockContext) func()
afterWait func(c *lockContext) func()
}
}

Expand Down Expand Up @@ -97,6 +99,7 @@ func (l *localLockTable) doLock(
c *lockContext,
blocked bool) {
var old *waiter
var oldOffset int
var err error
table := l.bind.Table
for {
Expand Down Expand Up @@ -129,6 +132,14 @@ func (l *localLockTable) doLock(
return
}

if oldOffset != c.offset {
if old != nil {
old.disableNotify()
old.close("doLock, lock next row", l.logger)
}
c.txn.clearBlocked(old, l.logger)
}

// we handle remote lock on current rpc io read goroutine, so we can not wait here, otherwise
// the rpc will be blocked.
if c.opts.async {
Expand All @@ -139,10 +150,21 @@ func (l *localLockTable) doLock(
// txn is locked by service.lock or service_remote.lock. We must unlock here. And lock again after
// wait result. Because during wait, deadlock detection may be triggered, and need call txn.fetchWhoWaitingMe,
// or other concurrent txn method.
oldOffset = c.offset
oldTxnID := c.txn.txnID
old = c.w
c.txn.Unlock()

if l.options.beforeWait != nil {
l.options.beforeWait(c)()
}

v := c.w.wait(c.ctx, l.logger)

if l.options.afterWait != nil {
l.options.afterWait(c)()
}

c.txn.Lock()

logLocalLockWaitOnResult(l.logger, c.txn, table, c.rows[c.idx], c.opts, c.w, v)
Expand Down Expand Up @@ -474,7 +496,8 @@ func (l *localLockTable) addRowLockLocked(
func (l *localLockTable) handleLockConflictLocked(
c *lockContext,
key []byte,
conflictWith Lock) error {
conflictWith Lock,
) error {
if c.opts.Policy == pb.WaitPolicy_FastFail {
return ErrLockConflict
}
Expand Down Expand Up @@ -502,6 +525,21 @@ func (l *localLockTable) handleLockConflictLocked(
// waiter added, we need to active deadlock check.
c.txn.setBlocked(c.w, l.logger)
logLocalLockWaitOn(l.logger, c.txn, l.bind.Table, c.w, key, conflictWith)

if c.opts.Granularity != pb.Granularity_Range {
return nil
}

if len(c.rangeLastWaitKey) > 0 {
v, ok := l.mu.store.Get(c.rangeLastWaitKey)
if !ok {
panic("BUG: missing range last wait key")
}
if ok && v.closeWaiter(c.w, l.logger) {
l.mu.store.Delete(c.rangeLastWaitKey)
}
}
c.rangeLastWaitKey = key
return nil
}

Expand Down
Loading

0 comments on commit 20ee891

Please sign in to comment.