Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce support for tablets #137

Merged
merged 4 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 68 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ type Conn struct {

timeouts int64

logger StdLogger
logger StdLogger
tabletsRoutingV1 bool
}

// connect establishes a connection to a Cassandra node using session's connection config.
Expand Down Expand Up @@ -724,6 +725,9 @@ func (c *Conn) recv(ctx context.Context) error {
} else if head.stream == -1 {
// TODO: handle cassandra event frames, we shouldnt get any currently
framer := newFramerWithExts(c.compressor, c.version, c.cqlProtoExts)
c.mu.Lock()
c.tabletsRoutingV1 = framer.tabletsRoutingV1
c.mu.Unlock()
if err := framer.readFrame(c, &head); err != nil {
return err
}
Expand All @@ -733,6 +737,9 @@ func (c *Conn) recv(ctx context.Context) error {
// reserved stream that we dont use, probably due to a protocol error
// or a bug in Cassandra, this should be an error, parse it and return.
framer := newFramerWithExts(c.compressor, c.version, c.cqlProtoExts)
c.mu.Lock()
c.tabletsRoutingV1 = framer.tabletsRoutingV1
c.mu.Unlock()
if err := framer.readFrame(c, &head); err != nil {
return err
}
Expand Down Expand Up @@ -1069,6 +1076,9 @@ func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*fram

// resp is basically a waiting semaphore protecting the framer
framer := newFramerWithExts(c.compressor, c.version, c.cqlProtoExts)
c.mu.Lock()
c.tabletsRoutingV1 = framer.tabletsRoutingV1
c.mu.Unlock()

call := &callReq{
timeout: make(chan struct{}),
Expand Down Expand Up @@ -1453,6 +1463,63 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
return &Iter{err: err}
}

if len(framer.customPayload) > 0 {
if tabletInfo, ok := framer.customPayload["tablets-routing-v1"]; ok {
var firstToken string
var lastToken string
var replicas [][]interface{}
tabletInfoValue := []interface{}{&firstToken, &lastToken, &replicas}
Unmarshal(TupleTypeInfo{
NativeType: NativeType{proto: c.version, typ: TypeTuple},
Elems: []TypeInfo{
NativeType{typ: TypeBigInt},
NativeType{typ: TypeBigInt},
CollectionType{
NativeType: NativeType{proto: c.version, typ: TypeList},
Elem: TupleTypeInfo{
NativeType: NativeType{proto: c.version, typ: TypeTuple},
Elems: []TypeInfo{
NativeType{proto: c.version, typ: TypeUUID},
NativeType{proto: c.version, typ: TypeInt},
}},
},
},
}, tabletInfo, tabletInfoValue)

tablet := TabletInfo{}
tablet.firstToken, err = strconv.ParseInt(firstToken, 10, 64)
if err != nil {
return &Iter{err: err}
}
tablet.lastToken, err = strconv.ParseInt(lastToken, 10, 64)
if err != nil {
return &Iter{err: err}
}

tabletReplicas := make([]ReplicaInfo, 0, len(replicas))
for _, replica := range replicas {
if len(replica) != 2 {
return &Iter{err: err}
}
if hostId, ok := replica[0].(UUID); ok {
if shardId, ok := replica[1].(int); ok {
repInfo := ReplicaInfo{hostId, shardId}
tabletReplicas = append(tabletReplicas, repInfo)
} else {
return &Iter{err: err}
}
} else {
return &Iter{err: err}
}
}
tablet.replicas = tabletReplicas
tablet.keyspaceName = qry.routingInfo.keyspace
tablet.tableName = qry.routingInfo.table

addTablet(c.session.hostSource, &tablet)
}
}

if len(framer.traceID) > 0 && qry.trace != nil {
qry.trace.Trace(framer.traceID)
}
Expand Down
10 changes: 8 additions & 2 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ type SetPartitioner interface {
SetPartitioner(partitioner string)
}

// interface to implement to receive the tablets value
// Experimental, this interface and use may change
type SetTablets interface {
SetTablets(tablets []*TabletInfo)
}

func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
// Config.InsecureSkipVerify | EnableHostVerification | Result
// Config is nil | true | verify host
Expand Down Expand Up @@ -312,7 +318,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int,
}

// Pick a connection from this connection pool for the given query.
func (pool *hostConnPool) Pick(token token) *Conn {
func (pool *hostConnPool) Pick(token token, keyspace string, table string) *Conn {
pool.mu.RLock()
defer pool.mu.RUnlock()

Expand All @@ -330,7 +336,7 @@ func (pool *hostConnPool) Pick(token token) *Conn {
}
}

return pool.connPicker.Pick(token)
return pool.connPicker.Pick(token, keyspace, table)
}

// Size returns the number of connections currently active in the pool
Expand Down
6 changes: 3 additions & 3 deletions connpicker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type ConnPicker interface {
Pick(token) *Conn
Pick(token, string, string) *Conn
Put(*Conn)
Remove(conn *Conn)
Size() (int, int)
Expand Down Expand Up @@ -65,7 +65,7 @@ func (p *defaultConnPicker) Size() (int, int) {
return size, p.size - size
}

func (p *defaultConnPicker) Pick(token) *Conn {
func (p *defaultConnPicker) Pick(token, string, string) *Conn {
pos := int(atomic.AddUint32(&p.pos, 1) - 1)
size := len(p.conns)

Expand Down Expand Up @@ -104,7 +104,7 @@ func (*defaultConnPicker) NextShard() (shardID, nrShards int) {
// to the point where we have first connection.
type nopConnPicker struct{}

func (nopConnPicker) Pick(token) *Conn {
func (nopConnPicker) Pick(token, string, string) *Conn {
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ type framer struct {

flagLWT int
rateLimitingErrorCode int
tabletsRoutingV1 bool
}

func newFramer(compressor Compressor, version byte) *framer {
Expand Down Expand Up @@ -398,6 +399,8 @@ func newFramer(compressor Compressor, version byte) *framer {
f.header = nil
f.traceID = nil

f.tabletsRoutingV1 = false

return f
}

Expand Down Expand Up @@ -435,6 +438,7 @@ func newFramerWithExts(compressor Compressor, version byte, cqlProtoExts []cqlPr
tabletsRoutingV1, tabletsRoutingV1Ext{}))
return f
}
f.tabletsRoutingV1 = true
}

return f
Expand Down
154 changes: 154 additions & 0 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,151 @@ func (h *HostInfo) ScyllaShardAwarePortTLS() uint16 {
return h.scyllaShardAwarePortTLS
}

// Experimental, this interface and use may change
type ReplicaInfo struct {
hostId UUID
shardId int
}

// Experimental, this interface and use may change
type TabletInfo struct {
mu sync.RWMutex
keyspaceName string
tableName string
firstToken int64
lastToken int64
replicas []ReplicaInfo
}

func (t *TabletInfo) KeyspaceName() string {
t.mu.RLock()
defer t.mu.RUnlock()
return t.keyspaceName
}

func (t *TabletInfo) FirstToken() int64 {
t.mu.RLock()
defer t.mu.RUnlock()
return t.firstToken
}

func (t *TabletInfo) LastToken() int64 {
t.mu.RLock()
defer t.mu.RUnlock()
return t.lastToken
}

func (t *TabletInfo) TableName() string {
t.mu.RLock()
defer t.mu.RUnlock()
return t.tableName
}

func (t *TabletInfo) Replicas() []ReplicaInfo {
t.mu.RLock()
defer t.mu.RUnlock()
return t.replicas
}

// Search for place in tablets table with specific Keyspace and Table name
func findTablets(tablets []*TabletInfo, k string, t string) (int, int) {
l := -1
r := -1
for i, tablet := range tablets {
if tablet.KeyspaceName() == k && tablet.TableName() == t {
if l == -1 {
l = i
}
r = i
} else if l != -1 {
break
}
}

return l, r
}

func addTabletToTabletsList(tablets []*TabletInfo, tablet *TabletInfo) []*TabletInfo {
l, r := findTablets(tablets, tablet.keyspaceName, tablet.tableName)
if l == -1 && r == -1 {
l = 0
r = 0
} else {
r = r + 1
}

l1, r1 := l, r
l2, r2 := l1, r1

// find first overlaping range
for l1 < r1 {
mid := (l1 + r1) / 2
if tablets[mid].FirstToken() < tablet.FirstToken() {
l1 = mid + 1
} else {
r1 = mid
}
}
start := l1

if start > l && tablets[start-1].LastToken() > tablet.FirstToken() {
start = start - 1
}

// find last overlaping range
for l2 < r2 {
mid := (l2 + r2) / 2
if tablets[mid].LastToken() < tablet.LastToken() {
l2 = mid + 1
} else {
r2 = mid
}
}
end := l2
if end < r && tablets[end].FirstToken() >= tablet.LastToken() {
end = end - 1
}
if end == len(tablets) {
end = end - 1
}

updated_tablets := tablets
if start <= end {
// Delete elements from index start to end
updated_tablets = append(tablets[:start], tablets[end+1:]...)
}
// Insert tablet element at index start
updated_tablets2 := append(updated_tablets[:start], append([]*TabletInfo{tablet}, updated_tablets[start:]...)...)
return updated_tablets2
}

// Search for place in tablets table for token starting from index l to index r
func findTabletForToken(tablets []*TabletInfo, token token, l int, r int) *TabletInfo {
for l < r {
var m int
if r*l > 0 {
m = l + (r-l)/2
} else {
m = (r + l) / 2
}
if int64Token(tablets[m].LastToken()).Less(token) {
l = m + 1
} else {
r = m
}
}

return tablets[l]
}

// Polls system.peers at a specific interval to find new hosts
type ringDescriber struct {
session *Session
mu sync.Mutex
prevHosts []*HostInfo
prevPartitioner string
// Experimental, this interface and use may change
prevTablets []*TabletInfo
}

// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
Expand Down Expand Up @@ -835,6 +974,21 @@ func refreshRing(r *ringDescriber) error {

r.session.metadata.setPartitioner(partitioner)
r.session.policy.SetPartitioner(partitioner)

return nil
}

// Experimental, this interface and use may change
func addTablet(r *ringDescriber, tablet *TabletInfo) error {
r.mu.Lock()
defer r.mu.Unlock()

tablets := r.session.getTablets()
tablets = addTabletToTabletsList(tablets, tablet)

r.session.ring.setTablets(tablets)
r.session.policy.SetTablets(tablets)

return nil
}

Expand Down
Loading