Skip to content

Commit

Permalink
Introduce an independent region client interface
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Dec 5, 2024
1 parent 3cfd66f commit 4d58b18
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 110 deletions.
110 changes: 17 additions & 93 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ package pd

import (
"context"
"encoding/hex"
"fmt"
"net/url"
"runtime/trace"
"strings"
"sync"
Expand All @@ -32,6 +30,7 @@ import (
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/clients/metastorage"
"github.com/tikv/pd/client/clients/region"
"github.com/tikv/pd/client/clients/tso"
"github.com/tikv/pd/client/constants"
"github.com/tikv/pd/client/errs"
Expand All @@ -43,15 +42,6 @@ import (
"go.uber.org/zap"
)

// Region contains information of a region's meta and its peers.
type Region struct {
Meta *metapb.Region
Leader *metapb.Peer
DownPeers []*metapb.Peer
PendingPeers []*metapb.Peer
Buckets *metapb.Buckets
}

// GlobalConfigItem standard format of KV pair in GlobalConfig client
type GlobalConfigItem struct {
EventType pdpb.EventType
Expand All @@ -64,30 +54,6 @@ type GlobalConfigItem struct {
type RPCClient interface {
// GetAllMembers gets the members Info from PD
GetAllMembers(ctx context.Context) ([]*pdpb.Member, error)
// GetRegion gets a region and its leader Peer from PD by key.
// The region may expire after split. Caller is responsible for caching and
// taking care of region change.
// Also, it may return nil if PD finds no Region for the key temporarily,
// client should retry later.
GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error)
// GetRegionFromMember gets a region from certain members.
GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...opt.GetRegionOption) (*Region, error)
// GetPrevRegion gets the previous region and its leader Peer of the region where the key is located.
GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error)
// Deprecated: use BatchScanRegions instead.
// ScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given range if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error)
// BatchScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given ranges if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
// The returned regions are flattened, even there are key ranges located in the same region, only one region will be returned.
BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand All @@ -105,17 +71,6 @@ type RPCClient interface {
// determine the safepoint for multiple services, it does not trigger a GC
// job. Use UpdateGCSafePoint to trigger the GC job if needed.
UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error)
// ScatterRegion scatters the specified region. Should use it for a batch of regions,
// and the distribution of these regions will be dispersed.
// NOTICE: This method is the old version of ScatterRegions, you should use the later one as your first choice.
ScatterRegion(ctx context.Context, regionID uint64) error
// ScatterRegions scatters the specified regions. Should use it for a batch of regions,
// and the distribution of these regions will be dispersed.
ScatterRegions(ctx context.Context, regionsID []uint64, opts ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error)
// SplitRegions split regions by given split keys
SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitRegionsResponse, error)
// SplitAndScatterRegions split regions by given split keys and scatter new regions
SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error)
// GetOperator gets the status of operator of the specified region.
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)

Expand All @@ -141,6 +96,7 @@ type RPCClient interface {
// on your needs.
WithCallerComponent(callerComponent caller.Component) RPCClient

region.Client
tso.Client
metastorage.Client
// KeyspaceClient manages keyspace metadata.
Expand Down Expand Up @@ -214,38 +170,6 @@ type SecurityOption struct {
SSLKEYBytes []byte
}

// KeyRange defines a range of keys in bytes.
type KeyRange struct {
StartKey []byte
EndKey []byte
}

// NewKeyRange creates a new key range structure with the given start key and end key bytes.
// Notice: the actual encoding of the key range is not specified here. It should be either UTF-8 or hex.
// - UTF-8 means the key has already been encoded into a string with UTF-8 encoding, like:
// []byte{52 56 54 53 54 99 54 99 54 102 50 48 53 55 54 102 55 50 54 99 54 52}, which will later be converted to "48656c6c6f20576f726c64"
// by using `string()` method.
// - Hex means the key is just a raw hex bytes without encoding to a UTF-8 string, like:
// []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100}, which will later be converted to "48656c6c6f20576f726c64"
// by using `hex.EncodeToString()` method.
func NewKeyRange(startKey, endKey []byte) *KeyRange {
return &KeyRange{startKey, endKey}
}

// EscapeAsUTF8Str returns the URL escaped key strings as they are UTF-8 encoded.
func (r *KeyRange) EscapeAsUTF8Str() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(string(r.StartKey))
endKeyStr = url.QueryEscape(string(r.EndKey))
return
}

// EscapeAsHexStr returns the URL escaped key strings as they are hex encoded.
func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(hex.EncodeToString(r.StartKey))
endKeyStr = url.QueryEscape(hex.EncodeToString(r.EndKey))
return
}

// NewClient creates a PD client.
func NewClient(
callerComponent caller.Component,
Expand Down Expand Up @@ -634,12 +558,12 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
return minTS.Physical, minTS.Logical, nil
}

func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
func handleRegionResponse(res *pdpb.GetRegionResponse) *region.Region {
if res.Region == nil {
return nil
}

r := &Region{
r := &region.Region{
Meta: res.Region,
Leader: res.Leader,
PendingPeers: res.PendingPeers,
Expand All @@ -652,7 +576,7 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
}

// GetRegionFromMember implements the RPCClient interface.
func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...opt.GetRegionOption) (*Region, error) {
func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...opt.GetRegionOption) (*region.Region, error) {

Check warning on line 579 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L579

Added line #L579 was not covered by tests
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -691,7 +615,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
}

// GetRegion implements the RPCClient interface.
func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) {
func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*region.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -731,7 +655,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
}

// GetPrevRegion implements the RPCClient interface.
func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) {
func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*region.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -771,7 +695,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
}

