diff --git a/cluster.go b/cluster.go index 1b8470466..c5b45cf2e 100644 --- a/cluster.go +++ b/cluster.go @@ -7,6 +7,7 @@ package gocql import ( "context" "errors" + "fmt" "net" "time" ) @@ -93,6 +94,24 @@ type ClusterConfig struct { // Default: 128 for older CQL versions MaxRequestsPerConn int + // Threshold for the number of inflight requests per connection + // after which the connection is considered as heavy loaded + // Default: 512 + HeavyLoadedConnectionThreshold int + + // When a connection is considered as heavy loaded, the driver + // could switch to the least loaded connection for the same node. + // The switch will happen if the other connection is at least + // HeavyLoadedSwitchConnectionPercentage percentage less busy + // (in terms of inflight requests). + // + // For the default value of 20%, if the heavy loaded connection + // has 100 inflight requests, the switch will happen only if the + // least busy connection has less than 80 inflight requests. + // + // Default: 20% + HeavyLoadedSwitchConnectionPercentage int + // Default consistency level. // Default: Quorum Consistency Consistency @@ -291,27 +310,29 @@ type Dialer interface { // the same host, and will not mark the node being down or up from events. func NewCluster(hosts ...string) *ClusterConfig { cfg := &ClusterConfig{ - Hosts: hosts, - CQLVersion: "3.0.0", - Timeout: 11 * time.Second, - ConnectTimeout: 11 * time.Second, - Port: 9042, - NumConns: 2, - Consistency: Quorum, - MaxPreparedStmts: defaultMaxPreparedStmts, - MaxRoutingKeyInfo: 1000, - PageSize: 5000, - DefaultTimestamp: true, - DriverName: defaultDriverName, - DriverVersion: defaultDriverVersion, - MaxWaitSchemaAgreement: 60 * time.Second, - ReconnectInterval: 60 * time.Second, - ConvictionPolicy: &SimpleConvictionPolicy{}, - ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second}, - InitialReconnectionPolicy: &NoReconnectionPolicy{}, - SocketKeepalive: 15 * time.Second, - WriteCoalesceWaitTime: 200 * time.Microsecond, - MetadataSchemaRequestTimeout: 60 * time.Second, + Hosts: hosts, + CQLVersion: "3.0.0", + Timeout: 11 * time.Second, + ConnectTimeout: 11 * time.Second, + Port: 9042, + NumConns: 2, + Consistency: Quorum, + MaxPreparedStmts: defaultMaxPreparedStmts, + MaxRoutingKeyInfo: 1000, + PageSize: 5000, + DefaultTimestamp: true, + DriverName: defaultDriverName, + DriverVersion: defaultDriverVersion, + MaxWaitSchemaAgreement: 60 * time.Second, + ReconnectInterval: 60 * time.Second, + ConvictionPolicy: &SimpleConvictionPolicy{}, + ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second}, + InitialReconnectionPolicy: &NoReconnectionPolicy{}, + SocketKeepalive: 15 * time.Second, + WriteCoalesceWaitTime: 200 * time.Microsecond, + MetadataSchemaRequestTimeout: 60 * time.Second, + HeavyLoadedConnectionThreshold: 512, + HeavyLoadedSwitchConnectionPercentage: 20, } return cfg @@ -374,6 +395,14 @@ func (cfg *ClusterConfig) Validate() error { return errors.New("ReconnectionPolicy.GetMaxRetries returns non-positive number") } + if cfg.HeavyLoadedSwitchConnectionPercentage > 100 || cfg.HeavyLoadedSwitchConnectionPercentage < 0 { + return fmt.Errorf("HeavyLoadedSwitchConnectionPercentage must be between 0 and 100, got %d", cfg.HeavyLoadedSwitchConnectionPercentage) + } + + if cfg.HeavyLoadedConnectionThreshold < 0 { + return fmt.Errorf("HeavyLoadedConnectionThreshold must be greater than or equal to 0, got %d", cfg.HeavyLoadedConnectionThreshold) + } + return nil } diff --git a/conn.go b/conn.go index cb363be71..6ec823e77 100644 --- a/conn.go +++ b/conn.go @@ -1615,6 +1615,10 @@ func (c *Conn) AvailableStreams() int { return c.streams.Available() } +func (c *Conn) StreamsInUse() int { + return c.streams.InUse() +} + func (c *Conn) UseKeyspace(keyspace string) error { q := &writeQueryFrame{statement: `USE "` + keyspace + `"`} q.params.consistency = c.session.cons diff --git a/internal/streams/streams.go b/internal/streams/streams.go index b31a96979..85770404f 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -144,3 +144,8 @@ func (s *IDGenerator) Available() int { func (s *IDGenerator) InUse() int { return int(atomic.LoadInt32(&s.inuseStreams)) } + +// SetStreamsInUse sets streams in use counter, to be used for testing only +func SetStreamsInUse(s *IDGenerator, val int32) { + atomic.StoreInt32(&s.inuseStreams, val) +} diff --git a/scylla.go b/scylla.go index 03fc8737b..02d16e593 100644 --- a/scylla.go +++ b/scylla.go @@ -416,15 +416,14 @@ func (p *scyllaConnPicker) maybeReplaceWithLessBusyConnection(c *Conn) *Conn { return c } alternative := p.leastBusyConn() - if alternative == nil || alternative.AvailableStreams()*120 > c.AvailableStreams()*100 { - return c - } else { + if alternative != nil && alternative.StreamsInUse()*100 <= c.StreamsInUse()*(100-c.session.cfg.HeavyLoadedSwitchConnectionPercentage) { return alternative } + return c } func isHeavyLoaded(c *Conn) bool { - return c.streams.NumStreams/2 > c.AvailableStreams() + return c.StreamsInUse() > c.session.cfg.HeavyLoadedConnectionThreshold } func (p *scyllaConnPicker) leastBusyConn() *Conn { diff --git a/scylla_test.go b/scylla_test.go index 4241df350..0c856d58c 100644 --- a/scylla_test.go +++ b/scylla_test.go @@ -99,6 +99,76 @@ func TestScyllaConnPickerHammerPickNilToken(t *testing.T) { wg.Wait() } +func TestScyllaConnPicker(t *testing.T) { + t.Parallel() + + t.Run("maybeReplaceWithLessBusyConnection", func(t *testing.T) { + + cfg := ClusterConfig{ + HeavyLoadedSwitchConnectionPercentage: 30, + HeavyLoadedConnectionThreshold: 100, + } + + tcases := []struct { + name string + streamsInUse [3]int32 + expected int + }{ + { + name: "all connections below threshold", + streamsInUse: [3]int32{99, 98, 97}, + expected: 0, + }, + { + name: "all connections in threshold, but none is switchable", + streamsInUse: [3]int32{110, 109, 108}, + expected: 0, + }, + { + name: "all connections in threshold, one is below threshold", + streamsInUse: [3]int32{110, 109, 70}, + expected: 2, + }, + { + name: "all connections in threshold, one is above threshold, but below switchable percentage", + streamsInUse: [3]int32{210, 130, 209}, + expected: 1, + }, + } + + for _, tcase := range tcases { + t.Run(tcase.name, func(t *testing.T) { + s := scyllaConnPicker{ + nrShards: 4, + msbIgnore: 12, + } + + conns := [3]*Conn{ + mockConn(0), + mockConn(1), + mockConn(2), + } + + for _, conn := range conns { + conn.session.cfg = cfg + s.Put(conn) + } + + for id, inUse := range tcase.streamsInUse { + streams.SetStreamsInUse(conns[id].streams, inUse) + } + + expectedConn := conns[tcase.expected] + + c := s.maybeReplaceWithLessBusyConnection(conns[0]) + if c != expectedConn { + t.Errorf("expected connection from shard %d, got %d", expectedConn.scyllaSupported.shard, c.scyllaSupported.shard) + } + }) + } + }) +} + func TestScyllaConnPickerRemove(t *testing.T) { t.Parallel() @@ -135,6 +205,7 @@ func mockConn(shard int) *Conn { partitioner: "org.apache.cassandra.dht.Murmur3Partitioner", shardingAlgorithm: "biased-token-round-robin", }, + session: &Session{}, } } diff --git a/session.go b/session.go index 40fe71960..3754c9115 100644 --- a/session.go +++ b/session.go @@ -120,9 +120,8 @@ func addrsToHosts(addrs []string, defaultPort int, logger StdLogger) ([]*HostInf // NewSession wraps an existing Node. func NewSession(cfg ClusterConfig) (*Session, error) { if err := cfg.Validate(); err != nil { - return nil, err + return nil, fmt.Errorf("gocql: unable to create session: cluster config validation failed: %v", err) } - // TODO: we should take a context in here at some point ctx, cancel := context.WithCancel(context.TODO())