Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor mo_table_size/rows. #20222

Open
wants to merge 48 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
e8d93a5
table stats, got size, rows
gouhongshen Oct 31, 2024
09bebe9
update table stats task
gouhongshen Nov 1, 2024
649e05a
create & upgrade mo_table_stats
gouhongshen Nov 1, 2024
afe6b43
write stats to table
gouhongshen Nov 1, 2024
6edf29a
size, rows done
gouhongshen Nov 7, 2024
5efd574
fix non-sys visit mo_table_stats
gouhongshen Nov 7, 2024
8f8745e
alpha task, beta task, force update by query stats
gouhongshen Nov 8, 2024
4d70b70
stats config
gouhongshen Nov 10, 2024
8ed8308
debug commit, log batch mo table size
gouhongshen Nov 12, 2024
e57533b
adding variable, config for force_update and use old impl
gouhongshen Nov 14, 2024
e2a2b15
err returned cnt not correct
gouhongshen Nov 15, 2024
e484f84
fix view skip causing err cnt not match
gouhongshen Nov 16, 2024
a581280
all basic functions done
gouhongshen Nov 18, 2024
be6c6f4
different from to get change list
gouhongshen Nov 18, 2024
80b6e9a
different from done
gouhongshen Nov 18, 2024
b6f8df9
optimization
gouhongshen Nov 19, 2024
548d133
gama task, mo ctl, reset update time
gouhongshen Nov 20, 2024
cffb660
fix conflict
gouhongshen Nov 20, 2024
3a94c1b
adding metrics for mo table stats task
gouhongshen Nov 20, 2024
0eae313
fix sca
gouhongshen Nov 20, 2024
2e5b751
fix conflict
gouhongshen Nov 20, 2024
e86b98d
fix bvt
gouhongshen Nov 20, 2024
288c8a2
fix bvt, data race
gouhongshen Nov 21, 2024
b36c13d
fix bvt, data race
gouhongshen Nov 21, 2024
e7f0334
fix data race
gouhongshen Nov 21, 2024
b1ebc5e
gama task done
gouhongshen Nov 21, 2024
62993c2
fix data race
gouhongshen Nov 21, 2024
7337be9
fix gama
gouhongshen Nov 22, 2024
e3d868e
adding bvt
gouhongshen Nov 22, 2024
bbc9834
fix send to closed channel
gouhongshen Nov 22, 2024
284d6da
fix conflict
gouhongshen Nov 24, 2024
17e0174
add ut coverage
gouhongshen Nov 24, 2024
a193b52
fix bvt, ut
gouhongshen Nov 24, 2024
9e5a117
add ut coverage
gouhongshen Nov 25, 2024
0a25c41
update
gouhongshen Nov 25, 2024
6c56861
fix sca
gouhongshen Nov 25, 2024
38003a9
fix sca
gouhongshen Nov 25, 2024
19ea706
fix ctx cancel beta task
gouhongshen Nov 25, 2024
ba02720
update
gouhongshen Nov 25, 2024
7095a97
fix conflict
gouhongshen Nov 25, 2024
f580d49
fix data race
gouhongshen Nov 25, 2024
27d1805
fix sca
gouhongshen Nov 25, 2024
ec25d4d
fix sca
gouhongshen Nov 25, 2024
200128c
adding ut coverage
gouhongshen Nov 26, 2024
5032686
fix sca
gouhongshen Nov 26, 2024
c510af7
more ut
gouhongshen Nov 26, 2024
4e0731d
fix allocation
gouhongshen Nov 27, 2024
06cb4cd
fix race
gouhongshen Nov 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,31 @@ package v2_0_1

import (
"fmt"

"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/frontend"
"github.com/matrixorigin/matrixone/pkg/util/executor"
)

var needMigrateMoPubs = false

var clusterUpgEntries = []versions.UpgradeEntry{
upg_mo_table_stats,
upg_mo_pubs_add_account_id_column,
upg_mo_cdc_task,
upg_mo_cdc_watermark,
}

