NONEVM-745 LogPoller db models (#921)
* logpoller db models
* Replace solana types with custom types to support db read/write
* remove redundant file
* improve test coverage & ensure subkey naming is consistent
* drop redundant constraint
* linter fixes
* update chainlink-common
* go mod tidy

Signed-off-by: Dmytro Haidashenko <[email protected]>
Commit ed1b2f6 (1 parent: eac4f15) · 8 changed files with 693 additions and 1 deletion.
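The diff below adds the log poller db models (Filter, Log), a DSORM exposing filter and log insert/select queries, and db-gated round-trip tests.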
@@ -0,0 +1,38 @@

```go
package logpoller

import (
	"time"

	"github.com/lib/pq"
)

type Filter struct {
	ID            int64
	Name          string
	Address       PublicKey
	EventName     string
	EventSig      []byte
	StartingBlock int64
	EventIDL      string
	SubkeyPaths   SubkeyPaths
	Retention     time.Duration
	MaxLogsKept   int64
}

type Log struct {
	ID             int64
	FilterID       int64
	ChainID        string
	LogIndex       int64
	BlockHash      Hash
	BlockNumber    int64
	BlockTimestamp time.Time
	Address        PublicKey
	EventSig       []byte
	SubkeyValues   pq.ByteaArray
	TxHash         Signature
	Data           []byte
	CreatedAt      time.Time
	ExpiresAt      *time.Time
	SequenceNum    int64
}
```
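PublicKey, Hash, Signature, and SubkeyPaths are the custom db-friendly types the commit message refers to ("Replace solana types with custom types to support db read/write"); their definitions are not in the files shown here. As a minimal sketch of the pattern such a wrapper follows, assuming a 32-byte key stored in a BYTEA column (the names and details below are illustrative, not the committed code):

```go
package logpoller // illustrative sketch only, not the committed definitions

import (
	"database/sql/driver"
	"fmt"
)

// PublicKey wraps a 32-byte Solana public key so it can round-trip through
// a BYTEA column. Hash and Signature would follow the same pattern, and
// SubkeyPaths would likely (de)serialize via JSON instead of raw bytes.
type PublicKey [32]byte

// Scan implements sql.Scanner: read the raw bytes back from the database.
func (k *PublicKey) Scan(src interface{}) error {
	b, ok := src.([]byte)
	if !ok {
		return fmt.Errorf("expected []byte, got %T", src)
	}
	if len(b) != len(k) {
		return fmt.Errorf("expected %d bytes, got %d", len(k), len(b))
	}
	copy(k[:], b)
	return nil
}

// Value implements driver.Valuer: write the key out as raw bytes.
func (k PublicKey) Value() (driver.Value, error) {
	return k[:], nil
}
```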
@@ -0,0 +1,158 @@

```go
package logpoller

import (
	"context"
	"errors"
	"fmt"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
)

type DSORM struct {
	chainID string
	ds      sqlutil.DataSource
	lggr    logger.Logger
}

// NewORM creates a DSORM scoped to chainID.
func NewORM(chainID string, ds sqlutil.DataSource, lggr logger.Logger) *DSORM {
	return &DSORM{
		chainID: chainID,
		ds:      ds,
		lggr:    lggr,
	}
}

func (o *DSORM) Transact(ctx context.Context, fn func(*DSORM) error) (err error) {
	return sqlutil.Transact(ctx, o.new, o.ds, nil, fn)
}

// new returns a NewORM like o, but backed by ds.
func (o *DSORM) new(ds sqlutil.DataSource) *DSORM { return NewORM(o.chainID, ds, o.lggr) }

// InsertFilter is idempotent.
//
// Each address/event pair must have a unique job id, so it may be removed when the job is deleted.
// Returns the ID of the updated or newly inserted filter.
func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (id int64, err error) {
	args, err := newQueryArgs(o.chainID).
		withField("name", filter.Name).
		withRetention(filter.Retention).
		withMaxLogsKept(filter.MaxLogsKept).
		withName(filter.Name).
		withAddress(filter.Address).
		withEventName(filter.EventName).
		withEventSig(filter.EventSig).
		withStartingBlock(filter.StartingBlock).
		withEventIDL(filter.EventIDL).
		withSubkeyPaths(filter.SubkeyPaths).
		toArgs()
	if err != nil {
		return 0, err
	}

	// '::' has to be escaped in the query string
	// https://github.com/jmoiron/sqlx/issues/91, https://github.com/jmoiron/sqlx/issues/428
	query := `
		INSERT INTO solana.log_poller_filters
			(chain_id, name, address, event_name, event_sig, starting_block, event_idl, subkey_paths, retention, max_logs_kept)
		VALUES (:chain_id, :name, :address, :event_name, :event_sig, :starting_block, :event_idl, :subkey_paths, :retention, :max_logs_kept)
		RETURNING id;`

	query, sqlArgs, err := o.ds.BindNamed(query, args)
	if err != nil {
		return 0, err
	}
	if err = o.ds.GetContext(ctx, &id, query, sqlArgs...); err != nil {
		return 0, err
	}
	return id, nil
}

// GetFilterByID returns the filter with the given ID.
func (o *DSORM) GetFilterByID(ctx context.Context, id int64) (Filter, error) {
	query := `SELECT id, name, address, event_name, event_sig, starting_block, event_idl, subkey_paths, retention, max_logs_kept
		FROM solana.log_poller_filters WHERE id = $1`
	var result Filter
	err := o.ds.GetContext(ctx, &result, query, id)
	return result, err
}

// InsertLogs is idempotent to support replays.
func (o *DSORM) InsertLogs(ctx context.Context, logs []Log) error {
	if err := o.validateLogs(logs); err != nil {
		return err
	}
	return o.Transact(ctx, func(orm *DSORM) error {
		return orm.insertLogsWithinTx(ctx, logs, orm.ds)
	})
}

func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.DataSource) error {
	batchInsertSize := 4000
	for i := 0; i < len(logs); i += batchInsertSize {
		start, end := i, i+batchInsertSize
		if end > len(logs) {
			end = len(logs)
		}

		query := `INSERT INTO solana.logs
			(filter_id, chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, subkey_values, tx_hash, data, created_at, expires_at, sequence_num)
		VALUES
			(:filter_id, :chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :subkey_values, :tx_hash, :data, NOW(), :expires_at, :sequence_num)
		ON CONFLICT DO NOTHING`

		_, err := tx.NamedExecContext(ctx, query, logs[start:end])
		if err != nil {
			if errors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 {
				// In case of DB timeouts, try to insert again with a smaller batch, up to a limit.
				batchInsertSize /= 2
				// Counteract the += batchInsertSize on the next loop iteration,
				// so the same start index is retried with the half-size batch.
				i -= batchInsertSize
				continue
			}
			return err
		}
	}
	return nil
}

func (o *DSORM) validateLogs(logs []Log) error {
	for _, log := range logs {
		if o.chainID != log.ChainID {
			return fmt.Errorf("invalid chainID in log got %v want %v", log.ChainID, o.chainID)
		}
	}
	return nil
}

// SelectLogs finds the logs in a given block range.
func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address PublicKey, eventSig []byte) ([]Log, error) {
	args, err := newQueryArgsForEvent(o.chainID, address, eventSig).
		withStartBlock(start).
		withEndBlock(end).
		toArgs()
	if err != nil {
		return nil, err
	}

	query := logsQuery(`
		WHERE chain_id = :chain_id
		AND address = :address
		AND event_sig = :event_sig
		AND block_number >= :start_block
		AND block_number <= :end_block
		ORDER BY block_number, log_index`)

	var logs []Log
	query, sqlArgs, err := o.ds.BindNamed(query, args)
	if err != nil {
		return nil, err
	}

	err = o.ds.SelectContext(ctx, &logs, query, sqlArgs...)
	if err != nil {
		return nil, err
	}
	return logs, nil
}
```
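For orientation, a minimal usage sketch of the ORM above: the exampleUsage function, its chain ID, and the filter values are hypothetical, and the zero-valued fields rely on illustrative defaults rather than the real schema constraints.

```go
package logpoller // usage sketch, not part of the commit

import (
	"context"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
)

// exampleUsage shows the intended call pattern: register a filter once,
// insert (possibly replayed) logs idempotently, then query by block range.
func exampleUsage(ctx context.Context, db sqlutil.DataSource, lggr logger.Logger) error {
	orm := NewORM("chain-id-1", db, lggr) // every query is scoped to this chain ID

	// InsertFilter returns the row ID; filter names are unique (see the tests below).
	filterID, err := orm.InsertFilter(ctx, Filter{
		Name:      "transfers",
		EventName: "event",
		EventSig:  []byte{1, 2, 3},
		EventIDL:  "{}",
	})
	if err != nil {
		return err
	}

	// Replays are safe: ON CONFLICT DO NOTHING deduplicates identical logs.
	log := Log{FilterID: filterID, ChainID: "chain-id-1", LogIndex: 1, BlockNumber: 10}
	if err := orm.InsertLogs(ctx, []Log{log}); err != nil {
		return err
	}

	// Read back logs for the address/event pair within a block range.
	_, err = orm.SelectLogs(ctx, 0, 100, log.Address, log.EventSig)
	return err
}
```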
@@ -0,0 +1,159 @@

```go
//go:build db_tests

package logpoller

import (
	"testing"
	"time"

	"github.com/gagliardetto/solana-go"
	"github.com/google/uuid"
	_ "github.com/jackc/pgx/v4/stdlib"
	"github.com/lib/pq"
	"github.com/stretchr/testify/require"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	"github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"
	"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

// NOTE: at the moment it's not possible to run all db tests at once. This issue will be addressed separately.

func TestLogPollerFilters(t *testing.T) {
	lggr := logger.Test(t)
	chainID := uuid.NewString()
	dbx := pg.NewTestDB(t, pg.TestURL(t))
	orm := NewORM(chainID, dbx, lggr)

	privateKey, err := solana.NewRandomPrivateKey()
	require.NoError(t, err)
	pubKey := privateKey.PublicKey()
	t.Run("Ensure all fields are readable/writable", func(t *testing.T) {
		filters := []Filter{
			{
				Name:          "happy path",
				Address:       PublicKey(pubKey),
				EventName:     "event",
				EventSig:      []byte{1, 2, 3},
				StartingBlock: 1,
				EventIDL:      "{}",
				SubkeyPaths:   SubkeyPaths([][]string{{"a", "b"}, {"c"}}),
				Retention:     1000,
				MaxLogsKept:   3,
			},
			{
				Name:          "empty sub key paths",
				Address:       PublicKey(pubKey),
				EventName:     "event",
				EventSig:      []byte{1, 2, 3},
				StartingBlock: 1,
				EventIDL:      "{}",
				SubkeyPaths:   SubkeyPaths([][]string{}),
				Retention:     1000,
				MaxLogsKept:   3,
			},
			{
				Name:          "nil sub key paths",
				Address:       PublicKey(pubKey),
				EventName:     "event",
				EventSig:      []byte{1, 2, 3},
				StartingBlock: 1,
				EventIDL:      "{}",
				SubkeyPaths:   nil,
				Retention:     1000,
				MaxLogsKept:   3,
			},
		}

		for _, filter := range filters {
			t.Run("Read/write filter: "+filter.Name, func(t *testing.T) {
				ctx := tests.Context(t)
				id, err := orm.InsertFilter(ctx, filter)
				require.NoError(t, err)
				filter.ID = id
				dbFilter, err := orm.GetFilterByID(ctx, id)
				require.NoError(t, err)
				require.Equal(t, filter, dbFilter)
			})
		}
	})
	t.Run("Returns an error if name is not unique", func(t *testing.T) {
		filter := newRandomFilter(t)
		ctx := tests.Context(t)
		_, err = orm.InsertFilter(ctx, filter)
		require.NoError(t, err)
		filter.EventSig = []byte(uuid.NewString())
		_, err = orm.InsertFilter(ctx, filter)
		require.EqualError(t, err, `ERROR: duplicate key value violates unique constraint "solana_log_poller_filter_name" (SQLSTATE 23505)`)
	})
}

func newRandomFilter(t *testing.T) Filter {
	privateKey, err := solana.NewRandomPrivateKey()
	require.NoError(t, err)
	pubKey := privateKey.PublicKey()
	return Filter{
		Name:          uuid.NewString(),
		Address:       PublicKey(pubKey),
		EventName:     "event",
		EventSig:      []byte{1, 2, 3},
		StartingBlock: 1,
		EventIDL:      "{}",
		SubkeyPaths:   [][]string{{"a", "b"}, {"c"}},
		Retention:     1000,
		MaxLogsKept:   3,
	}
}

func TestLogPollerLogs(t *testing.T) {
	lggr := logger.Test(t)
	chainID := uuid.NewString()
	dbx := pg.NewTestDB(t, pg.TestURL(t))
	orm := NewORM(chainID, dbx, lggr)

	privateKey, err := solana.NewRandomPrivateKey()
	require.NoError(t, err)
	pubKey := privateKey.PublicKey()

	ctx := tests.Context(t)
	// create a filter first, as it's required for a log
	filterID, err := orm.InsertFilter(ctx, Filter{
		Name:          "awesome filter",
		Address:       PublicKey(pubKey),
		EventName:     "event",
		EventSig:      []byte{1, 2, 3},
		StartingBlock: 1,
		EventIDL:      "{}",
		SubkeyPaths:   [][]string{{"a", "b"}, {"c"}},
		Retention:     1000,
		MaxLogsKept:   3,
	})
	require.NoError(t, err)
	data := []byte("solana is fun")
	signature, err := privateKey.Sign(data)
	require.NoError(t, err)
	log := Log{
		FilterID:       filterID,
		ChainID:        chainID,
		LogIndex:       1,
		BlockHash:      Hash(pubKey),
		BlockNumber:    10,
		BlockTimestamp: time.Unix(1731590113, 0),
		Address:        PublicKey(pubKey),
		EventSig:       []byte{3, 2, 1},
		SubkeyValues:   pq.ByteaArray([][]byte{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}),
		TxHash:         Signature(signature),
		Data:           data,
	}
	err = orm.InsertLogs(ctx, []Log{log})
	require.NoError(t, err)
	// inserting the same Log again should not produce a second instance
	err = orm.InsertLogs(ctx, []Log{log})
	require.NoError(t, err)
	dbLogs, err := orm.SelectLogs(ctx, 0, 100, log.Address, log.EventSig)
	require.NoError(t, err)
	require.Len(t, dbLogs, 1)
	log.ID = dbLogs[0].ID
	log.CreatedAt = dbLogs[0].CreatedAt
	require.Equal(t, log, dbLogs[0])
}
```
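As the //go:build db_tests tag indicates, these tests compile only when that build tag is set (e.g. go test -tags db_tests ./...), and pg.NewTestDB with pg.TestURL presumably requires a reachable test Postgres database.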