Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce support for tablets #137

Merged
merged 4 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ jobs:
DOCKER_COMPOSE_VERSION: 2.20.0
run: sudo curl -L "https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

- run: sudo sh -c "echo 2097152 >> /proc/sys/fs/aio-max-nr"
- run: ./integration.sh cassandra scylla
- run: ./integration.sh integration scylla
- run: ./integration.sh ccm
- run: ./integration.sh tablet
87 changes: 76 additions & 11 deletions common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@ import (
)

var (
flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
flagProto = flag.Int("proto", 0, "protcol version")
flagCQL = flag.String("cql", "3.0.0", "CQL version")
flagRF = flag.Int("rf", 1, "replication factor for test keyspace")
clusterSize = flag.Int("clusterSize", 1, "the expected size of the cluster")
flagRetry = flag.Int("retries", 5, "number of times to retry queries")
flagAutoWait = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts poll")
flagRunSslTest = flag.Bool("runssl", false, "Set to true to run ssl test")
flagRunAuthTest = flag.Bool("runauth", false, "Set to true to run authentication test")
flagCompressTest = flag.String("compressor", "", "compressor to use")
flagTimeout = flag.Duration("gocql.timeout", 5*time.Second, "sets the connection `timeout` for all operations")
flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
flagMultiNodeCluster = flag.String("multiCluster", "127.0.0.2", "a comma-separated list of host:port tuples")
flagProto = flag.Int("proto", 0, "protcol version")
flagCQL = flag.String("cql", "3.0.0", "CQL version")
flagRF = flag.Int("rf", 1, "replication factor for test keyspace")
clusterSize = flag.Int("clusterSize", 1, "the expected size of the cluster")
flagRetry = flag.Int("retries", 5, "number of times to retry queries")
flagAutoWait = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts poll")
flagRunSslTest = flag.Bool("runssl", false, "Set to true to run ssl test")
flagRunAuthTest = flag.Bool("runauth", false, "Set to true to run authentication test")
flagCompressTest = flag.String("compressor", "", "compressor to use")
flagTimeout = flag.Duration("gocql.timeout", 5*time.Second, "sets the connection `timeout` for all operations")

flagCassVersion cassVersion
)
Expand All @@ -38,6 +39,10 @@ func getClusterHosts() []string {
return strings.Split(*flagCluster, ",")
}

// getMultiNodeClusterHosts splits the value of the -multiCluster flag on
// commas and returns the resulting list of hosts.
func getMultiNodeClusterHosts() []string {
	hostList := *flagMultiNodeCluster
	return strings.Split(hostList, ",")
}

