Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Dec 20, 2024
1 parent 0c8284c commit e98d76b
Show file tree
Hide file tree
Showing 5 changed files with 446 additions and 12 deletions.
7 changes: 7 additions & 0 deletions pkg/solana/codec/solana_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ func TestNewIDLCodec_CircularDependency(t *testing.T) {
assert.ErrorIs(t, err, types.ErrInvalidConfig)
}

func TestNewIDLInstructionCodec(t *testing.T) {
t.Parallel()

var idl codec.IDL

}

func newTestIDLAndCodec(t *testing.T, account bool) (string, codec.IDL, types.RemoteCodec) {
t.Helper()

Expand Down
71 changes: 65 additions & 6 deletions pkg/solana/logpoller/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,36 @@ package logpoller

import (
"context"
"encoding/base64"
"errors"
"fmt"
"iter"
"maps"
"sync"
"sync/atomic"

"github.com/gagliardetto/solana-go"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/codec"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/utils"
)

type filters struct {
orm ORM
lggr logger.SugaredLogger

filtersByName map[string]Filter
filtersByAddress map[PublicKey]map[EventSignature]map[int64]Filter
filtersToBackfill map[int64]Filter
filtersToDelete map[int64]Filter
filtersMutex sync.RWMutex
loadedFilters atomic.Bool
filtersByName map[string]Filter
filtersByAddress map[PublicKey]map[EventSignature]map[int64]Filter
filtersToBackfill map[int64]Filter
filtersToDelete map[int64]Filter
filtersMutex sync.RWMutex
loadedFilters atomic.Bool
eventCodecs map[int64]types.RemoteCodec
knownPrograms map[string]struct{} // fast lookup to see if a base58-encoded ProgramID matches any registered filters
knownDiscriminators map[string]struct{} // fast lookup by first 10 characters (60-bits) of a base64-encoded discriminator
}

func newFilters(lggr logger.SugaredLogger, orm ORM) *filters {
Expand Down Expand Up @@ -75,6 +85,13 @@ func (fl *filters) RegisterFilter(ctx context.Context, filter Filter) error {
return fmt.Errorf("failed to load filters: %w", err)
}

eventCodec, err := codec.NewIDLEventCodec(filter.EventIDL, config.BuilderForEncoding(config.EncodingTypeBorsh))
if err != nil {
return fmt.Errorf("invalid event IDL for filter %s: %w", filter.Name, err)
}

filter.EventSig = utils.Discriminator("event", filter.EventName)

fl.filtersMutex.Lock()
defer fl.filtersMutex.Unlock()

Expand Down Expand Up @@ -107,6 +124,12 @@ func (fl *filters) RegisterFilter(ctx context.Context, filter Filter) error {

filtersByID[filter.ID] = filter
fl.filtersToBackfill[filterID] = filter

fl.eventCodecs[filter.ID] = eventCodec
fl.knownPrograms[filter.Address.ToSolana().String()] = struct{}{}
discriminator := base64.StdEncoding.EncodeToString(filter.EventSig[:])
fl.knownDiscriminators[discriminator[:10]] = struct{}{}

return nil
}

Expand Down Expand Up @@ -187,6 +210,42 @@ func (fl *filters) MatchingFilters(addr PublicKey, eventSignature EventSignature
}
}

func (fl *filters) EventCodec(ID int64) types.RemoteCodec {
return fl.eventCodecs[ID]
}

// MatchchingFiltersForEncodedEvent - similar to MatchingFilters but accepts a raw encoded event. Under normal operation,
// this will be called on every new event that happens on the blockchain, so it's important it returns immediately if it
// doesn't match any registered filters.
func (fl *filters) MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[Filter] {
if _, ok := fl.knownPrograms[event.Program]; !ok {
return nil
}

// The first 64-bits of the event data is the event sig. Because it's base64 encoded, this corresponds to
// the first 10 characters plus 4 bits of the 11th character. We can quickly rule it out as not matching any known
// discriminators if the first 10 characters don't match. If it passes that initial test, we base64-decode the
// first 11 characters, and use the first 8 bytes of that as the event sig to call MatchingFilters. The address
// also needs to be base58-decoded to pass to MatchingFilters
if _, ok := fl.knownDiscriminators[event.Data[:10]]; !ok {
return nil
}

addr, err := solana.PublicKeyFromBase58(event.Program)
if err != nil {
fl.lggr.Errorw("failed to parse Program ID for event", "EventProgram", event)
return nil
}
decoded, err := base64.StdEncoding.DecodeString(event.Data[:11])
if err != nil {
fl.lggr.Errorw("failed to decode event data", "EventProgram", event)
return nil
}
eventSig := EventSignature(decoded[:8])

return fl.MatchingFilters(PublicKey(addr), eventSig)
}

// ConsumeFiltersToBackfill - removes all filters from the backfill queue and returns them to caller.
// Requires LoadFilters to be called at least once.
func (fl *filters) ConsumeFiltersToBackfill() map[int64]Filter {
Expand Down
86 changes: 81 additions & 5 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@ package logpoller

import (
"context"
"encoding/base64"
"errors"
"fmt"
"math"
"reflect"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/internal"
Expand All @@ -19,10 +25,12 @@ var (

//go:generate mockery --name ORM --inpackage --structname mockORM --filename mock_orm.go
type ORM interface {
ChainID() string
InsertFilter(ctx context.Context, filter Filter) (id int64, err error)
SelectFilters(ctx context.Context) ([]Filter, error)
DeleteFilters(ctx context.Context, filters map[int64]Filter) error
MarkFilterDeleted(ctx context.Context, id int64) (err error)
InsertLogs(context.Context, []Log) (err error)
}

type ILogPoller interface {
Expand All @@ -39,8 +47,10 @@ type LogPoller struct {
client internal.Loader[client.Reader]
collector *EncodedLogCollector

filters *filters
events []ProgramEvent
filters *filters
discriminatorLookup map[string]string
events []ProgramEvent
codec commontypes.RemoteCodec

chStop services.StopChan
wg sync.WaitGroup
Expand All @@ -57,12 +67,78 @@ func NewLogPoller(lggr logger.SugaredLogger, orm ORM, cl internal.Loader[client.
return &lp
}

func (lp *LogPoller) Process(event ProgramEvent) error {
// process stream of events coming from event loader
lp.events = append(lp.events, event)
func makeLogIndex(txIndex int, txLogIndex uint) int64 {
if txIndex < 0 || txIndex > math.MaxUint32 || txLogIndex > math.MaxUint32 {
panic(fmt.Sprintf("txIndex or txLogIndex out of range: txIndex=%d, txLogIndex=%d", txIndex, txLogIndex))
}
return int64(math.MaxUint32*uint32(txIndex) + uint32(txLogIndex))
}

// Process - process stream of events coming from log ingester
func (lp *LogPoller) Process(programEvent ProgramEvent) (err error) {
ctx, cancel := utils.ContextFromChan(lp.chStop)
defer cancel()

blockData := programEvent.BlockData

var logs []Log
for filter := range lp.filters.MatchingFiltersForEncodedEvent(programEvent) {
log := Log{
FilterID: filter.ID,
ChainID: lp.orm.ChainID(),
LogIndex: makeLogIndex(blockData.TransactionIndex, blockData.TransactionLogIndex),
BlockHash: Hash(blockData.BlockHash),
BlockNumber: int64(blockData.BlockHeight),
BlockTimestamp: blockData.BlockTime.Time(), // TODO: is this a timezone safe conversion?
Address: filter.Address,
EventSig: filter.EventSig,
TxHash: Signature(blockData.TransactionHash),
}

log.Data, err = base64.StdEncoding.DecodeString(programEvent.Data)
if err != nil {
return err
}

var event any
err = lp.filters.EventCodec(filter.ID).Decode(ctx, log.Data, &event, filter.EventName)
if err != nil {
return err
}

err = lp.ExtractSubkeys(reflect.TypeOf(event), filter.SubkeyPaths)
if err != nil {
return err
}

// TODO: fill in, and keep track of SequenceNumber for each filter. (Initialize from db on LoadFilters, then increment each time?)

logs = append(logs, log)
}

lp.orm.InsertLogs(ctx, logs)
return nil
}

func (lp *LogPoller) ExtractSubkeys(t reflect.Type, paths SubkeyPaths) error {
s := reflect.TypeOf(event)
if s.Kind() != reflect.Struct {
return fmt.Errorf("event type must be struct, got %v. event=%v", t, event)
}

for _, path := range paths[0] {
field, err := s.FieldByName(path)
for depth := 0; depth < len(paths); depth++ {
for _, path := range paths[depth] {
field, err = field.Type.FieldByName(path)
}
}
}

}

func get

func (lp *LogPoller) Start(context.Context) error {
cl, err := lp.client.Get()
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/solana/logpoller/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"time"

"github.com/lib/pq"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/codec"
)

type Filter struct {
Expand All @@ -13,7 +15,7 @@ type Filter struct {
EventName string
EventSig EventSignature
StartingBlock int64
EventIDL string
EventIDL codec.IDL
SubkeyPaths SubkeyPaths
Retention time.Duration
MaxLogsKept int64
Expand Down
Loading

0 comments on commit e98d76b

Please sign in to comment.