// GetRegionByID implements the RPCClient interface.
func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error) {
func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*region.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -811,7 +735,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
}

// ScanRegions implements the RPCClient interface.
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error) {
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*region.Region, error) {

Check warning on line 738 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L738

Added line #L738 was not covered by tests
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -862,7 +786,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
}

// BatchScanRegions implements the RPCClient interface.
func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error) {
func (c *client) BatchScanRegions(ctx context.Context, ranges []region.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*region.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.BatchScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -915,10 +839,10 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit
return handleBatchRegionsResponse(resp), nil
}

func handleBatchRegionsResponse(resp *pdpb.BatchScanRegionsResponse) []*Region {
regions := make([]*Region, 0, len(resp.GetRegions()))
func handleBatchRegionsResponse(resp *pdpb.BatchScanRegionsResponse) []*region.Region {
regions := make([]*region.Region, 0, len(resp.GetRegions()))
for _, r := range resp.GetRegions() {
region := &Region{
region := &region.Region{
Meta: r.Region,
Leader: r.Leader,
PendingPeers: r.PendingPeers,
Expand All @@ -932,21 +856,21 @@ func handleBatchRegionsResponse(resp *pdpb.BatchScanRegionsResponse) []*Region {
return regions
}

func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
var regions []*Region
func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*region.Region {
var regions []*region.Region

Check warning on line 860 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L859-L860

Added lines #L859 - L860 were not covered by tests
if len(resp.GetRegions()) == 0 {
// Make it compatible with old server.
metas, leaders := resp.GetRegionMetas(), resp.GetLeaders()
for i := range metas {
r := &Region{Meta: metas[i]}
r := &region.Region{Meta: metas[i]}

Check warning on line 865 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L865

Added line #L865 was not covered by tests
if i < len(leaders) {
r.Leader = leaders[i]
}
regions = append(regions, r)
}
} else {
for _, r := range resp.GetRegions() {
region := &Region{
region := &region.Region{

Check warning on line 873 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L873

Added line #L873 was not covered by tests
Meta: r.Region,
Leader: r.Leader,
PendingPeers: r.PendingPeers,
Expand Down
105 changes: 105 additions & 0 deletions client/clients/region/region_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package region

import (
"context"
"encoding/hex"
"net/url"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/client/opt"
)

// Region contains information of a region's meta and its peers.
type Region struct {
Meta *metapb.Region
Leader *metapb.Peer
DownPeers []*metapb.Peer
PendingPeers []*metapb.Peer
Buckets *metapb.Buckets
}

// KeyRange defines a range of keys in bytes.
type KeyRange struct {
StartKey []byte
EndKey []byte
}

// NewKeyRange creates a new key range structure with the given start key and end key bytes.
// Notice: the actual encoding of the key range is not specified here. It should be either UTF-8 or hex.
// - UTF-8 means the key has already been encoded into a string with UTF-8 encoding, like:
// []byte{52 56 54 53 54 99 54 99 54 102 50 48 53 55 54 102 55 50 54 99 54 52}, which will later be converted to "48656c6c6f20576f726c64"
// by using `string()` method.
// - Hex means the key is just a raw hex bytes without encoding to a UTF-8 string, like:
// []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100}, which will later be converted to "48656c6c6f20576f726c64"
// by using `hex.EncodeToString()` method.
func NewKeyRange(startKey, endKey []byte) *KeyRange {
return &KeyRange{startKey, endKey}
}

// EscapeAsUTF8Str returns the URL escaped key strings as they are UTF-8 encoded.
func (r *KeyRange) EscapeAsUTF8Str() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(string(r.StartKey))
endKeyStr = url.QueryEscape(string(r.EndKey))
return
}

// EscapeAsHexStr returns the URL escaped key strings as they are hex encoded.
func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(hex.EncodeToString(r.StartKey))
endKeyStr = url.QueryEscape(hex.EncodeToString(r.EndKey))
return
}

// Client defines the interface of a region client.
type Client interface {
// GetRegion gets a region and its leader Peer from PD by key.
// The region may expire after split. Caller is responsible for caching and
// taking care of region change.
// Also, it may return nil if PD finds no Region for the key temporarily,
// client should retry later.
GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error)
// GetRegionFromMember gets a region from certain members.
GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...opt.GetRegionOption) (*Region, error)
// GetPrevRegion gets the previous region and its leader Peer of the region where the key is located.
GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error)
// Deprecated: use BatchScanRegions instead.
// ScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given range if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error)
// BatchScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given ranges if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
// The returned regions are flattened, even there are key ranges located in the same region, only one region will be returned.
BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error)
// ScatterRegion scatters the specified region. Should use it for a batch of regions,
// and the distribution of these regions will be dispersed.
// NOTICE: This method is the old version of ScatterRegions, you should use the later one as your first choice.
ScatterRegion(ctx context.Context, regionID uint64) error
// ScatterRegions scatters the specified regions. Should use it for a batch of regions,
// and the distribution of these regions will be dispersed.
ScatterRegions(ctx context.Context, regionsID []uint64, opts ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error)
// SplitRegions split regions by given split keys
SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitRegionsResponse, error)
// SplitAndScatterRegions split regions by given split keys and scatter new regions
SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error)
}
2 changes: 1 addition & 1 deletion client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/kvproto/pkg/pdpb"
pd "github.com/tikv/pd/client"
pd "github.com/tikv/pd/client/clients/region"
)

// ServiceSafePoint is the safepoint for a specific service
Expand Down
Loading

0 comments on commit 4d58b18

Please sign in to comment.