Skip to content

Commit

Permalink
Have a test to make sure paging feature works properly with allow
Browse files Browse the repository at this point in the history
filtering queries

It is possible that empty response from nodes can confuse driver make it
producing partial results
To ensure it is not happening we need to test this scenario
  • Loading branch information
dkropachev committed Jul 8, 2024
1 parent ed9f13a commit bc4e3a3
Showing 1 changed file with 130 additions and 0 deletions.
130 changes: 130 additions & 0 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"math/big"
"net"
"reflect"
"sort"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -312,6 +313,135 @@ func TestPaging(t *testing.T) {
}
}

func TestPagingWithAllowFiltering(t *testing.T) {
session := createSession(t)

t.Cleanup(func() {
if err := session.Query("DROP TABLE gocql_test.pagging_with_allow_filtering").Exec(); err != nil {
t.Fatal("drop table:", err)
}
session.Close()
})

if session.cfg.ProtoVersion == 1 {
t.Skip("Paging not supported. Please use Cassandra >= 2.0")
}

const (
targetP1 = 50
targetP2 = 50
totalExpectedResults = 30
pageSize = 5
deletedRageStart = 10
deletedRageEnd = 20
// Some record range is being deleted, to test tombstones appearance
expectedCount = totalExpectedResults - (deletedRageEnd - deletedRageStart)
)

paginatedSelect := fmt.Sprintf("SELECT c1, f1 FROM gocql_test.pagging_with_allow_filtering WHERE p1 = %d AND p2 = %d AND f1 < %d ALLOW FILTERING;", targetP1, targetP2, totalExpectedResults)
validateResult := func(t *testing.T, results []int) {
if len(results) != expectedCount {
t.Fatalf("expected %d got %d: %d", expectedCount, len(results), results)
}

sort.Ints(results)

expect := make([]int, 0, expectedCount)
for i := 0; i < totalExpectedResults; i++ {
if i >= deletedRageStart && i < deletedRageEnd {
continue
}
expect = append(expect, i)
}

if !reflect.DeepEqual(results, expect) {
t.Fatalf("expected %v\ngot %v", expect, results)
}
}

t.Run("Prepare", func(t *testing.T) {
if err := createTable(session,
"CREATE TABLE gocql_test.pagging_with_allow_filtering (p1 int, p2 int, c1 int, f1 int, "+
"PRIMARY KEY ((p1, p2), c1)) WITH CLUSTERING ORDER BY (c1 DESC)"); err != nil {
t.Fatal("create table:", err)
}

// Insert extra records
for i := 0; i < 100; i++ {
if err := session.Query("INSERT INTO gocql_test.pagging_with_allow_filtering (p1,p2,c1,f1) VALUES (?,?,?,?)", i, i, i, i).Exec(); err != nil {
t.Fatal("insert:", err)
}
}

// Insert records to a target partition
for i := 0; i < 100; i++ {
if err := session.Query("INSERT INTO gocql_test.pagging_with_allow_filtering (p1,p2,c1,f1) VALUES (?,?,?,?)", targetP1, targetP2, i, i).Exec(); err != nil {
t.Fatal("insert:", err)
}
}

if err := session.Query("DELETE FROM gocql_test.pagging_with_allow_filtering WHERE p1 = ? AND p2 = ? AND c1 >= ? AND c1 < ?", targetP1, targetP2, deletedRageStart, deletedRageEnd).Exec(); err != nil {
t.Fatal("insert:", err)
}
})

t.Run("AutoPagination", func(t *testing.T) {
for _, c := range []Consistency{One, Quorum} {
t.Run(c.String(), func(t *testing.T) {
iter := session.Query(paginatedSelect).Consistency(c).PageSize(pageSize).Iter()

var c1, f1 int
var results []int

for iter.Scan(&c1, &f1) {
if c1 != f1 {
t.Fatalf("expected c1 and f1 values to be the same, but got c1=%d f1=%d", c1, f1)
}
results = append(results, f1)
}
if err := iter.Close(); err != nil {
t.Fatal("select:", err.Error())
}
validateResult(t, results)
})
}
})

t.Run("ManualPagination", func(t *testing.T) {
for _, c := range []Consistency{One, Quorum} {
t.Run(c.String(), func(t *testing.T) {

var c1, f1 int
var results []int
var currentPageState []byte

qry := session.Query(paginatedSelect).Consistency(c).PageSize(pageSize)

for {
iter := qry.PageState(currentPageState).Iter()
for iter.Scan(&c1, &f1) {
if c1 != f1 {
t.Fatalf("expected c1 and f1 values to be the same, but got c1=%d f1=%d", c1, f1)
}
results = append(results, f1)
}
if err := iter.Close(); err != nil {
t.Fatal("select:", err.Error())
}
newPageState := iter.PageState()
if len(newPageState) == 0 || len(currentPageState) == len(newPageState) && bytes.Compare(newPageState, currentPageState) == 0 {
break
}
currentPageState = newPageState
}

validateResult(t, results)
})
}
})

}

func TestPagingWithBind(t *testing.T) {
session := createSession(t)
defer session.Close()
Expand Down

0 comments on commit bc4e3a3

Please sign in to comment.