Skip to content

Commit

Permalink
br: use the dedicated grpc client for ingest (pingcap#51034)
Browse files Browse the repository at this point in the history
  • Loading branch information
Leavrth authored Mar 5, 2024
1 parent c1befbb commit 61d6e5f
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 18 deletions.
4 changes: 4 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,10 @@ func (rc *Client) Close() {
rc.rawKVClient.Close()
}

if err := rc.fileImporter.Close(); err != nil {
log.Warn("failed to close file improter")
}

log.Info("Restore client closed")
}

Expand Down
92 changes: 74 additions & 18 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,31 @@ type ImporterClient interface {
storeID uint64,
) (import_sstpb.ImportSSTClient, error)

CloseGrpcClient() error

SupportMultiIngest(ctx context.Context, stores []uint64) (bool, error)
}

type importClient struct {
mu sync.Mutex
metaClient split.SplitClient
clients map[uint64]import_sstpb.ImportSSTClient
tlsConf *tls.Config

mu sync.Mutex
// Notice: In order to avoid leak for BRIE via SQL, it needs to close grpc client connection before br task exits.
// So it caches the grpc connection instead of import_sstpb.ImportSSTClient.
// used for any request except the ingest reqeust
conns map[uint64]*grpc.ClientConn
// used for ingest request
ingestConns map[uint64]*grpc.ClientConn

tlsConf *tls.Config
keepaliveConf keepalive.ClientParameters
}

// NewImportClient returns a new ImporterClient.
func NewImportClient(metaClient split.SplitClient, tlsConf *tls.Config, keepaliveConf keepalive.ClientParameters) ImporterClient {
return &importClient{
metaClient: metaClient,
clients: make(map[uint64]import_sstpb.ImportSSTClient),
conns: make(map[uint64]*grpc.ClientConn),
ingestConns: make(map[uint64]*grpc.ClientConn),
tlsConf: tlsConf,
keepaliveConf: keepaliveConf,
}
Expand Down Expand Up @@ -188,7 +196,7 @@ func (ic *importClient) IngestSST(
storeID uint64,
req *import_sstpb.IngestRequest,
) (*import_sstpb.IngestResponse, error) {
client, err := ic.GetImportClient(ctx, storeID)
client, err := ic.GetIngestClient(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -200,23 +208,17 @@ func (ic *importClient) MultiIngest(
storeID uint64,
req *import_sstpb.MultiIngestRequest,
) (*import_sstpb.IngestResponse, error) {
client, err := ic.GetImportClient(ctx, storeID)
client, err := ic.GetIngestClient(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
return client.MultiIngest(ctx, req)
}

func (ic *importClient) GetImportClient(
func (ic *importClient) createGrpcConn(
ctx context.Context,
storeID uint64,
) (import_sstpb.ImportSSTClient, error) {
ic.mu.Lock()
defer ic.mu.Unlock()
client, ok := ic.clients[storeID]
if ok {
return client, nil
}
) (*grpc.ClientConn, error) {
store, err := ic.metaClient.GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -240,12 +242,58 @@ func (ic *importClient) GetImportClient(
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(ic.keepaliveConf),
)
return conn, errors.Trace(err)
}

func (ic *importClient) cachedConnectionFrom(
ctx context.Context,
storeID uint64,
caches map[uint64]*grpc.ClientConn,
) (import_sstpb.ImportSSTClient, error) {
ic.mu.Lock()
defer ic.mu.Unlock()
conn, ok := caches[storeID]
if ok {
return import_sstpb.NewImportSSTClient(conn), nil
}
conn, err := ic.createGrpcConn(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
client = import_sstpb.NewImportSSTClient(conn)
ic.clients[storeID] = client
return client, errors.Trace(err)
caches[storeID] = conn
return import_sstpb.NewImportSSTClient(conn), nil
}

func (ic *importClient) GetImportClient(
ctx context.Context,
storeID uint64,
) (import_sstpb.ImportSSTClient, error) {
return ic.cachedConnectionFrom(ctx, storeID, ic.conns)
}

func (ic *importClient) GetIngestClient(
ctx context.Context,
storeID uint64,
) (import_sstpb.ImportSSTClient, error) {
return ic.cachedConnectionFrom(ctx, storeID, ic.ingestConns)
}

func (ic *importClient) CloseGrpcClient() error {
ic.mu.Lock()
defer ic.mu.Unlock()
for id, conn := range ic.conns {
if err := conn.Close(); err != nil {
return errors.Trace(err)
}
delete(ic.conns, id)
}
for id, conn := range ic.ingestConns {
if err := conn.Close(); err != nil {
return errors.Trace(err)
}
delete(ic.ingestConns, id)
}
return nil
}

func (ic *importClient) SupportMultiIngest(ctx context.Context, stores []uint64) (bool, error) {
Expand Down Expand Up @@ -315,6 +363,13 @@ func NewFileImporter(
}
}

func (importer *FileImporter) Close() error {
if importer != nil && importer.importClient != nil {
return importer.importClient.CloseGrpcClient()
}
return nil
}

// CheckMultiIngestSupport checks whether all stores support multi-ingest
func (importer *FileImporter) CheckMultiIngestSupport(ctx context.Context, pdClient pd.Client) error {
allStores, err := util.GetAllTiKVStores(ctx, pdClient, util.SkipTiFlash)
Expand Down Expand Up @@ -967,6 +1022,7 @@ func (importer *FileImporter) downloadSSTV2(
if err != nil {
return nil, errors.Trace(err)
}
sstMeta.ApiVersion = apiVersion
downloadMetasMap[file.Name] = sstMeta
downloadReqsMap[file.Name] = req
}
Expand Down

0 comments on commit 61d6e5f

Please sign in to comment.