Skip to content

Commit

Permalink
support multi batches for keyviz schema data (#1700)
Browse files Browse the repository at this point in the history
Signed-off-by: mornyx <[email protected]>
  • Loading branch information
mornyx authored Jul 18, 2024
1 parent a743c8e commit e6e78c7
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 23 deletions.
98 changes: 75 additions & 23 deletions pkg/keyvisual/decorator/tidb_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"net/url"
"strconv"
"strings"
"time"

"github.com/joomcode/errorx"
Expand All @@ -19,8 +20,9 @@ import (
)

const (
schemaVersionPath = "/tidb/ddl/global_schema_version"
etcdGetTimeout = time.Second
schemaVersionPath = "/tidb/ddl/global_schema_version"
etcdGetTimeout = time.Second
tableInfosBatchSize = 512
)

var (
Expand Down Expand Up @@ -73,34 +75,54 @@ func (s *tidbLabelStrategy) updateMap(ctx context.Context) {
}
var tableInfos []*model.TableInfo
encodeName := url.PathEscape(db.Name.O)
if err := s.request(fmt.Sprintf("/schema/%s", encodeName), &tableInfos); err != nil {
if err := s.request(fmt.Sprintf("/schema/%s?id_name_only=true", encodeName), &tableInfos); err != nil {
log.Error("fail to send schema request", zap.String("component", distro.R().TiDB), zap.Error(err))
updateSuccess = false
continue
}
for _, table := range tableInfos {
indices := make(map[int64]string, len(table.Indices))
for _, index := range table.Indices {
indices[index.ID] = index.Name.O
if len(tableInfos) == 0 {
continue
}
if tableInfos[0].Version != nil {
// ?id_name_only=true doesn't work, fallback.
log.Debug("use fallback")
s.updateTableMap(db.Name.O, tableInfos)
continue
}

/* Split into small batches */
log.Debug("use batch")

var tableIDBatches [][]string
batch := make([]string, 0, tableInfosBatchSize)
n := 0
for _, info := range tableInfos {
batch = append(batch, strconv.FormatInt(info.ID, 10))
n++
if n == tableInfosBatchSize {
tableIDBatches = append(tableIDBatches, batch)
batch = make([]string, 0, tableInfosBatchSize)
n = 0
}
detail := &tableDetail{
Name: table.Name.O,
DB: db.Name.O,
ID: table.ID,
Indices: indices,
}
if len(batch) > 0 {
tableIDBatches = append(tableIDBatches, batch)
}
for _, batch := range tableIDBatches {
var tableInfoBatch map[string]*model.TableInfo
if err := s.request(fmt.Sprintf("/schema?table_ids=%s", strings.Join(batch, ",")), &tableInfoBatch); err != nil {
log.Error("fail to send schema request", zap.String("component", distro.R().TiDB), zap.Error(err))
updateSuccess = false
continue
}
s.TableMap.Store(table.ID, detail)
if partition := table.GetPartitionInfo(); partition != nil {
for _, partitionDef := range partition.Definitions {
detail := &tableDetail{
Name: fmt.Sprintf("%s/%s", table.Name.O, partitionDef.Name.O),
DB: db.Name.O,
ID: partitionDef.ID,
Indices: indices,
}
s.TableMap.Store(partitionDef.ID, detail)
}
if len(tableInfoBatch) == 0 {
continue
}
tableInfoBatchSlice := make([]*model.TableInfo, 0, len(tableInfoBatch))
for _, info := range tableInfoBatch {
tableInfoBatchSlice = append(tableInfoBatchSlice, info)
}
s.updateTableMap(db.Name.O, tableInfoBatchSlice)
}
}

Expand All @@ -110,6 +132,36 @@ func (s *tidbLabelStrategy) updateMap(ctx context.Context) {
}
}

func (s *tidbLabelStrategy) updateTableMap(dbname string, tableInfos []*model.TableInfo) {
if len(tableInfos) == 0 {
return
}
for _, table := range tableInfos {
indices := make(map[int64]string, len(table.Indices))
for _, index := range table.Indices {
indices[index.ID] = index.Name.O
}
detail := &tableDetail{
Name: table.Name.O,
DB: dbname,
ID: table.ID,
Indices: indices,
}
s.TableMap.Store(table.ID, detail)
if partition := table.GetPartitionInfo(); partition != nil {
for _, partitionDef := range partition.Definitions {
detail := &tableDetail{
Name: fmt.Sprintf("%s/%s", table.Name.O, partitionDef.Name.O),
DB: dbname,
ID: partitionDef.ID,
Indices: indices,
}
s.TableMap.Store(partitionDef.ID, detail)
}
}
}
}

func (s *tidbLabelStrategy) request(path string, v interface{}) error {
data, err := s.tidbClient.SendGetRequest(path)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/tidb/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type TableInfo struct {
Name CIStr `json:"name"`
Indices []*IndexInfo `json:"index_info"`
Partition *PartitionInfo `json:"partition"`
Version *int64 `json:"version"`
}

// GetPartitionInfo returns the partition information.
Expand Down

0 comments on commit e6e78c7

Please sign in to comment.