Skip to content

Commit

Permalink
feat(indexer/postgres)!: add basic support for header, txs and events (
Browse files Browse the repository at this point in the history
  • Loading branch information
facundomedica authored Dec 9, 2024
1 parent 34f407d commit 332d0b1
Show file tree
Hide file tree
Showing 34 changed files with 187 additions and 52 deletions.
4 changes: 4 additions & 0 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,10 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
app.logger.Error("Commit listening hook failed", "height", blockHeight, "err", err)
if app.streamingManager.StopNodeOnErr {
err = fmt.Errorf("Commit listening hook failed: %w", err)
if blockHeight == 1 {
// can't rollback to height 0, so just return the error
return nil, fmt.Errorf("failed to commit block 1, can't automatically rollback: %w", err)
}
rollbackErr := app.cms.RollbackToVersion(blockHeight - 1)
if rollbackErr != nil {
return nil, errors.Join(err, rollbackErr)
Expand Down
39 changes: 29 additions & 10 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package baseapp

import (
"context"
"encoding/json"
"fmt"
"sort"
"strconv"
Expand All @@ -20,6 +21,7 @@ import (

"github.com/cosmos/cosmos-sdk/client/flags"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)

const (
Expand Down Expand Up @@ -48,7 +50,7 @@ func (app *BaseApp) EnableIndexer(indexerOpts interface{}, keys map[string]*stor
app.cms.AddListeners(exposedKeys)

app.streamingManager = storetypes.StreamingManager{
ABCIListeners: []storetypes.ABCIListener{listenerWrapper{listener.Listener}},
ABCIListeners: []storetypes.ABCIListener{listenerWrapper{listener.Listener, app.txDecoder}},
StopNodeOnErr: true,
}

Expand Down Expand Up @@ -144,9 +146,10 @@ func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStore
return exposeStoreKeys
}

func eventToAppDataEvent(event abci.Event) (appdata.Event, error) {
func eventToAppDataEvent(event abci.Event, height int64) (appdata.Event, error) {
appdataEvent := appdata.Event{
Type: event.Type,
BlockNumber: uint64(height),
Type: event.Type,
Attributes: func() ([]appdata.EventAttribute, error) {
attrs := make([]appdata.EventAttribute, len(event.Attributes))
for j, attr := range event.Attributes {
Expand Down Expand Up @@ -197,7 +200,8 @@ func eventToAppDataEvent(event abci.Event) (appdata.Event, error) {
}

type listenerWrapper struct {
listener appdata.Listener
listener appdata.Listener
txDecoder sdk.TxDecoder
}

// NewListenerWrapper creates a new listenerWrapper.
Expand All @@ -208,20 +212,35 @@ func NewListenerWrapper(listener appdata.Listener) listenerWrapper {

func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error {
if p.listener.StartBlock != nil {
// clean up redundant data
reqWithoutTxs := req
reqWithoutTxs.Txs = nil

if err := p.listener.StartBlock(appdata.StartBlockData{
Height: uint64(req.Height),
HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: func() (json.RawMessage, error) {
return json.Marshal(reqWithoutTxs)
},
}); err != nil {
return err
}
}
if p.listener.OnTx != nil {
for i, tx := range req.Txs {
if err := p.listener.OnTx(appdata.TxData{
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
BlockNumber: uint64(req.Height),
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: func() (json.RawMessage, error) {
sdkTx, err := p.txDecoder(tx)
if err != nil {
// if the transaction cannot be decoded, return the error as JSON
// as there are some txs that might not be decodeable by the txDecoder
return json.Marshal(err)
}
return json.Marshal(sdkTx)
},
}); err != nil {
return err
}
Expand All @@ -231,14 +250,14 @@ func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.Finaliz
events := make([]appdata.Event, len(res.Events))
var err error
for i, event := range res.Events {
events[i], err = eventToAppDataEvent(event)
events[i], err = eventToAppDataEvent(event, req.Height)
if err != nil {
return err
}
}
for _, txResult := range res.TxResults {
for _, event := range txResult.Events {
appdataEvent, err := eventToAppDataEvent(event)
appdataEvent, err := eventToAppDataEvent(event, req.Height)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions codec/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ func protoCol(f protoreflect.FieldDescriptor) schema.Field {
col.Kind = schema.StringKind
case protoreflect.BytesKind:
col.Kind = schema.BytesKind
col.Nullable = true
case protoreflect.EnumKind:
// TODO: support enums
col.Kind = schema.EnumKind
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ require (
// TODO remove after all modules have their own go.mods
replace (
cosmossdk.io/api => ./api
cosmossdk.io/schema => ./schema
cosmossdk.io/store => ./store
cosmossdk.io/x/bank => ./x/bank
cosmossdk.io/x/staking => ./x/staking
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ cosmossdk.io/log v1.5.0 h1:dVdzPJW9kMrnAYyMf1duqacoidB9uZIl+7c6z0mnq0g=
cosmossdk.io/log v1.5.0/go.mod h1:Tr46PUJjiUthlwQ+hxYtUtPn4D/oCZXAkYevBeh5+FI=
cosmossdk.io/math v1.4.0 h1:XbgExXFnXmF/CccPPEto40gOO7FpWu9yWNAZPN3nkNQ=
cosmossdk.io/math v1.4.0/go.mod h1:O5PkD4apz2jZs4zqFdTr16e1dcaQCc5z6lkEnrrppuk=
cosmossdk.io/schema v0.3.1-0.20241128094659-bd76b47e1d8b h1:svpFdulZRrYz+RTHu2u9CeKkMKrIHx5354vjiHerovo=
cosmossdk.io/schema v0.3.1-0.20241128094659-bd76b47e1d8b/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=
Expand Down
14 changes: 8 additions & 6 deletions indexer/postgres/base_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@ CREATE TABLE IF NOT EXISTS tx
id BIGSERIAL PRIMARY KEY,
block_number BIGINT NOT NULL REFERENCES block (number),
index_in_block BIGINT NOT NULL,
data JSONB NOT NULL
data JSONB NULL,
bytes BYTEA NULL
);
CREATE TABLE IF NOT EXISTS event
(
id BIGSERIAL PRIMARY KEY,
block_number BIGINT NOT NULL REFERENCES block (number),
tx_id BIGINT NULL REFERENCES tx (id),
msg_index BIGINT NULL,
event_index BIGINT NULL,
type TEXT NOT NULL,
data JSONB NOT NULL
block_stage INTEGER NOT NULL,
tx_index BIGINT NOT NULL,
msg_index BIGINT NOT NULL,
event_index BIGINT NOT NULL,
type TEXT NULL,
data JSONB NULL
);
`
68 changes: 68 additions & 0 deletions indexer/postgres/listener.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package postgres

import (
"encoding/json"
"fmt"

"cosmossdk.io/schema/appdata"
Expand Down Expand Up @@ -81,5 +82,72 @@ func (i *indexerImpl) listener() appdata.Listener {
i.tx, err = i.db.BeginTx(i.ctx, nil)
return nil, err
},
OnTx: txListener(i),
OnEvent: eventListener(i),
}
}

func txListener(i *indexerImpl) func(data appdata.TxData) error {
return func(td appdata.TxData) error {
var bz []byte
if td.Bytes != nil {
var err error
bz, err = td.Bytes()
if err != nil {
return err
}
}

var jsonData json.RawMessage
if td.JSON != nil {
var err error
jsonData, err = td.JSON()
if err != nil {
return err
}
}

_, err := i.tx.Exec("INSERT INTO tx (block_number, index_in_block, data, bytes) VALUES ($1, $2, $3, $4)",
td.BlockNumber, td.TxIndex, jsonData, bz)

return err
}
}

func eventListener(i *indexerImpl) func(data appdata.EventData) error {
return func(data appdata.EventData) error {
for _, e := range data.Events {
var jsonData json.RawMessage

if e.Data != nil {
var err error
jsonData, err = e.Data()
if err != nil {
return fmt.Errorf("failed to get event data: %w", err)
}
} else if e.Attributes != nil {
attrs, err := e.Attributes()
if err != nil {
return fmt.Errorf("failed to get event attributes: %w", err)
}

attrsMap := map[string]interface{}{}
for _, attr := range attrs {
attrsMap[attr.Key] = attr.Value
}

jsonData, err = json.Marshal(attrsMap)
if err != nil {
return fmt.Errorf("failed to marshal event attributes: %w", err)
}
}

_, err := i.tx.Exec("INSERT INTO event (block_number, block_stage, tx_index, msg_index, event_index, type, data) VALUES ($1, $2, $3, $4, $5, $6, $7)",
e.BlockNumber, e.BlockStage, e.TxIndex, e.MsgIndex, e.EventIndex, e.Type, jsonData)
if err != nil {
return fmt.Errorf("failed to index event: %w", err)
}
}
return nil
}
}
5 changes: 5 additions & 0 deletions runtime/v2/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func (a *App[T]) SchemaDecoderResolver() decoding.DecoderResolver {
for moduleName, module := range a.moduleManager.Modules() {
moduleSet[moduleName] = module
}

for _, overrideKey := range a.config.OverrideStoreKeys {
moduleSet[overrideKey.KvStoreKey] = moduleSet[overrideKey.ModuleName]
}

return decoding.ModuleSetDecoderResolver(moduleSet)
}

Expand Down
1 change: 1 addition & 0 deletions runtime/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go 1.23
replace (
cosmossdk.io/api => ../../api
cosmossdk.io/core/testing => ../../core/testing
cosmossdk.io/schema => ../../schema
cosmossdk.io/server/v2/appmanager => ../../server/v2/appmanager
cosmossdk.io/server/v2/stf => ../../server/v2/stf
cosmossdk.io/store/v2 => ../../store/v2
Expand Down
2 changes: 0 additions & 2 deletions runtime/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 h1:IQNdY2kB+k+1OM2DvqF
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5/go.mod h1:0CuYKkFHxc1vw2JC+t21THBCALJVROrWVR/3PQ1urpc=
cosmossdk.io/log v1.5.0 h1:dVdzPJW9kMrnAYyMf1duqacoidB9uZIl+7c6z0mnq0g=
cosmossdk.io/log v1.5.0/go.mod h1:Tr46PUJjiUthlwQ+hxYtUtPn4D/oCZXAkYevBeh5+FI=
cosmossdk.io/schema v0.3.0 h1:01lcaM4trhzZ1HQTfTV8z6Ma1GziOZ/YmdzBN3F720c=
cosmossdk.io/schema v0.3.0/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
Expand Down
6 changes: 6 additions & 0 deletions schema/appdata/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type StartBlockData struct {

// TxData represents the raw transaction data that is passed to a listener.
type TxData struct {
// BlockNumber is the block number to which this event is associated.
BlockNumber uint64

// TxIndex is the index of the transaction in the block.
TxIndex int32

Expand All @@ -53,6 +56,9 @@ type Event struct {
// If the block stage is unknown, it should be set to UnknownBlockStage.
BlockStage BlockStage

// BlockNumber is the block number to which this event is associated.
BlockNumber uint64

// TxIndex is the 1-based index of the transaction in the block to which this event is associated.
// If TxIndex is zero, it means that we do not know the transaction index.
// Otherwise, the index should start with 1.
Expand Down
3 changes: 2 additions & 1 deletion schema/decoding/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func Middleware(target appdata.Listener, resolver DecoderResolver, opts Middlewa
var err error
moduleName, err = resolver.DecodeModuleName(kvUpdate.Actor)
if err != nil {
return err
// we don't have a codec for this module so continue
continue
}

moduleNames[string(kvUpdate.Actor)] = moduleName
Expand Down
9 changes: 5 additions & 4 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func (c *consensus[T]) FinalizeBlock(
events = append(events, resp.EndBlockEvents...)

// listen to state streaming changes in accordance with the block
err = c.streamDeliverBlockChanges(ctx, req.Height, req.Txs, resp.TxResults, events, stateChanges)
err = c.streamDeliverBlockChanges(ctx, req.Height, req.Txs, decodedTxs, resp.TxResults, events, stateChanges)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -590,7 +590,7 @@ func (c *consensus[T]) internalFinalizeBlock(
// TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case,
// considering that prepare and process always decode txs, assuming they're the ones providing txs we should never
// have a tx that fails decoding.
decodedTxs, err := decodeTxs(req.Txs, c.txCodec)
decodedTxs, err := decodeTxs(c.logger, req.Txs, c.txCodec)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -723,12 +723,13 @@ func (c *consensus[T]) ExtendVote(ctx context.Context, req *abciproto.ExtendVote
return resp, err
}

func decodeTxs[T transaction.Tx](rawTxs [][]byte, codec transaction.Codec[T]) ([]T, error) {
func decodeTxs[T transaction.Tx](logger log.Logger, rawTxs [][]byte, codec transaction.Codec[T]) ([]T, error) {
txs := make([]T, len(rawTxs))
for i, rawTx := range rawTxs {
tx, err := codec.Decode(rawTx)
if err != nil {
return nil, fmt.Errorf("unable to decode tx: %d: %w", i, err)
// do not return an error here, as we want to deliver the block even if some txs are invalid
logger.Debug("failed to decode tx", "err", err)
}
txs[i] = tx
}
Expand Down
1 change: 1 addition & 0 deletions server/v2/cometbft/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.23.2
replace (
cosmossdk.io/api => ../../../api
cosmossdk.io/core/testing => ../../../core/testing
cosmossdk.io/schema => ../../../schema
cosmossdk.io/server/v2 => ../
cosmossdk.io/server/v2/appmanager => ../appmanager
cosmossdk.io/server/v2/stf => ../stf
Expand Down
2 changes: 0 additions & 2 deletions server/v2/cometbft/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ cosmossdk.io/log v1.5.0 h1:dVdzPJW9kMrnAYyMf1duqacoidB9uZIl+7c6z0mnq0g=
cosmossdk.io/log v1.5.0/go.mod h1:Tr46PUJjiUthlwQ+hxYtUtPn4D/oCZXAkYevBeh5+FI=
cosmossdk.io/math v1.4.0 h1:XbgExXFnXmF/CccPPEto40gOO7FpWu9yWNAZPN3nkNQ=
cosmossdk.io/math v1.4.0/go.mod h1:O5PkD4apz2jZs4zqFdTr16e1dcaQCc5z6lkEnrrppuk=
cosmossdk.io/schema v0.3.1-0.20241128094659-bd76b47e1d8b h1:svpFdulZRrYz+RTHu2u9CeKkMKrIHx5354vjiHerovo=
cosmossdk.io/schema v0.3.1-0.20241128094659-bd76b47e1d8b/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=
Expand Down
11 changes: 8 additions & 3 deletions server/v2/cometbft/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cometbft

import (
"context"
"encoding/json"

"cosmossdk.io/core/event"
"cosmossdk.io/core/server"
Expand All @@ -16,6 +17,7 @@ func (c *consensus[T]) streamDeliverBlockChanges(
ctx context.Context,
height int64,
txs [][]byte,
decodedTxs []T,
txResults []server.TxResult,
events []event.Event,
stateChanges []store.StateChanges,
Expand Down Expand Up @@ -76,9 +78,12 @@ func (c *consensus[T]) streamDeliverBlockChanges(
if c.listener.OnTx != nil {
for i, tx := range txs {
if err := c.listener.OnTx(appdata.TxData{
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
BlockNumber: uint64(height),
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: func() (json.RawMessage, error) {
return json.Marshal(decodedTxs[i])
},
}); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions server/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.23

replace (
cosmossdk.io/api => ../../api
cosmossdk.io/schema => ../../schema
cosmossdk.io/server/v2/appmanager => ./appmanager
cosmossdk.io/server/v2/stf => ./stf
cosmossdk.io/store/v2 => ../../store/v2
Expand Down
Loading

0 comments on commit 332d0b1

Please sign in to comment.