func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
if *flagRunSslTest {
cluster.Port = 9142
Expand Down Expand Up @@ -102,6 +107,35 @@ func createCluster(opts ...func(*ClusterConfig)) *ClusterConfig {
return cluster
}

// createMultiNodeCluster builds a ClusterConfig for the hosts returned by
// getMultiNodeClusterHosts. Protocol version, CQL version, timeout, retry
// policy and compressor are taken from the test flags, SSL options are applied
// through addSslOptions, and finally every opt is run against the config in
// the order given.
//
// It panics when the -compressor flag is neither empty nor "snappy".
func createMultiNodeCluster(opts ...func(*ClusterConfig)) *ClusterConfig {
	cfg := NewCluster(getMultiNodeClusterHosts()...)
	cfg.ProtoVersion = *flagProto
	cfg.CQLVersion = *flagCQL
	cfg.Timeout = *flagTimeout
	cfg.Consistency = Quorum
	cfg.MaxWaitSchemaAgreement = 2 * time.Minute // travis might be slow

	// A retry policy is only installed when retries were requested.
	if retries := *flagRetry; retries > 0 {
		cfg.RetryPolicy = &SimpleRetryPolicy{NumRetries: retries}
	}

	// An empty compressor name means "no compression"; anything other than
	// "snappy" is rejected outright.
	if name := *flagCompressTest; name == "snappy" {
		cfg.Compressor = &SnappyCompressor{}
	} else if name != "" {
		panic("invalid compressor: " + name)
	}

	cfg = addSslOptions(cfg)

	// Caller-supplied options run last so they can override the defaults above.
	for i := range opts {
		opts[i](cfg)
	}

	return cfg
}

func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
// TODO: tb.Helper()
c := *cluster
Expand Down Expand Up @@ -149,6 +183,37 @@ func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {
return session
}

// createSessionFromMultiNodeCluster opens a session on cluster and, the first
// time initOnce fires, drops and recreates the "test1" keyspace with
// NetworkTopologyStrategy, a replication factor of 1 and 8 initial tablets.
// It then waits for schema agreement before handing the session back.
//
// A failure to create the session or to reach schema agreement fails tb; a
// failure while dropping or creating the keyspace panics.
func createSessionFromMultiNodeCluster(cluster *ClusterConfig, tb testing.TB) *Session {
	const keyspace = "test1"

	session, err := cluster.CreateSession()
	if err != nil {
		tb.Fatal("createSession:", err)
	}

	// setupKeyspace recreates the keyspace from scratch; it is guarded by
	// initOnce below so it runs at most once.
	setupKeyspace := func() {
		dropErr := createTable(session, `DROP KEYSPACE IF EXISTS `+keyspace)
		if dropErr != nil {
			panic(fmt.Sprintf("unable to drop keyspace: %v", dropErr))
		}

		createStmt := fmt.Sprintf(`CREATE KEYSPACE %s
WITH replication = {
'class': 'NetworkTopologyStrategy',
'replication_factor': 1,
'initial_tablets': 8
};`, keyspace)
		if createErr := createTable(session, createStmt); createErr != nil {
			panic(fmt.Sprintf("unable to create keyspace: %v", createErr))
		}
	}
	initOnce.Do(setupKeyspace)

	if agreeErr := session.control.awaitSchemaAgreement(); agreeErr != nil {
		tb.Fatal(agreeErr)
	}

	return session
}

func createSession(tb testing.TB, opts ...func(config *ClusterConfig)) *Session {
cluster := createCluster(opts...)
return createSessionFromCluster(cluster, tb)
Expand Down
69 changes: 68 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ type Conn struct {

timeouts int64

logger StdLogger
logger StdLogger
tabletsRoutingV1 bool
}

// connect establishes a connection to a Cassandra node using session's connection config.
Expand Down Expand Up @@ -724,6 +725,9 @@ func (c *Conn) recv(ctx context.Context) error {
} else if head.stream == -1 {
// TODO: handle cassandra event frames, we shouldnt get any currently
framer := newFramerWithExts(c.compressor, c.version, c.cqlProtoExts)
c.mu.Lock()
c.tabletsRoutingV1 = framer.tabletsRoutingV1
c.mu.Unlock()
if err := framer.readFrame(c, &head); err != nil {
return err
}
Expand All @@ -733,6 +737,9 @@ func (c *Conn) recv(ctx context.Context) error {
// reserved stream that we dont use, probably due to a protocol error
// or a bug in Cassandra, this should be an error, parse it and return.
framer := newFramerWithExts(c.compressor, c.version, c.cqlProtoExts)
c.mu.Lock()
c.tabletsRoutingV1 = framer.tabletsRoutingV1
c.mu.Unlock()
if err := framer.readFrame(c, &head); err != nil {
return err
}
Expand Down Expand Up @@ -1069,6 +1076,9 @@ func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*fram

// resp is basically a waiting semaphore protecting the framer
framer := newFramerWithExts(c.compressor, c.version, c.cqlProtoExts)
c.mu.Lock()
c.tabletsRoutingV1 = framer.tabletsRoutingV1
c.mu.Unlock()

call := &callReq{
timeout: make(chan struct{}),
Expand Down Expand Up @@ -1453,6 +1463,63 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
return &Iter{err: err}
}

// A "tablets-routing-v1" entry in the response's custom payload describes one
// tablet: its first and last token and its replicas. Decode the entry and hand
// the result to addTablet. Looking the key up in a nil or empty map just
// yields ok == false, so no separate length check is needed.
if tabletInfo, ok := framer.customPayload["tablets-routing-v1"]; ok {
	var firstToken string
	var lastToken string
	var replicas [][]interface{}
	tabletInfoValue := []interface{}{&firstToken, &lastToken, &replicas}

	// The payload is decoded as a tuple of (bigint, bigint, list of
	// (uuid, int) tuples). The result of Unmarshal must be checked: when
	// decoding fails the destinations stay empty and the only symptom would
	// be an unrelated ParseInt error further down.
	err = Unmarshal(TupleTypeInfo{
		NativeType: NativeType{proto: c.version, typ: TypeTuple},
		Elems: []TypeInfo{
			NativeType{typ: TypeBigInt},
			NativeType{typ: TypeBigInt},
			CollectionType{
				NativeType: NativeType{proto: c.version, typ: TypeList},
				Elem: TupleTypeInfo{
					NativeType: NativeType{proto: c.version, typ: TypeTuple},
					Elems: []TypeInfo{
						NativeType{proto: c.version, typ: TypeUUID},
						NativeType{proto: c.version, typ: TypeInt},
					}},
			},
		},
	}, tabletInfo, tabletInfoValue)
	if err != nil {
		return &Iter{err: err}
	}

	tablet := TabletInfo{}
	tablet.firstToken, err = strconv.ParseInt(firstToken, 10, 64)
	if err != nil {
		return &Iter{err: err}
	}
	tablet.lastToken, err = strconv.ParseInt(lastToken, 10, 64)
	if err != nil {
		return &Iter{err: err}
	}

	tabletReplicas := make([]ReplicaInfo, 0, len(replicas))
	for _, replica := range replicas {
		// Each malformed-replica branch builds its own error. err is known
		// to be nil here, so returning it would hide the problem behind an
		// empty, error-free iterator.
		if len(replica) != 2 {
			return &Iter{err: fmt.Errorf("gocql: tablets-routing-v1: replica has %d elements, expected 2", len(replica))}
		}
		hostID, ok := replica[0].(UUID)
		if !ok {
			return &Iter{err: fmt.Errorf("gocql: tablets-routing-v1: replica host id has type %T, expected UUID", replica[0])}
		}
		shardID, ok := replica[1].(int)
		if !ok {
			return &Iter{err: fmt.Errorf("gocql: tablets-routing-v1: replica shard id has type %T, expected int", replica[1])}
		}
		tabletReplicas = append(tabletReplicas, ReplicaInfo{hostID, shardID})
	}
	tablet.replicas = tabletReplicas
	tablet.keyspaceName = qry.routingInfo.keyspace
	tablet.tableName = qry.routingInfo.table

	addTablet(c.session.hostSource, &tablet)
}

if len(framer.traceID) > 0 && qry.trace != nil {
qry.trace.Trace(framer.traceID)
}
Expand Down
10 changes: 8 additions & 2 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ type SetPartitioner interface {
SetPartitioner(partitioner string)
}

// SetTablets is the interface to implement to receive the tablets value.
//
// Experimental: this interface and its use may change.
type SetTablets interface {
	// SetTablets receives the tablets value as a slice of *TabletInfo.
	SetTablets(tablets []*TabletInfo)
}

func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
// Config.InsecureSkipVerify | EnableHostVerification | Result
// Config is nil | true | verify host
Expand Down Expand Up @@ -312,7 +318,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int,
}

// Pick a connection from this connection pool for the given query.
func (pool *hostConnPool) Pick(token token) *Conn {
func (pool *hostConnPool) Pick(token token, keyspace string, table string) *Conn {
pool.mu.RLock()
defer pool.mu.RUnlock()

Expand All @@ -330,7 +336,7 @@ func (pool *hostConnPool) Pick(token token) *Conn {
}
}

return pool.connPicker.Pick(token)
return pool.connPicker.Pick(token, keyspace, table)
}

// Size returns the number of connections currently active in the pool
Expand Down
6 changes: 3 additions & 3 deletions connpicker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type ConnPicker interface {
Pick(token) *Conn
Pick(token, string, string) *Conn
Put(*Conn)
Remove(conn *Conn)
Size() (int, int)
Expand Down Expand Up @@ -65,7 +65,7 @@ func (p *defaultConnPicker) Size() (int, int) {
return size, p.size - size
}

func (p *defaultConnPicker) Pick(token) *Conn {
func (p *defaultConnPicker) Pick(token, string, string) *Conn {
pos := int(atomic.AddUint32(&p.pos, 1) - 1)
size := len(p.conns)

Expand Down Expand Up @@ -104,7 +104,7 @@ func (*defaultConnPicker) NextShard() (shardID, nrShards int) {
// to the point where we have first connection.
type nopConnPicker struct{}

func (nopConnPicker) Pick(token) *Conn {
func (nopConnPicker) Pick(token, string, string) *Conn {
return nil
}

Expand Down
54 changes: 54 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,60 @@ services:
interval: 5s
timeout: 5s
retries: 18
# node_2: Scylla node started from the scylladb/scylla-nightly image with the
# "consistent-topology-changes" and "tablets" experimental features enabled.
# Its --seeds value (192.168.100.12) is its own address on the "public"
# network, and it has no depends_on entry.
# Healthcheck: cqlsh runs "select * from system.local" against the node every
# 5s (5s timeout, 18 retries).
node_2:
image: scylladb/scylla-nightly
command: |
--experimental-features consistent-topology-changes
--experimental-features tablets
--smp 2
--memory 1G
--seeds 192.168.100.12
networks:
public:
ipv4_address: 192.168.100.12
healthcheck:
test: [ "CMD", "cqlsh", "192.168.100.12", "-e", "select * from system.local" ]
interval: 5s
timeout: 5s
retries: 18
# node_3: Scylla node with the same image and command-line flags as node_2,
# reachable at 192.168.100.13 on the "public" network. It uses
# 192.168.100.12 (node_2's address) as its seed and is only started once
# node_2 reports healthy (depends_on ... condition: service_healthy).
# Healthcheck: cqlsh runs "select * from system.local" against the node every
# 5s (5s timeout, 18 retries).
node_3:
image: scylladb/scylla-nightly
command: |
--experimental-features consistent-topology-changes
--experimental-features tablets
--smp 2
--memory 1G
--seeds 192.168.100.12
networks:
public:
ipv4_address: 192.168.100.13
healthcheck:
test: [ "CMD", "cqlsh", "192.168.100.13", "-e", "select * from system.local" ]
interval: 5s
timeout: 5s
retries: 18
depends_on:
node_2:
condition: service_healthy
# node_4: Scylla node with the same image and command-line flags as node_2,
# reachable at 192.168.100.14 on the "public" network. It uses
# 192.168.100.12 (node_2's address) as its seed and is only started once
# node_3 reports healthy, so the three nodes come up one after another.
# Healthcheck: cqlsh runs "select * from system.local" against the node every
# 5s (5s timeout, 18 retries).
node_4:
image: scylladb/scylla-nightly
command: |
--experimental-features consistent-topology-changes
--experimental-features tablets
--smp 2
--memory 1G
--seeds 192.168.100.12
networks:
public:
ipv4_address: 192.168.100.14
healthcheck:
test: [ "CMD", "cqlsh", "192.168.100.14", "-e", "select * from system.local" ]
interval: 5s
timeout: 5s
retries: 18
depends_on:
node_3:
condition: service_healthy
networks:
public:
driver: bridge
Expand Down
14 changes: 14 additions & 0 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ type framer struct {

flagLWT int
rateLimitingErrorCode int
tabletsRoutingV1 bool
}

func newFramer(compressor Compressor, version byte) *framer {
Expand Down Expand Up @@ -398,6 +399,8 @@ func newFramer(compressor Compressor, version byte) *framer {
f.header = nil
f.traceID = nil

f.tabletsRoutingV1 = false

return f
}

Expand Down Expand Up @@ -427,6 +430,17 @@ func newFramerWithExts(compressor Compressor, version byte, cqlProtoExts []cqlPr
f.rateLimitingErrorCode = castedExt.rateLimitErrorCode
}

if tabletsExt := findCQLProtoExtByName(cqlProtoExts, tabletsRoutingV1); tabletsExt != nil {
_, ok := tabletsExt.(*tabletsRoutingV1Ext)
if !ok {
Logger.Println(
fmt.Errorf("Failed to cast CQL protocol extension identified by name %s to type %T",
tabletsRoutingV1, tabletsRoutingV1Ext{}))
return f
}
f.tabletsRoutingV1 = true
}

return f
}

Expand Down
Loading
Loading