Skip to content

Commit

Permalink
feat(batching): make dr_height part of the primary key
Browse files Browse the repository at this point in the history
This allows us to keep old results around for recurring data request
IDs. Also introduces a few missing genesis states during initialisation.
  • Loading branch information
Thomasvdam committed Dec 2, 2024
1 parent 100951e commit 9e90765
Show file tree
Hide file tree
Showing 12 changed files with 571 additions and 136 deletions.
9 changes: 8 additions & 1 deletion proto/sedachain/batching/v1/genesis.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ message GenesisState {
uint64 current_batch_number = 1;
repeated Batch batches = 2 [ (gogoproto.nullable) = false ];
repeated BatchData batch_data = 3 [ (gogoproto.nullable) = false ];
repeated DataResult data_results = 4 [ (gogoproto.nullable) = false ];
repeated GenesisDataResult data_results = 4 [ (gogoproto.nullable) = false ];
repeated BatchAssignment batch_assignments = 5
[ (gogoproto.nullable) = false ];
}
Expand All @@ -23,6 +23,7 @@ message GenesisState {
message BatchAssignment {
uint64 batch_number = 1;
string data_request_id = 2;
uint64 data_request_height = 3;
}

// BatchData represents a given batch's full data.
Expand All @@ -35,3 +36,9 @@ message BatchData {
repeated BatchSignatures batch_signatures = 4
[ (gogoproto.nullable) = false ];
}

// GenesisDataResult includes a data result and its batching status.
message GenesisDataResult {
bool batched = 1;
DataResult data_result = 2 [ (gogoproto.nullable) = false ];
}
5 changes: 4 additions & 1 deletion proto/sedachain/batching/v1/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ message QueryBatchesResponse {
}

// The request message for QueryDataResult RPC.
message QueryDataResultRequest { string data_request_id = 1; }
message QueryDataResultRequest {
string data_request_id = 1;
uint64 data_request_height = 2;
}

// The response message for QueryDataResult RPC.
message QueryDataResultResponse {
Expand Down
10 changes: 8 additions & 2 deletions x/batching/client/cli/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ func GetCmdQueryBatches() *cobra.Command {
// associated with a given data request.
func GetCmdQueryDataResult() *cobra.Command {
cmd := &cobra.Command{
Use: "data-result <data_request_id>",
Use: "data-result <data_request_id> <optional_data_request_height>",
Short: "Get the data result associated with a given data request",
Args: cobra.ExactArgs(1),
Args: cobra.MaximumNArgs(2),
RunE: func(cmd *cobra.Command, args []string) error {
clientCtx, err := client.GetClientQueryContext(cmd)
if err != nil {
Expand All @@ -160,6 +160,12 @@ func GetCmdQueryDataResult() *cobra.Command {
req := &types.QueryDataResultRequest{
DataRequestId: args[0],
}
if len(args) == 2 {
req.DataRequestHeight, err = strconv.ParseUint(args[1], 10, 64)
if err != nil {
return err
}
}
res, err := queryClient.DataResult(cmd.Context(), req)
if err != nil {
return err
Expand Down
108 changes: 80 additions & 28 deletions x/batching/keeper/data_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,32 @@ import (
// SetDataResultForBatching stores a data result so that it is ready
// to be batched.
func (k Keeper) SetDataResultForBatching(ctx context.Context, result types.DataResult) error {
return k.dataResults.Set(ctx, collections.Join(false, result.DrId), result)
return k.dataResults.Set(ctx, collections.Join3(false, result.DrId, result.DrBlockHeight), result)
}

// MarkDataResultAsBatched removes the "unbatched" variant of the given
// data result and stores a "batched" variant.
func (k Keeper) MarkDataResultAsBatched(ctx context.Context, result types.DataResult, batchNum uint64) error {
err := k.SetBatchAssignment(ctx, result.DrId, batchNum)
err := k.SetBatchAssignment(ctx, result.DrId, result.DrBlockHeight, batchNum)
if err != nil {
return err
}
err = k.dataResults.Remove(ctx, collections.Join(false, result.DrId))
err = k.dataResults.Remove(ctx, collections.Join3(false, result.DrId, result.DrBlockHeight))
if err != nil {
return err
}
return k.dataResults.Set(ctx, collections.Join(true, result.DrId), result)
return k.dataResults.Set(ctx, collections.Join3(true, result.DrId, result.DrBlockHeight), result)
}

// GetDataResult returns a data result given the associated data request's
// ID.
func (k Keeper) GetDataResult(ctx context.Context, dataReqID string) (*types.DataResult, error) {
dataResult, err := k.dataResults.Get(ctx, collections.Join(false, dataReqID))
// ID and height.
func (k Keeper) GetDataResult(ctx context.Context, dataReqID string, dataReqHeight uint64) (*types.DataResult, error) {
dataResult, err := k.dataResults.Get(ctx, collections.Join3(false, dataReqID, dataReqHeight))
if err != nil {
if errors.Is(err, collections.ErrNotFound) {
// Look among batched data requests.
dataResult, err := k.dataResults.Get(ctx, collections.Join(true, dataReqID))
dataResult, err := k.dataResults.Get(ctx, collections.Join3(true, dataReqID, dataReqHeight))
if err != nil {
if errors.Is(err, collections.ErrNotFound) {
return nil, nil
}
return nil, err
}
return &dataResult, nil
Expand All @@ -50,60 +47,115 @@ func (k Keeper) GetDataResult(ctx context.Context, dataReqID string) (*types.Dat
return &dataResult, err
}

// GetLatestDataResult returns the latest data result given the associated
// data request's ID.
func (k Keeper) GetLatestDataResult(ctx context.Context, dataReqID string) (*types.DataResult, error) {
dataResult, err := k.getLatestDataResult(ctx, false, dataReqID)
if err != nil {
if errors.Is(err, collections.ErrNotFound) {
// Look among batched data requests.
dataResult, err := k.getLatestDataResult(ctx, true, dataReqID)
if err != nil {
return nil, err
}
return dataResult, nil
}
return nil, err
}

return dataResult, nil
}

func (k Keeper) getLatestDataResult(ctx context.Context, batched bool, dataReqID string) (*types.DataResult, error) {
// The triple pair ranger does not expose the Descending() method,
// so we manually create the range using the same prefix that the
// collections.NewSuperPrefixedTripleRange uses internally.
drRange := &collections.Range[collections.Triple[bool, string, uint64]]{}
drRange.Prefix(collections.TripleSuperPrefix[bool, string, uint64](batched, dataReqID)).Descending()

itr, err := k.dataResults.Iterate(ctx, drRange)
if err != nil {
return nil, err
}
defer itr.Close()

if itr.Valid() {
kv, err := itr.KeyValue()
if err != nil {
return nil, err
}
return &kv.Value, nil
}

return nil, collections.ErrNotFound
}

// GetDataResults returns a list of data results under a given status
// (batched or not).
func (k Keeper) GetDataResults(ctx context.Context, batched bool) ([]types.DataResult, error) {
var results []types.DataResult
err := k.IterateDataResults(ctx, batched, func(_ collections.Pair[bool, string], value types.DataResult) (bool, error) {
err := k.IterateDataResults(ctx, batched, func(_ collections.Triple[bool, string, uint64], value types.DataResult) (bool, error) {
results = append(results, value)
return false, nil
})
return results, err
}

// getAllDataResults returns all data results from the store regardless
// of their batched status.
func (k Keeper) getAllDataResults(ctx context.Context) ([]types.DataResult, error) {
var dataResults []types.DataResult
// of their batched status. Used for genesis export.
func (k Keeper) getAllGenesisDataResults(ctx context.Context) ([]types.GenesisDataResult, error) {
dataResults := make([]types.GenesisDataResult, 0)
unbatched, err := k.GetDataResults(ctx, false)
if err != nil {
return nil, err
}
dataResults = append(dataResults, unbatched...)
for _, result := range unbatched {
dataResults = append(dataResults, types.GenesisDataResult{
Batched: false,
DataResult: result,
})
}
batched, err := k.GetDataResults(ctx, true)
if err != nil {
return nil, err
}
dataResults = append(dataResults, batched...)
for _, result := range batched {
dataResults = append(dataResults, types.GenesisDataResult{
Batched: true,
DataResult: result,
})
}
return dataResults, nil
}

// IterateDataResults iterates over all data results under a given
// status (batched or not) and performs a given callback function.
func (k Keeper) IterateDataResults(ctx context.Context, batched bool, cb func(key collections.Pair[bool, string], value types.DataResult) (bool, error)) error {
rng := collections.NewPrefixedPairRange[bool, string](batched)
func (k Keeper) IterateDataResults(ctx context.Context, batched bool, cb func(key collections.Triple[bool, string, uint64], value types.DataResult) (bool, error)) error {
rng := collections.NewPrefixedTripleRange[bool, string, uint64](batched)
return k.dataResults.Walk(ctx, rng, cb)
}

// SetBatchAssignment assigns a given batch number to the given data
// request.
func (k Keeper) SetBatchAssignment(ctx context.Context, dataReqID string, batchNumber uint64) error {
return k.batchAssignments.Set(ctx, dataReqID, batchNumber)
// request ID and data request height.
func (k Keeper) SetBatchAssignment(ctx context.Context, dataReqID string, dataReqHeight uint64, batchNumber uint64) error {
return k.batchAssignments.Set(ctx, collections.Join(dataReqID, dataReqHeight), batchNumber)
}

// GetBatchAssignment returns the given data request's assigned batch
// number.
func (k Keeper) GetBatchAssignment(ctx context.Context, dataReqID string) (uint64, error) {
return k.batchAssignments.Get(ctx, dataReqID)
// number for a given height.
func (k Keeper) GetBatchAssignment(ctx context.Context, dataReqID string, dataReqHeight uint64) (uint64, error) {
return k.batchAssignments.Get(ctx, collections.Join(dataReqID, dataReqHeight))
}

// getAllBatchAssignments retrieves all batch assignments from the store.
// Used for genesis export.
func (k Keeper) getAllBatchAssignments(ctx context.Context) ([]types.BatchAssignment, error) {
var batchAssignments []types.BatchAssignment
err := k.batchAssignments.Walk(ctx, nil, func(dataReqID string, batchNum uint64) (stop bool, err error) {
err := k.batchAssignments.Walk(ctx, nil, func(key collections.Pair[string, uint64], value uint64) (stop bool, err error) {
batchAssignments = append(batchAssignments, types.BatchAssignment{
BatchNumber: batchNum,
DataRequestId: dataReqID,
BatchNumber: value,
DataRequestId: key.K1(),
DataRequestHeight: key.K2(),
})
return false, nil
})
Expand Down
14 changes: 13 additions & 1 deletion x/batching/keeper/genesis.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package keeper

import (
"cosmossdk.io/collections"

sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/sedaprotocol/seda-chain/x/batching/types"
Expand Down Expand Up @@ -31,11 +33,21 @@ func (k Keeper) InitGenesis(ctx sdk.Context, data types.GenesisState) {
}
}
}
for _, dataResult := range data.DataResults {
if err := k.dataResults.Set(ctx, collections.Join3(false, dataResult.DataResult.DrId, dataResult.DataResult.DrBlockHeight), dataResult.DataResult); err != nil {
panic(err)
}
}
for _, batchAssignment := range data.BatchAssignments {
if err := k.SetBatchAssignment(ctx, batchAssignment.DataRequestId, batchAssignment.DataRequestHeight, batchAssignment.BatchNumber); err != nil {
panic(err)
}
}
}

// ExportGenesis extracts all data from store to genesis state.
func (k Keeper) ExportGenesis(ctx sdk.Context) types.GenesisState {
dataResults, err := k.getAllDataResults(ctx)
dataResults, err := k.getAllGenesisDataResults(ctx)
if err != nil {
panic(err)
}
Expand Down
8 changes: 4 additions & 4 deletions x/batching/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type Keeper struct {
authority string

Schema collections.Schema
dataResults collections.Map[collections.Pair[bool, string], types.DataResult]
batchAssignments collections.Map[string, uint64]
dataResults collections.Map[collections.Triple[bool, string, uint64], types.DataResult]
batchAssignments collections.Map[collections.Pair[string, uint64], uint64]
currentBatchNumber collections.Sequence
batches *collections.IndexedMap[int64, types.Batch, BatchIndexes]
validatorTreeEntries collections.Map[collections.Pair[uint64, []byte], types.ValidatorTreeEntry]
Expand Down Expand Up @@ -60,8 +60,8 @@ func NewKeeper(
wasmViewKeeper: wvk,
validatorAddressCodec: validatorAddressCodec,
authority: authority,
dataResults: collections.NewMap(sb, types.DataResultsPrefix, "data_results", collections.PairKeyCodec(collections.BoolKey, collections.StringKey), codec.CollValue[types.DataResult](cdc)),
batchAssignments: collections.NewMap(sb, types.BatchAssignmentsPrefix, "batch_assignments", collections.StringKey, collections.Uint64Value),
dataResults: collections.NewMap(sb, types.DataResultsPrefix, "data_results", collections.TripleKeyCodec(collections.BoolKey, collections.StringKey, collections.Uint64Key), codec.CollValue[types.DataResult](cdc)),
batchAssignments: collections.NewMap(sb, types.BatchAssignmentsPrefix, "batch_assignments", collections.PairKeyCodec(collections.StringKey, collections.Uint64Key), collections.Uint64Value),
currentBatchNumber: collections.NewSequence(sb, types.CurrentBatchNumberKey, "current_batch_number"),
batches: collections.NewIndexedMap(sb, types.BatchesKeyPrefix, "batches", collections.Int64Key, codec.CollValue[types.Batch](cdc), NewBatchIndexes(sb)),
validatorTreeEntries: collections.NewMap(sb, types.ValidatorTreeEntriesKeyPrefix, "validator_tree_entries", collections.PairKeyCodec(collections.Uint64Key, collections.BytesKey), codec.CollValue[types.ValidatorTreeEntry](cdc)),
Expand Down
40 changes: 38 additions & 2 deletions x/batching/keeper/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (s *KeeperTestSuite) TestKeeper_DataResult() {
ExitCode: 0,
Result: []byte("Ghkvq84TmIuEmU1ClubNxBjVXi8df5QhiNQEC5T8V6w="),
BlockHeight: 12345,
DrBlockHeight: 12343,
GasUsed: 20,
PaybackAddress: "",
SedaPayload: "",
Expand All @@ -104,7 +105,42 @@ func (s *KeeperTestSuite) TestKeeper_DataResult() {
s.Require().NoError(err)
s.Require().Equal(&mockDataResult, res.DataResult)
s.Require().Equal(&batchingtypes.BatchAssignment{
BatchNumber: batchNum,
DataRequestId: mockDataResult.DrId,
BatchNumber: batchNum,
DataRequestId: mockDataResult.DrId,
DataRequestHeight: mockDataResult.DrBlockHeight,
}, res.BatchAssignment)

// Resolve and batch another data result for the same data request ID.
mockDataResult2 := mockDataResult
mockDataResult2.Id = "ccf12276c43cc61e0f3c6ace3e66872eda5df5ec753525a7bddab6fa3407e927"
mockDataResult2.DrBlockHeight = 54321

err = s.keeper.SetDataResultForBatching(s.ctx, mockDataResult2)
s.Require().NoError(err)
err = s.keeper.MarkDataResultAsBatched(s.ctx, mockDataResult2, batchNum)
s.Require().NoError(err)

res, err = s.queryClient.DataResult(s.ctx, &batchingtypes.QueryDataResultRequest{
DataRequestId: mockDataResult2.DrId,
})
s.Require().NoError(err)
s.Require().Equal(&batchingtypes.BatchAssignment{
BatchNumber: batchNum,
DataRequestId: mockDataResult2.DrId,
DataRequestHeight: mockDataResult2.DrBlockHeight,
}, res.BatchAssignment)
s.Require().Equal(&mockDataResult2, res.DataResult)

// We should still be able to query the first data result.
res, err = s.queryClient.DataResult(s.ctx, &batchingtypes.QueryDataResultRequest{
DataRequestId: mockDataResult.DrId,
DataRequestHeight: mockDataResult.DrBlockHeight,
})
s.Require().NoError(err)
s.Require().Equal(&mockDataResult, res.DataResult)
s.Require().Equal(&batchingtypes.BatchAssignment{
BatchNumber: batchNum,
DataRequestId: mockDataResult.DrId,
DataRequestHeight: mockDataResult.DrBlockHeight,
}, res.BatchAssignment)
}
20 changes: 16 additions & 4 deletions x/batching/keeper/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,36 @@ func (q Querier) Batches(c context.Context, req *types.QueryBatchesRequest) (*ty

func (q Querier) DataResult(c context.Context, req *types.QueryDataResultRequest) (*types.QueryDataResultResponse, error) {
ctx := sdk.UnwrapSDKContext(c)
dataResult, err := q.Keeper.GetDataResult(ctx, req.DataRequestId)

var dataResult *types.DataResult
var err error
if req.DataRequestHeight == 0 {
dataResult, err = q.Keeper.GetLatestDataResult(ctx, req.DataRequestId)
} else {
dataResult, err = q.Keeper.GetDataResult(ctx, req.DataRequestId, req.DataRequestHeight)
}

if err != nil {
if errors.Is(err, collections.ErrNotFound) {
return &types.QueryDataResultResponse{}, nil
}
return nil, err
}

result := &types.QueryDataResultResponse{
DataResult: dataResult,
}

batchNum, err := q.Keeper.GetBatchAssignment(ctx, req.DataRequestId)
batchNum, err := q.Keeper.GetBatchAssignment(ctx, req.DataRequestId, dataResult.DrBlockHeight)
if err != nil {
if !errors.Is(err, collections.ErrNotFound) {
return nil, err
}
} else {
result.BatchAssignment = &types.BatchAssignment{
BatchNumber: batchNum,
DataRequestId: req.DataRequestId,
BatchNumber: batchNum,
DataRequestId: req.DataRequestId,
DataRequestHeight: dataResult.DrBlockHeight,
}
}
return result, nil
Expand Down
2 changes: 1 addition & 1 deletion x/batching/types/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func NewGenesisState(
curBatchNum uint64,
batches []Batch,
batchData []BatchData,
dataResults []DataResult,
dataResults []GenesisDataResult,
batchAssignments []BatchAssignment,
) GenesisState {
return GenesisState{
Expand Down
Loading

0 comments on commit 9e90765

Please sign in to comment.