Skip to content

Commit

Permalink
Add USING TIMEOUT to schema queries
Browse files Browse the repository at this point in the history
  • Loading branch information
sylwiaszunejko committed Jul 25, 2024
1 parent 7a8b873 commit 74b83a0
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 42 deletions.
38 changes: 21 additions & 17 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ type ClusterConfig struct {
// If not specified, defaults to the global gocql.Logger.
Logger StdLogger

// The timeout for the requests to the schema tables. (default: 60s)
MetadataSchemaRequestTimeout time.Duration

// internal config for testing
disableControlConn bool
disableInit bool
Expand All @@ -275,23 +278,24 @@ type Dialer interface {
// the same host, and will not mark the node being down or up from events.
func NewCluster(hosts ...string) *ClusterConfig {
cfg := &ClusterConfig{
Hosts: hosts,
CQLVersion: "3.0.0",
Timeout: 11 * time.Second,
ConnectTimeout: 11 * time.Second,
Port: 9042,
NumConns: 2,
Consistency: Quorum,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
PageSize: 5000,
DefaultTimestamp: true,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
SocketKeepalive: 15 * time.Second,
WriteCoalesceWaitTime: 200 * time.Microsecond,
Hosts: hosts,
CQLVersion: "3.0.0",
Timeout: 11 * time.Second,
ConnectTimeout: 11 * time.Second,
Port: 9042,
NumConns: 2,
Consistency: Quorum,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
PageSize: 5000,
DefaultTimestamp: true,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
SocketKeepalive: 15 * time.Second,
WriteCoalesceWaitTime: 200 * time.Microsecond,
MetadataSchemaRequestTimeout: 60 * time.Second,
}

return cfg
Expand Down
10 changes: 5 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1769,9 +1769,9 @@ func (c *Conn) query(ctx context.Context, statement string, values ...interface{
}

func (c *Conn) querySystemPeers(ctx context.Context, version cassVersion) *Iter {
const (
peerSchema = "SELECT * FROM system.peers"
peerV2Schemas = "SELECT * FROM system.peers_v2"
var (
peerSchema = "SELECT * FROM system.peers" + c.session.control.usingTimeoutClause
peerV2Schemas = "SELECT * FROM system.peers_v2" + c.session.control.usingTimeoutClause
)

c.mu.Lock()
Expand Down Expand Up @@ -1804,11 +1804,11 @@ func (c *Conn) querySystemPeers(ctx context.Context, version cassVersion) *Iter
}

func (c *Conn) querySystemLocal(ctx context.Context) *Iter {
return c.query(ctx, "SELECT * FROM system.local WHERE key='local'")
return c.query(ctx, "SELECT * FROM system.local WHERE key='local'"+c.session.control.usingTimeoutClause)
}

func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
const localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
var localSchemas = "SELECT schema_version FROM system.local WHERE key='local'" + c.session.control.usingTimeoutClause

var versions map[string]struct{}
var schemaVersion string
Expand Down
10 changes: 7 additions & 3 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,17 @@ type controlConn struct {
retry RetryPolicy

quit chan struct{}

usingTimeoutClause string
}

func createControlConn(session *Session) *controlConn {

control := &controlConn{
session: session,
quit: make(chan struct{}),
retry: &SimpleRetryPolicy{NumRetries: 3},
session: session,
quit: make(chan struct{}),
retry: &SimpleRetryPolicy{NumRetries: 3},
usingTimeoutClause: "",
}

control.conn.Store((*connHost)(nil))
Expand Down
2 changes: 1 addition & 1 deletion host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ type ringDescriber struct {

// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
func checkSystemSchema(control *controlConn) (bool, error) {
iter := control.query("SELECT * FROM system_schema.keyspaces")
iter := control.query("SELECT * FROM system_schema.keyspaces" + control.usingTimeoutClause)
if err := iter.err; err != nil {
if errf, ok := err.(*errorFrame); ok {
if errf.code == ErrCodeSyntax {
Expand Down
14 changes: 7 additions & 7 deletions metadata_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetada

var replication map[string]string

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.control.usingTimeoutClause, keyspaceName)
if iter.NumRows() == 0 {
return nil, ErrKeyspaceDoesNotExist
}
Expand All @@ -583,7 +583,7 @@ func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetada

var strategyOptionsJSON []byte

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.control.usingTimeoutClause, keyspaceName)
if iter.NumRows() == 0 {
return nil, ErrKeyspaceDoesNotExist
}
Expand Down Expand Up @@ -631,7 +631,7 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e
view_name
FROM system_schema.views
WHERE keyspace_name = ?`
iter = session.control.query(stmt, keyspaceName)
iter = session.control.query(stmt+session.control.usingTimeoutClause, keyspaceName)
return iter
}

Expand Down Expand Up @@ -693,7 +693,7 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e
}
}

iter = session.control.query(stmt, keyspaceName)
iter = session.control.query(stmt+session.control.usingTimeoutClause, keyspaceName)

tables := []TableMetadata{}
table := TableMetadata{Keyspace: keyspaceName}
Expand Down Expand Up @@ -756,7 +756,7 @@ func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error

var columns []ColumnMetadata

rows := s.control.query(stmt, keyspace).Scanner()
rows := s.control.query(stmt+s.control.usingTimeoutClause, keyspace).Scanner()
for rows.Next() {
var (
column = ColumnMetadata{Keyspace: keyspace}
Expand Down Expand Up @@ -817,7 +817,7 @@ func (s *Session) scanColumnMetadataV2(keyspace string) ([]ColumnMetadata, error

var columns []ColumnMetadata

rows := s.control.query(stmt, keyspace).Scanner()
rows := s.control.query(stmt+s.control.usingTimeoutClause, keyspace).Scanner()
for rows.Next() {
var (
column = ColumnMetadata{Keyspace: keyspace}
Expand Down Expand Up @@ -875,7 +875,7 @@ func (s *Session) scanColumnMetadataSystem(keyspace string) ([]ColumnMetadata, e

var columns []ColumnMetadata

rows := s.control.query(stmt, keyspace).Scanner()
rows := s.control.query(stmt+s.control.usingTimeoutClause, keyspace).Scanner()
for rows.Next() {
column := ColumnMetadata{Keyspace: keyspace}

Expand Down
18 changes: 9 additions & 9 deletions metadata_scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetada

var replication map[string]string

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.control.usingTimeoutClause, keyspaceName)
if iter.NumRows() == 0 {
return nil, ErrKeyspaceDoesNotExist
}
Expand Down Expand Up @@ -541,7 +541,7 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e
}

stmt := `SELECT * FROM system_schema.tables WHERE keyspace_name = ?`
iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.control.usingTimeoutClause, keyspaceName)

var tables []TableMetadata
table := TableMetadata{Keyspace: keyspaceName}
Expand Down Expand Up @@ -573,7 +573,7 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e

stmt = `SELECT * FROM system_schema.scylla_tables WHERE keyspace_name = ? AND table_name = ?`
for i, t := range tables {
iter := session.control.query(stmt, keyspaceName, t.Name)
iter := session.control.query(stmt+session.control.usingTimeoutClause, keyspaceName, t.Name)

table := TableMetadata{}
if iter.MapScan(map[string]interface{}{
Expand Down Expand Up @@ -601,7 +601,7 @@ func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata,

var columns []ColumnMetadata

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.control.usingTimeoutClause, keyspaceName)
column := ColumnMetadata{Keyspace: keyspaceName}

for iter.MapScan(map[string]interface{}{
Expand Down Expand Up @@ -630,7 +630,7 @@ func getTypeMetadata(session *Session, keyspaceName string) ([]TypeMetadata, err
}

stmt := `SELECT * FROM system_schema.types WHERE keyspace_name = ?`
iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.control.usingTimeoutClause, keyspaceName)

var types []TypeMetadata
tm := TypeMetadata{Keyspace: keyspaceName}
Expand Down Expand Up @@ -661,7 +661,7 @@ func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMeta
var functions []FunctionMetadata
function := FunctionMetadata{Keyspace: keyspaceName}

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.control.usingTimeoutClause, keyspaceName)
for iter.MapScan(map[string]interface{}{
"function_name": &function.Name,
"argument_types": &function.ArgumentTypes,
Expand Down Expand Up @@ -693,7 +693,7 @@ func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMe
var aggregates []AggregateMetadata
aggregate := AggregateMetadata{Keyspace: keyspaceName}

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.control.usingTimeoutClause, keyspaceName)
for iter.MapScan(map[string]interface{}{
"aggregate_name": &aggregate.Name,
"argument_types": &aggregate.ArgumentTypes,
Expand Down Expand Up @@ -725,7 +725,7 @@ func getIndexMetadata(session *Session, keyspaceName string) ([]IndexMetadata, e
var indexes []IndexMetadata
index := IndexMetadata{}

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.control.usingTimeoutClause, keyspaceName)
for iter.MapScan(map[string]interface{}{
"index_name": &index.Name,
"keyspace_name": &index.KeyspaceName,
Expand All @@ -752,7 +752,7 @@ func getViewMetadata(session *Session, keyspaceName string) ([]ViewMetadata, err

stmt := `SELECT * FROM system_schema.views WHERE keyspace_name = ?`

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.control.usingTimeoutClause, keyspaceName)

var views []ViewMetadata
view := ViewMetadata{KeyspaceName: keyspaceName}
Expand Down
35 changes: 35 additions & 0 deletions schema_queries_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//go:build schema
// +build schema

package gocql

import (
"testing"
"time"
)

func TestSchemaQueries(t *testing.T) {
cluster := createCluster()

fallback := RoundRobinHostPolicy()
cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback)

session := createSessionFromCluster(cluster, t)
defer session.Close()

assertTrue(t, "keyspace present in schemaDescriber", session.schemaDescriber.cache["gocql_test"].Name == "gocql_test")
}

func TestLowTimeout(t *testing.T) {
cluster := createCluster()
cluster.MetadataSchemaRequestTimeout = 1 * time.Nanosecond

fallback := RoundRobinHostPolicy()
cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback)

_, err := cluster.CreateSession()
if err == nil {
t.Fatal("createSession should fail due to low timeout:", err)
}

}
4 changes: 4 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -236,6 +237,9 @@ func (s *Session) init() error {
conn := s.control.getConn().conn
conn.mu.Lock()
s.tabletsRoutingV1 = conn.isTabletSupported()
if isScyllaConn(conn) {
s.control.usingTimeoutClause = " USING TIMEOUT " + strconv.FormatInt(int64(s.cfg.MetadataSchemaRequestTimeout.Nanoseconds()), 10) + "ns"
}
conn.mu.Unlock()

if !s.cfg.DisableInitialHostLookup {
Expand Down

0 comments on commit 74b83a0

Please sign in to comment.