Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Analyzer adds FileService Cache read and hit statistics #20397

1,382 changes: 843 additions & 539 deletions pkg/pb/plan/plan.pb.go

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions pkg/perfcounter/counter_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ type FileServiceCounterSet struct {
}

Cache struct {
Read stats.Counter
Hit stats.Counter
Read stats.Counter // CacheRead
Hit stats.Counter // CacheHit
Memory struct {
Read stats.Counter
Hit stats.Counter
Read stats.Counter // CacheMemoryRead
Hit stats.Counter // CacheMemoryHit
}
Disk struct {
Read stats.Counter
Hit stats.Counter
Read stats.Counter // CacheDiskRead
Hit stats.Counter // CacheDiskHit
OpenIOEntryFile stats.Counter
OpenFullFile stats.Counter
CreateFile stats.Counter
Expand All @@ -56,8 +56,8 @@ type FileServiceCounterSet struct {
Evict stats.Counter
}
Remote struct {
Read stats.Counter
Hit stats.Counter
Read stats.Counter // CacheRemoteRead
Hit stats.Counter // CacheRemoteHit
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/deletion/deletion.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func (deletion *Deletion) normalDelete(proc *process.Process) (vm.CallResult, er
}
analyzer.AddDeletedRows(int64(deletion.ctr.resBat.RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)
}
}
Expand All @@ -281,6 +282,7 @@ func (deletion *Deletion) normalDelete(proc *process.Process) (vm.CallResult, er
}
analyzer.AddDeletedRows(int64(deletion.ctr.resBat.RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/deletion/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func (ctr *container) flush(proc *process.Process, analyzer process.Analyzer) (u
return 0, err
}
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

bat := batch.New([]string{catalog.ObjectMeta_ObjectStats})
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,7 @@ func scanZonemapFile(ctx context.Context, param *ExternalParam, proc *process.Pr
return err
}
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

if param.Zoneparam.offset >= len(param.Zoneparam.bs) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/colexec/insert/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func (insert *Insert) insert_table(proc *process.Process, analyzer process.Analy
}
analyzer.AddWrittenRows(int64(insert.ctr.buf.RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)
}
} else {
Expand All @@ -261,6 +262,7 @@ func (insert *Insert) insert_table(proc *process.Process, analyzer process.Analy
}
analyzer.AddWrittenRows(int64(insert.ctr.buf.RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)
}

Expand All @@ -281,6 +283,7 @@ func writeBatch(proc *process.Process, writer *colexec.S3Writer, bat *batch.Batc
return err
}
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

err = writer.FillBlockInfoBat(blockInfos, stats, proc.GetMPool())
Expand All @@ -300,6 +303,7 @@ func flushTailBatch(proc *process.Process, writer *colexec.S3Writer, result *vm.
return err
}
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

// if stats is not zero, then the blockInfos must not be nil
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/lockop/lock_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ func hasNewVersionInRange(
defer func() {
if analyzer != nil {
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)
}
}()
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/colexec/mergeblock/mergeblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (mergeBlock *MergeBlock) Call(proc *process.Process) (vm.CallResult, error)
}
analyzer.AddWrittenRows(int64(mergeBlock.container.mp[i].RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)
}

Expand All @@ -106,6 +107,7 @@ func (mergeBlock *MergeBlock) Call(proc *process.Process) (vm.CallResult, error)
}
analyzer.AddWrittenRows(int64(bat.RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

bat.Clean(proc.GetMPool())
Expand All @@ -124,6 +126,7 @@ func (mergeBlock *MergeBlock) Call(proc *process.Process) (vm.CallResult, error)
}
analyzer.AddWrittenRows(int64(mergeBlock.container.mp[0].RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)
}

Expand All @@ -136,6 +139,7 @@ func (mergeBlock *MergeBlock) Call(proc *process.Process) (vm.CallResult, error)
}
analyzer.AddWrittenRows(int64(bat.RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

bat.Clean(proc.GetMPool())
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/multi_update/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (update *MultiUpdate) delete_table(
}
analyzer.AddDeletedRows(int64(deleteBatch.RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)
}
}
Expand Down Expand Up @@ -137,6 +138,7 @@ func (update *MultiUpdate) delete_table(
}
analyzer.AddDeletedRows(int64(deleteBatch.RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/colexec/multi_update/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func (update *MultiUpdate) insert_table(
}
analyzer.AddWrittenRows(int64(insertBatch.RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)
}
}
Expand All @@ -193,6 +194,7 @@ func (update *MultiUpdate) insert_table(
}
analyzer.AddWrittenRows(int64(insertBatch.RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)
}
}
Expand Down Expand Up @@ -255,6 +257,7 @@ func (update *MultiUpdate) check_null_and_insert_table(
}
analyzer.AddWrittenRows(int64(insertBatch.RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

}
Expand Down Expand Up @@ -291,6 +294,7 @@ func (update *MultiUpdate) check_null_and_insert_table(
}
analyzer.AddWrittenRows(int64(insertBatch.RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/multi_update/multi_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (update *MultiUpdate) updateFlushS3Info(proc *process.Process, analyzer pro
}
analyzer.AddDeletedRows(int64(batBufs[actionDelete].RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

case actionInsert:
Expand All @@ -292,6 +293,7 @@ func (update *MultiUpdate) updateFlushS3Info(proc *process.Process, analyzer pro
}
analyzer.AddWrittenRows(int64(batBufs[actionInsert].RowCount()))
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

case actionUpdate:
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/multi_update/s3writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ func (writer *s3Writer) sortAndSyncOneTable(
return
}
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

return writer.fillInsertBlockInfo(proc, idx, partitionIdx, blockInfos, objStats, rowCount)
Expand Down Expand Up @@ -487,6 +488,7 @@ func (writer *s3Writer) sortAndSyncOneTable(
return
}
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

if isDelete {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (output *Output) Call(proc *process.Process) (vm.CallResult, error) {
return result, err
}
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

// TODO: analyzer.Output(result.Batch)
Expand Down Expand Up @@ -120,6 +121,7 @@ func (output *Output) Call(proc *process.Process) (vm.CallResult, error) {
return result, err
}
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

result.Batch = bat
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/table_function/metadata_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (s *metadataScanState) start(tf *TableFunction, proc *process.Process, nthR
return err
}
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

for i := range metaInfos {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/table_scan/table_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (tableScan *TableScan) Call(proc *process.Process) (vm.CallResult, error) {
return vm.CancelResult, err
}
analyzer.AddS3RequestCount(crs)
analyzer.AddFileServiceCacheInfo(crs)
analyzer.AddDiskIO(crs)

if isEnd {
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/compile/analyze_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ func applyOpStatsToNode(op *models.PhyOperator, nodes []*plan.Node, scopeParalle
node.AnalyzeInfo.S3Delete += op.OpStats.S3Delete
node.AnalyzeInfo.S3DeleteMul += op.OpStats.S3DeleteMul
node.AnalyzeInfo.DiskIO += op.OpStats.DiskIO

node.AnalyzeInfo.CacheRead += op.OpStats.CacheRead
node.AnalyzeInfo.CacheHit += op.OpStats.CacheHit
node.AnalyzeInfo.CacheMemoryRead += op.OpStats.CacheMemoryRead
node.AnalyzeInfo.CacheMemoryHit += op.OpStats.CacheMemoryHit
node.AnalyzeInfo.CacheDiskRead += op.OpStats.CacheDiskRead
node.AnalyzeInfo.CacheDiskHit += op.OpStats.CacheDiskHit
node.AnalyzeInfo.CacheRemoteRead += op.OpStats.CacheRemoteRead
node.AnalyzeInfo.CacheRemoteHit += op.OpStats.CacheRemoteHit

node.AnalyzeInfo.WrittenRows += op.OpStats.WrittenRows
node.AnalyzeInfo.DeletedRows += op.OpStats.DeletedRows

Expand Down
34 changes: 34 additions & 0 deletions pkg/sql/models/logic_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ const S3Get = "S3 Get Count"
const S3Delete = "S3 Delete Count"
const S3DeleteMul = "S3 DeleteMul Count"

const FSCacheRead = "FileService Cache Read"
const FSCacheHit = "FileService Cache Hit"
const FSCacheMemoryRead = "FileService Cache Memory Read"
const FSCacheMemoryHit = "FileService Cache Memory Hit"
const FSCacheDiskRead = "FileService Cache Disk Read"
const FSCacheDiskHit = "FileService Cache Disk Hit"
const FSCacheRemoteRead = "FileService Cache Remote Read"
const FSCacheRemoteHit = "FileService Cache Remote Hit"

const Network = "Network"

type ExplainData struct {
Expand Down Expand Up @@ -252,6 +261,15 @@ func (graphData *GraphData) StatisticsGlobalResource(ctx context.Context) error
gS3DeleteCount := NewStatisticValue(S3Delete, "count")
gS3DeleteMulCount := NewStatisticValue(S3DeleteMul, "count")

gFSCacheRead := NewStatisticValue(FSCacheRead, "count")
gFSCacheHit := NewStatisticValue(FSCacheHit, "count")
gFSCacheMemoryRead := NewStatisticValue(FSCacheMemoryRead, "count")
gFSCacheMemoryHit := NewStatisticValue(FSCacheMemoryHit, "count")
gFSCacheDiskRead := NewStatisticValue(FSCacheDiskRead, "count")
gFSCacheDiskHit := NewStatisticValue(FSCacheDiskHit, "count")
gFSCacheRemoteRead := NewStatisticValue(FSCacheRemoteRead, "count")
gFSCacheRemoteHit := NewStatisticValue(FSCacheRemoteHit, "count")

// network
gNetwork := NewStatisticValue(Network, "byte")

Expand Down Expand Up @@ -308,6 +326,22 @@ func (graphData *GraphData) StatisticsGlobalResource(ctx context.Context) error
gS3DeleteCount.Value += ioValue.Value
case S3DeleteMul:
gS3DeleteMulCount.Value += ioValue.Value
case FSCacheRead:
gFSCacheRead.Value += ioValue.Value
case FSCacheHit:
gFSCacheHit.Value += ioValue.Value
case FSCacheMemoryRead:
gFSCacheMemoryRead.Value += ioValue.Value
case FSCacheMemoryHit:
gFSCacheMemoryHit.Value += ioValue.Value
case FSCacheDiskRead:
gFSCacheDiskRead.Value += ioValue.Value
case FSCacheDiskHit:
gFSCacheDiskHit.Value += ioValue.Value
case FSCacheRemoteRead:
gFSCacheRemoteRead.Value += ioValue.Value
case FSCacheRemoteHit:
gFSCacheRemoteHit.Value += ioValue.Value
}
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/plan/deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,14 @@ func DeepCopyAnalyzeInfo(analyzeinfo *plan.AnalyzeInfo) *plan.AnalyzeInfo {
S3Head: analyzeinfo.GetS3Head(),
S3Delete: analyzeinfo.GetS3Delete(),
S3DeleteMul: analyzeinfo.GetS3DeleteMul(),
CacheRead: analyzeinfo.GetCacheRead(),
CacheHit: analyzeinfo.GetCacheHit(),
CacheMemoryRead: analyzeinfo.GetCacheMemoryRead(),
CacheMemoryHit: analyzeinfo.GetCacheMemoryHit(),
CacheDiskRead: analyzeinfo.GetCacheDiskRead(),
CacheDiskHit: analyzeinfo.GetCacheDiskHit(),
CacheRemoteRead: analyzeinfo.GetCacheRemoteRead(),
CacheRemoteHit: analyzeinfo.GetCacheRemoteHit(),
NetworkIO: analyzeinfo.GetNetworkIO(),
ScanTime: analyzeinfo.GetScanTime(),
InsertTime: analyzeinfo.GetInsertTime(),
Expand Down
50 changes: 50 additions & 0 deletions pkg/sql/plan/explain/marshal_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,14 @@ const S3Put = "S3 Put Count"
const S3Get = "S3 Get Count"
const S3Delete = "S3 Delete Count"
const S3DeleteMul = "S3 DeleteMul Count"
const FSCacheRead = "FileService Cache Read"
const FSCacheHit = "FileService Cache Hit"
const FSCacheMemoryRead = "FileService Cache Memory Read"
const FSCacheMemoryHit = "FileService Cache Memory Hit"
const FSCacheDiskRead = "FileService Cache Disk Read"
const FSCacheDiskHit = "FileService Cache Disk Hit"
const FSCacheRemoteRead = "FileService Cache Remote Read"
const FSCacheRemoteHit = "FileService Cache Remote Hit"

func GetStatistic4Trace(ctx context.Context, node *plan.Node, options *ExplainOptions) (s statistic.StatsArray) {
s.Reset()
Expand Down Expand Up @@ -858,6 +866,48 @@ func (m MarshalNodeImpl) GetStatistics(ctx context.Context, options *ExplainOpti
Value: analyzeInfo.S3DeleteMul,
Unit: Statistic_Unit_count, //"count",
},
//--------------------------------------------------------------------------------------
{
Name: FSCacheRead,
Value: analyzeInfo.CacheRead,
Unit: Statistic_Unit_count, //"count",
},
{
Name: FSCacheHit,
Value: analyzeInfo.CacheHit,
Unit: Statistic_Unit_count, //"count",
},
{
Name: FSCacheMemoryRead,
Value: analyzeInfo.CacheMemoryRead,
Unit: Statistic_Unit_count, //"count",
},
{
Name: FSCacheMemoryHit,
Value: analyzeInfo.CacheMemoryHit,
Unit: Statistic_Unit_count, //"count",
},
{
Name: FSCacheDiskRead,
Value: analyzeInfo.CacheDiskRead,
Unit: Statistic_Unit_count, //"count",
},
{
Name: FSCacheDiskHit,
Value: analyzeInfo.CacheDiskHit,
Unit: Statistic_Unit_count, //"count",
},
{
Name: FSCacheRemoteRead,
Value: analyzeInfo.CacheRemoteRead,
Unit: Statistic_Unit_count, //"count",
},
{
Name: FSCacheRemoteHit,
Value: analyzeInfo.CacheRemoteHit,
Unit: Statistic_Unit_count, //"count",
},
//--------------------------------------------------------------------------------------
}

nw := []models.StatisticValue{
Expand Down
Loading
Loading