Skip to content

Commit

Permalink
sql: use WAL mode and set pragmas correctly (#98)
Browse files Browse the repository at this point in the history
This 
 - sets PRAGMAs correctly, before they were ignored as they used a wrong syntax
 - removes the `shared=cache` option as #97 removed the need for it
 - removes the mechanism retrying on SQLITE_BUSY, which is then unnecessary
 - adds code to issue `BEGIN` on read-only transactions and `BEGIN IMMEDIATE` for transactions expected to write

The last point is necessary in order to avoid `SQLITE_BUSY` (5) errors in presence of multiple concurrent writing transactions. Without it `busy_timeout` would not be sufficient, see discussion in #98

Signed-off-by: Silvio Moioli <[email protected]>
  • Loading branch information
moio authored Sep 18, 2024
1 parent 0106849 commit 4704559
Show file tree
Hide file tree
Showing 14 changed files with 146 additions and 129 deletions.
61 changes: 32 additions & 29 deletions pkg/cache/sql/db/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@ import (
"os"
"reflect"
"sync"
"time"

"github.com/pkg/errors"
"github.com/rancher/lasso/pkg/cache/sql/db/transaction"
"k8s.io/apimachinery/pkg/util/wait"
"modernc.org/sqlite"
sqlite3 "modernc.org/sqlite/lib"
_ "modernc.org/sqlite"
)

const (
Expand All @@ -37,7 +34,7 @@ type Client struct {

// Connection represents a connection pool.
type Connection interface {
Begin() (*sql.Tx, error)
BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
Exec(query string, args ...any) (sql.Result, error)
Prepare(query string) (*sql.Stmt, error)
Close() error
Expand Down Expand Up @@ -93,8 +90,6 @@ type Decryptor interface {
Decrypt([]byte, []byte, uint32) ([]byte, error)
}

var backoffRetry = wait.Backoff{Duration: 50 * time.Millisecond, Factor: 2, Steps: 10}

// NewClient returns a Client. If the given connection is nil then a default one will be created.
func NewClient(c Connection, encryptor Encryptor, decryptor Decryptor) (*Client, error) {
client := &Client{
Expand Down Expand Up @@ -128,24 +123,8 @@ func (c *Client) Prepare(stmt string) *sql.Stmt {
func (c *Client) QueryForRows(ctx context.Context, stmt transaction.Stmt, params ...any) (*sql.Rows, error) {
c.connLock.RLock()
defer c.connLock.RUnlock()
var rows *sql.Rows
var err error

err = wait.ExponentialBackoff(backoffRetry, func() (bool, error) {
rows, err = stmt.QueryContext(ctx, params...)
if err != nil {
sqlErr, ok := err.(*sqlite.Error)
if ok && sqlErr.Code() == sqlite3.SQLITE_BUSY {
return false, nil
}
return false, err
}
return true, nil
})
if err != nil {
return nil, err
}
return rows, nil
return stmt.QueryContext(ctx, params...)
}

// CloseStmt will call close on the given Closable. It is intended to be used with a sql statement. This function is meant
Expand Down Expand Up @@ -241,12 +220,21 @@ func (c *Client) ReadInt(rows Rows) (int, error) {
return result, nil
}

// Begin attempt to begin a transaction, and returns it along with a function for unlocking the
// database once the transaction is done.
func (c *Client) Begin() (TXClient, error) {
// BeginTx attempts to begin a transaction.
// If forWriting is true, this method blocks until all other concurrent forWriting
// transactions have either committed or rolled back.
// If forWriting is false, it is assumed the returned transaction will exclusively
// be used for DQL (eg. SELECT) queries.
// Not respecting the above rule might result in transactions failing with unexpected
// SQLITE_BUSY (5) errors (aka "Runtime error: database is locked").
// See discussion in https://github.com/rancher/lasso/pull/98 for details
func (c *Client) BeginTx(ctx context.Context, forWriting bool) (TXClient, error) {
c.connLock.RLock()
defer c.connLock.RUnlock()
sqlTx, err := c.conn.Begin()
// note: this assumes _txlock=immediate in the connection string, see NewConnection
sqlTx, err := c.conn.BeginTx(ctx, &sql.TxOptions{
ReadOnly: !forWriting,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -332,7 +320,22 @@ func (c *Client) NewConnection() error {
return err
}

sqlDB, err := sql.Open("sqlite", "file:"+InformerObjectCacheDBPath+"?mode=rwc&cache=shared&_journal_mode=wal&_synchronous=off&_foreign_keys=on&_busy_timeout=1000000")
sqlDB, err := sql.Open("sqlite", "file:"+InformerObjectCacheDBPath+"?"+
// open SQLite file in read-write mode, creating it if it does not exist
"mode=rwc&"+
// use the WAL journal mode for consistency and efficiency
"_pragma=journal_mode=wal&"+
// do not even attempt to attain durability. Database is thrown away at pod restart
"_pragma=synchronous=off&"+
// do check foreign keys and honor ON DELETE CASCADE
"_pragma=foreign_keys=on&"+
// if two transactions want to write at the same time, allow 2 minutes for the first to complete
// before baling out
"_pragma=busy_timeout=120000&"+
// default to IMMEDIATE mode for transactions. Setting this parameter is the only current way
// to be able to switch between DEFERRED and IMMEDIATE modes in modernc.org/sqlite's implementation
// of BeginTx
"_txlock=immediate")
if err != nil {
return err
}
Expand Down
25 changes: 19 additions & 6 deletions pkg/cache/sql/db/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,27 +395,40 @@ func TestBegin(t *testing.T) {
var tests []testCase

// Tests with shouldEncryptSet to false
tests = append(tests, testCase{description: "Begin(), with no errors", test: func(t *testing.T) {
tests = append(tests, testCase{description: "BeginTx(), with no errors", test: func(t *testing.T) {
c := SetupMockConnection(t)
e := SetupMockEncryptor(t)
d := SetupMockDecryptor(t)

sqlTx := &sql.Tx{}
c.EXPECT().Begin().Return(sqlTx, nil)
c.EXPECT().BeginTx(context.Background(), &sql.TxOptions{ReadOnly: true}).Return(sqlTx, nil)
client := SetupClient(t, c, e, d)
txC, err := client.Begin()
txC, err := client.BeginTx(context.Background(), false)
assert.Nil(t, err)
assert.NotNil(t, txC)
},
})
tests = append(tests, testCase{description: "Begin(), with connection Begin() error", test: func(t *testing.T) {
tests = append(tests, testCase{description: "BeginTx(), with forWriting option set", test: func(t *testing.T) {
c := SetupMockConnection(t)
e := SetupMockEncryptor(t)
d := SetupMockDecryptor(t)

c.EXPECT().Begin().Return(nil, fmt.Errorf("error"))
sqlTx := &sql.Tx{}
c.EXPECT().BeginTx(context.Background(), &sql.TxOptions{ReadOnly: false}).Return(sqlTx, nil)
client := SetupClient(t, c, e, d)
txC, err := client.BeginTx(context.Background(), true)
assert.Nil(t, err)
assert.NotNil(t, txC)
},
})
tests = append(tests, testCase{description: "BeginTx(), with connection Begin() error", test: func(t *testing.T) {
c := SetupMockConnection(t)
e := SetupMockEncryptor(t)
d := SetupMockDecryptor(t)

c.EXPECT().BeginTx(context.Background(), &sql.TxOptions{ReadOnly: true}).Return(nil, fmt.Errorf("error"))
client := SetupClient(t, c, e, d)
_, err := client.Begin()
_, err := client.BeginTx(context.Background(), false)
assert.NotNil(t, err)
},
})
Expand Down
13 changes: 7 additions & 6 deletions pkg/cache/sql/db/db_mocks_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions pkg/cache/sql/informer/factory/factory_mocks_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/cache/sql/informer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type Store interface {
}

type DBClient interface {
Begin() (db.TXClient, error)
BeginTx(ctx context.Context, forWriting bool) (db.TXClient, error)
QueryForRows(ctx context.Context, stmt transaction.Stmt, params ...any) (*sql.Rows, error)
ReadObjects(rows db.Rows, typ reflect.Type, shouldDecrypt bool) ([]any, error)
ReadStrings(rows db.Rows) ([]string, error)
Expand All @@ -85,7 +85,7 @@ type DBClient interface {

// NewIndexer returns a cache.Indexer backed by SQLite for objects of the given example type
func NewIndexer(indexers cache.Indexers, s Store) (*Indexer, error) {
tx, err := s.Begin()
tx, err := s.BeginTx(context.Background(), true)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/cache/sql/informer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestNewIndexer(t *testing.T) {
},
}
storeName := "someStoreName"
store.EXPECT().Begin().Return(client, nil)
store.EXPECT().BeginTx(gomock.Any(), true).Return(client, nil)
store.EXPECT().GetName().AnyTimes().Return(storeName)
client.EXPECT().Exec(fmt.Sprintf(createTableFmt, storeName, storeName)).Return(nil)
client.EXPECT().Exec(fmt.Sprintf(createIndexFmt, storeName, storeName)).Return(nil)
Expand All @@ -69,7 +69,7 @@ func TestNewIndexer(t *testing.T) {
return []string{objKey}, nil
},
}
store.EXPECT().Begin().Return(nil, fmt.Errorf("error"))
store.EXPECT().BeginTx(gomock.Any(), true).Return(nil, fmt.Errorf("error"))
_, err := NewIndexer(indexers, store)
assert.NotNil(t, err)
}})
Expand All @@ -84,7 +84,7 @@ func TestNewIndexer(t *testing.T) {
},
}
storeName := "someStoreName"
store.EXPECT().Begin().Return(client, nil)
store.EXPECT().BeginTx(gomock.Any(), true).Return(client, nil)
store.EXPECT().GetName().AnyTimes().Return(storeName)
client.EXPECT().Exec(fmt.Sprintf(createTableFmt, storeName, storeName)).Return(fmt.Errorf("error"))
_, err := NewIndexer(indexers, store)
Expand All @@ -101,7 +101,7 @@ func TestNewIndexer(t *testing.T) {
},
}
storeName := "someStoreName"
store.EXPECT().Begin().Return(client, nil)
store.EXPECT().BeginTx(gomock.Any(), true).Return(client, nil)
store.EXPECT().GetName().AnyTimes().Return(storeName)
client.EXPECT().Exec(fmt.Sprintf(createTableFmt, storeName, storeName)).Return(nil)
client.EXPECT().Exec(fmt.Sprintf(createIndexFmt, storeName, storeName)).Return(fmt.Errorf("error"))
Expand All @@ -119,7 +119,7 @@ func TestNewIndexer(t *testing.T) {
},
}
storeName := "someStoreName"
store.EXPECT().Begin().Return(client, nil)
store.EXPECT().BeginTx(gomock.Any(), true).Return(client, nil)
store.EXPECT().GetName().AnyTimes().Return(storeName)
client.EXPECT().Exec(fmt.Sprintf(createTableFmt, storeName, storeName)).Return(nil)
client.EXPECT().Exec(fmt.Sprintf(createIndexFmt, storeName, storeName)).Return(nil)
Expand Down
24 changes: 12 additions & 12 deletions pkg/cache/sql/informer/informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,21 @@ func TestNewInformer(t *testing.T) {

// NewStore() from store package logic. This package is only concerned with whether it returns err or not as NewStore
// is tested in depth in its own package.
dbClient.EXPECT().Begin().Return(txClient, nil)
dbClient.EXPECT().BeginTx(gomock.Any(), true).Return(txClient, nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Commit().Return(nil)
dbClient.EXPECT().Prepare(gomock.Any()).Return(&sql.Stmt{}).AnyTimes()

// NewIndexer() logic (within NewListOptionIndexer(). This test is only concerned with whether it returns err or not as NewIndexer
// is tested in depth in its own indexer_test.go
dbClient.EXPECT().Begin().Return(txClient, nil)
dbClient.EXPECT().BeginTx(gomock.Any(), true).Return(txClient, nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Commit().Return(nil)

// NewListOptionIndexer() logic. This test is only concerned with whether it returns err or not as NewIndexer
// is tested in depth in its own indexer_test.go
dbClient.EXPECT().Begin().Return(txClient, nil)
dbClient.EXPECT().BeginTx(context.Background(), true).Return(txClient, nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
Expand All @@ -77,7 +77,7 @@ func TestNewInformer(t *testing.T) {

// NewStore() from store package logic. This package is only concerned with whether it returns err or not as NewStore
// is tested in depth in its own package.
dbClient.EXPECT().Begin().Return(txClient, nil)
dbClient.EXPECT().BeginTx(gomock.Any(), true).Return(txClient, nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Commit().Return(fmt.Errorf("error"))

Expand All @@ -94,14 +94,14 @@ func TestNewInformer(t *testing.T) {

// NewStore() from store package logic. This package is only concerned with whether it returns err or not as NewStore
// is tested in depth in its own package.
dbClient.EXPECT().Begin().Return(txClient, nil)
dbClient.EXPECT().BeginTx(gomock.Any(), true).Return(txClient, nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Commit().Return(nil)
dbClient.EXPECT().Prepare(gomock.Any()).Return(&sql.Stmt{}).AnyTimes()

// NewIndexer() logic (within NewListOptionIndexer(). This test is only concerned with whether it returns err or not as NewIndexer
// is tested in depth in its own indexer_test.go
dbClient.EXPECT().Begin().Return(txClient, nil)
dbClient.EXPECT().BeginTx(gomock.Any(), true).Return(txClient, nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Commit().Return(fmt.Errorf("error"))
Expand All @@ -119,21 +119,21 @@ func TestNewInformer(t *testing.T) {

// NewStore() from store package logic. This package is only concerned with whether it returns err or not as NewStore
// is tested in depth in its own package.
dbClient.EXPECT().Begin().Return(txClient, nil)
dbClient.EXPECT().BeginTx(gomock.Any(), true).Return(txClient, nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Commit().Return(nil)
dbClient.EXPECT().Prepare(gomock.Any()).Return(&sql.Stmt{}).AnyTimes()

// NewIndexer() logic (within NewListOptionIndexer(). This test is only concerned with whether it returns err or not as NewIndexer
// is tested in depth in its own indexer_test.go
dbClient.EXPECT().Begin().Return(txClient, nil)
dbClient.EXPECT().BeginTx(gomock.Any(), true).Return(txClient, nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Commit().Return(nil)

// NewListOptionIndexer() logic. This test is only concerned with whether it returns err or not as NewIndexer
// is tested in depth in its own indexer_test.go
dbClient.EXPECT().Begin().Return(txClient, nil)
dbClient.EXPECT().BeginTx(gomock.Any(), true).Return(txClient, nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
Expand Down Expand Up @@ -162,21 +162,21 @@ func TestNewInformer(t *testing.T) {

// NewStore() from store package logic. This package is only concerned with whether it returns err or not as NewStore
// is tested in depth in its own package.
dbClient.EXPECT().Begin().Return(txClient, nil)
dbClient.EXPECT().BeginTx(gomock.Any(), true).Return(txClient, nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Commit().Return(nil)
dbClient.EXPECT().Prepare(gomock.Any()).Return(&sql.Stmt{}).AnyTimes()

// NewIndexer() logic (within NewListOptionIndexer(). This test is only concerned with whether it returns err or not as NewIndexer
// is tested in depth in its own indexer_test.go
dbClient.EXPECT().Begin().Return(txClient, nil)
dbClient.EXPECT().BeginTx(gomock.Any(), true).Return(txClient, nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Commit().Return(nil)

// NewListOptionIndexer() logic. This test is only concerned with whether it returns err or not as NewIndexer
// is tested in depth in its own indexer_test.go
dbClient.EXPECT().Begin().Return(txClient, nil)
dbClient.EXPECT().BeginTx(gomock.Any(), true).Return(txClient, nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
txClient.EXPECT().Exec(gomock.Any(), gomock.Any()).Return(nil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/sql/informer/listoption_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewListOptionIndexer(fields [][]string, s Store, namespaced bool) (*ListOpt
columnDefs[index] = column
}

tx, err := l.Begin()
tx, err := l.BeginTx(context.Background(), true)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 4704559

Please sign in to comment.