var upg_mo_table_stats = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_TABLE_STATS,
UpgType: versions.CREATE_NEW_TABLE,
UpgSql: frontend.MoCatalogMoTableStatsDDL,
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
return versions.CheckTableDefinition(txn, accountId, catalog.MO_CATALOG, catalog.MO_TABLE_STATS)
},
}

var upg_mo_pubs_add_account_id_column = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_PUBS,
Expand Down
2 changes: 2 additions & 0 deletions pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ const (
MO_CDC_WATERMARK = "mo_cdc_watermark"

MO_DATA_KEY = "mo_data_key"

MO_TABLE_STATS = "mo_table_stats_alpha"
)

func IsSystemTable(id uint64) bool {
Expand Down
8 changes: 8 additions & 0 deletions pkg/cnservice/distributed_tae.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/matrixorigin/matrixone/pkg/config"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/frontend"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor"
"github.com/matrixorigin/matrixone/pkg/util/status"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
Expand Down Expand Up @@ -56,6 +58,10 @@ func (s *service) initDistributedTAE(
// start I/O pipeline
blockio.Start(s.cfg.UUID)

internalExecutorFactory := func() ie.InternalExecutor {
return frontend.NewInternalExecutor(s.cfg.UUID)
}

// engine
distributeTaeMp, err := mpool.NewMPool("distributed_tae", 0, mpool.NoFixed)
if err != nil {
Expand All @@ -74,6 +80,8 @@ func (s *service) initDistributedTAE(

disttae.WithCNTransferTxnLifespanThreshold(
s.cfg.Engine.CNTransferTxnLifespanThreshold),
disttae.WithMoTableStats(s.cfg.Engine.Stats),
disttae.WithSQLExecFunc(internalExecutorFactory),
)
pu.StorageEngine = s.storeEngine

Expand Down
13 changes: 11 additions & 2 deletions pkg/cnservice/server_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"runtime/debug"
"strings"

"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/common/system"
Expand All @@ -40,6 +38,8 @@ import (
qclient "github.com/matrixorigin/matrixone/pkg/queryservice/client"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function/ctl"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
"go.uber.org/zap"
)

func (s *service) initQueryService() error {
Expand Down Expand Up @@ -96,6 +96,7 @@ func (s *service) initQueryCommandHandler() {
s.queryService.AddHandleFunc(query.CmdMethod_FileServiceCacheEvict, s.handleFileServiceCacheEvictRequest, false)
s.queryService.AddHandleFunc(query.CmdMethod_MetadataCache, s.handleMetadataCacheRequest, false)
s.queryService.AddHandleFunc(query.CmdMethod_FaultInjection, s.handleFaultInjection, false)
s.queryService.AddHandleFunc(query.CmdMethod_CtlMoTableStats, s.handleMoTableStats, false)
}

func (s *service) handleKillConn(ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer) error {
Expand Down Expand Up @@ -156,6 +157,14 @@ func (s *service) handleFaultInjection(ctx context.Context, req *query.Request,
return nil
}

func (s *service) handleMoTableStats(ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer) error {
ret := disttae.HandleMoTableStatsCtl(req.CtlMoTableStatsRequest.Cmd)
resp.CtlMoTableStatsResponse = query.CtlMoTableStatsResponse{
Resp: ret,
}
return nil
}

func (s *service) handleCtlReader(ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer) error {
resp.CtlReaderResponse = new(query.CtlReaderResponse)

Expand Down
9 changes: 7 additions & 2 deletions pkg/cnservice/server_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"strings"
"time"

"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/container/vector"
Expand All @@ -40,6 +38,8 @@ import (
db_holder "github.com/matrixorigin/matrixone/pkg/util/export/etl/db"
ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor"
"github.com/matrixorigin/matrixone/pkg/util/metric/mometric"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
"go.uber.org/zap"
)

func (s *service) adjustSQLAddress() {
Expand Down Expand Up @@ -277,6 +277,11 @@ func (s *service) registerExecutorsLocked() {
export.WithFileService(s.etlFS),
),
)
// init mo table stats task
s.task.runner.RegisterExecutor(
task.TaskCode_MOTableStats,
disttae.GetMOTableStatsExecutor(s.cfg.UUID, s.storeEngine, ieFactory))

// init metric task
s.task.runner.RegisterExecutor(
task.TaskCode_MetricStorageUsage,
Expand Down
7 changes: 5 additions & 2 deletions pkg/cnservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"sync/atomic"
"time"

"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/bootstrap"
"github.com/matrixorigin/matrixone/pkg/clusterservice"
"github.com/matrixorigin/matrixone/pkg/cnservice/cnclient"
Expand Down Expand Up @@ -57,7 +55,9 @@ import (
"github.com/matrixorigin/matrixone/pkg/util/executor"
"github.com/matrixorigin/matrixone/pkg/util/toml"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -130,7 +130,10 @@ type Config struct {
Type EngineType `toml:"type"`
Logstore options.LogstoreType `toml:"logstore"`

MoTableStatsUseOldImpl bool `toml:"mo-table-stats-use-old-impl"`
CNTransferTxnLifespanThreshold time.Duration `toml:"cn-transfer-txn-lifespan-threshold"`

Stats disttae.MoTableStatsConfig `toml:"stats"`
}

// parameters for cn-server related buffer.
Expand Down
6 changes: 6 additions & 0 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,7 @@ var (
"mo_snapshots": 0,
"mo_cdc_task": 0,
"mo_cdc_watermark": 0,
catalog.MO_TABLE_STATS: 0,
}
sysAccountTables = map[string]struct{}{
catalog.MOVersionTable: {},
Expand Down Expand Up @@ -967,6 +968,7 @@ var (
catalog.MO_RETENTION: 0,
"mo_cdc_task": 0,
"mo_cdc_watermark": 0,
catalog.MO_TABLE_STATS: 0,
}
createDbInformationSchemaSql = "create database information_schema;"
createAutoTableSql = MoCatalogMoAutoIncrTableDDL
Expand Down Expand Up @@ -1004,6 +1006,7 @@ var (
MoCatalogMoCdcTaskDDL,
MoCatalogMoCdcWatermarkDDL,
MoCatalogMoDataKeyDDL,
MoCatalogMoTableStatsDDL,
}

//drop tables for the tenant
Expand Down Expand Up @@ -7504,6 +7507,9 @@ func createTablesInMoCatalogOfGeneralTenant2(bh BackgroundExec, ca *createAccoun
if strings.HasPrefix(sql, "create table mo_catalog.mo_data_key") {
return true
}
if strings.HasPrefix(sql, fmt.Sprintf("create table mo_catalog.%s", catalog.MO_TABLE_STATS)) {
return true
}
return false
}

Expand Down
69 changes: 69 additions & 0 deletions pkg/frontend/func_mo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package frontend

import (
"context"
"testing"

"github.com/golang/mock/gomock"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/stretchr/testify/require"
)

func TestGetVariables(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

ctx := context.Background()

ses := newSes(nil, ctrl)
defer ses.Close()

tempExecCtx := ExecCtx{
reqCtx: ctx,
ses: ses,
}
defer tempExecCtx.Close()
ses.txnCompileCtx.SetExecCtx(&tempExecCtx)

proc := testutil.NewProc()
proc.SetResolveVariableFunc(ses.txnCompileCtx.ResolveVariable)

{
require.NoError(t, ses.SetSessionSysVar(ctx, function.MoTableRowsSizeForceUpdateVarName, "yes"))
require.True(t, function.GetForceUpdateVariable(proc))

require.NoError(t, ses.SetSessionSysVar(ctx, function.MoTableRowsSizeForceUpdateVarName, "no"))
require.False(t, function.GetForceUpdateVariable(proc))
}

{
require.NoError(t, ses.SetSessionSysVar(ctx, function.MoTableRowSizeUseOldImplVarName, "yes"))
require.True(t, function.GetUseOldImplVariable(proc))

require.NoError(t, ses.SetSessionSysVar(ctx, function.MoTableRowSizeUseOldImplVarName, "no"))
require.False(t, function.GetUseOldImplVariable(proc))
}

{
require.NoError(t, ses.SetSessionSysVar(ctx, function.MoTableRowSizeResetUpdateTimeVarName, "yes"))
require.True(t, function.GetResetUpdateTimeVariable(proc))

require.NoError(t, ses.SetSessionSysVar(ctx, function.MoTableRowSizeResetUpdateTimeVarName, "no"))
require.False(t, function.GetResetUpdateTimeVariable(proc))
}
}
12 changes: 12 additions & 0 deletions pkg/frontend/predefined.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,18 @@ var (
update_time timestamp not null default current_timestamp on update current_timestamp,
primary key(account_id, key_id)
)`

MoCatalogMoTableStatsDDL = fmt.Sprintf(`create table mo_catalog.%s (
account_id bigint signed,
database_id bigint signed,
table_id bigint signed,
database_name varchar(255),
table_name varchar(255),
table_stats json,
update_time datetime(6) not null,
takes bigint unsigned,
primary key(account_id, database_id, table_id)
)`, catalog.MO_TABLE_STATS)
)

// `mo_catalog` database system tables
Expand Down
4 changes: 4 additions & 0 deletions pkg/frontend/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ func setPu(service string, pu *config.ParameterUnit) {
getServerLevelVars(service).Pu.Store(pu)
}

func SetPUForExternalUT(service string, pu *config.ParameterUnit) {
setPu(service, pu)
}

func getPu(service string) *config.ParameterUnit {
return getServerLevelVars(service).Pu.Load().(*config.ParameterUnit)
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/frontend/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -3646,6 +3646,30 @@ var gSysVarsDefs = map[string]SystemVariable{
Type: InitSystemVariableStringType("optimizer_switch"),
Default: "index_merge=on,index_merge_union=on,index_merge_sort_union=on,index_merge_intersection=on,engine_condition_pushdown=on,index_condition_pushdown=on,mrr=on,mrr_cost_based=on,block_nested_loop=on,batched_key_access=off,materialization=on,semijoin=on,loosescan=on,firstmatch=on,duplicateweedout=on,subquery_materialization_cost_based=on,use_index_extensions=on,condition_fanout_filter=on,derived_merge=on,use_invisible_indexes=off,skip_scan=on,hash_join=on,subquery_to_derived=off,prefer_ordering_index=on,hypergraph_optimizer=off,derived_condition_pushdown=on,hash_set_operations=on",
},
"mo_table_stats.force_update": {
Name: "mo_table_stats.force_update",
Scope: ScopeSession,
Dynamic: true,
SetVarHintApplies: false,
Type: InitSystemVariableStringType("mo_table_stats.force_update"),
Default: "",
},
"mo_table_stats.use_old_impl": {
Name: "mo_table_stats.use_old_impl",
Scope: ScopeSession,
Dynamic: true,
SetVarHintApplies: false,
Type: InitSystemVariableStringType("mo_table_stats.use_old_impl"),
Default: "",
},
"mo_table_stats.reset_update_time": {
Name: "mo_table_stats.reset_update_time",
Scope: ScopeSession,
Dynamic: true,
SetVarHintApplies: false,
Type: InitSystemVariableStringType("mo_table_stats.reset_update_time"),
Default: "",
},
}

func updateTimeZone(ctx context.Context, sess *Session, sv *SystemVariables, name string, val interface{}) error {
Expand Down
Loading