Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunk Data Model supports service event indices #6744

Open
wants to merge 12 commits into
base: feature/efm-recovery
Choose a base branch
from
20 changes: 20 additions & 0 deletions engine/execution/block_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,25 @@ func (er *BlockExecutionResult) AllEvents() flow.EventsList {
return res
}

// ServiceEventIndicesForChunk returns the list of service event indices associated with the given chunk.
// Outputs are index ranges with no gaps, and index into the flow.ExecutionResult.ServiceEvents field.
func (er *BlockExecutionResult) ServiceEventIndicesForChunk(chunkIndex int) []uint32 {
nServiceEventsForChunk := len(er.collectionExecutionResults[chunkIndex].serviceEvents)
if nServiceEventsForChunk == 0 {
return []uint32{}
}

firstIndex := 0
for i := 0; i < chunkIndex; i++ {
firstIndex += len(er.collectionExecutionResults[i].serviceEvents)
}
indices := make([]uint32, 0, nServiceEventsForChunk)
for i := firstIndex; i < firstIndex+nServiceEventsForChunk; i++ {
indices = append(indices, uint32(i))
}
return indices
}

func (er *BlockExecutionResult) AllServiceEvents() flow.EventsList {
res := make(flow.EventsList, 0)
for _, ce := range er.collectionExecutionResults {
Expand Down Expand Up @@ -199,6 +218,7 @@ func (ar *BlockAttestationResult) ChunkAt(index int) *flow.Chunk {
attestRes.startStateCommit,
len(execRes.TransactionResults()),
attestRes.eventCommit,
ar.ServiceEventIndicesForChunk(index),
attestRes.endStateCommit,
execRes.executionSnapshot.TotalComputationUsed(),
)
Expand Down
86 changes: 86 additions & 0 deletions engine/execution/block_result_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package execution

import (
"math/rand"
"testing"

"github.com/stretchr/testify/assert"

"github.com/onflow/flow-go/utils/slices"
"github.com/onflow/flow-go/utils/unittest"
)

// makeBlockExecutionResultFixture makes a BlockExecutionResult fixture
// with the specified allocation of service events to chunks.
func makeBlockExecutionResultFixture(serviceEventsPerChunk []int) *BlockExecutionResult {
fixture := new(BlockExecutionResult)
for _, nServiceEvents := range serviceEventsPerChunk {
fixture.collectionExecutionResults = append(fixture.collectionExecutionResults,
CollectionExecutionResult{
serviceEvents: unittest.EventsFixture(nServiceEvents),
convertedServiceEvents: unittest.ServiceEventsFixture(nServiceEvents),
})
}
return fixture
}

// Tests that ServiceEventIndicesForChunk method works as expected under various circumstances:
func TestBlockExecutionResult_ServiceEventIndicesForChunk(t *testing.T) {
t.Run("no service events", func(t *testing.T) {
nChunks := rand.Intn(10) + 1
blockResult := makeBlockExecutionResultFixture(make([]int, nChunks))
// all chunks should yield an empty list of service event indices
for chunkIndex := 0; chunkIndex < nChunks; chunkIndex++ {
indices := blockResult.ServiceEventIndicesForChunk(chunkIndex)
assert.Equal(t, make([]uint32, 0), indices)
}
})
t.Run("service events only in system chunk", func(t *testing.T) {
nChunks := rand.Intn(10) + 2 // at least 2 chunks
// add between 1 and 10 service events, all in the system chunk
serviceEventAllocation := make([]int, nChunks)
nServiceEvents := rand.Intn(10) + 1
serviceEventAllocation[nChunks-1] = nServiceEvents

blockResult := makeBlockExecutionResultFixture(serviceEventAllocation)
// all non-system chunks should yield an empty list of service event indices
for chunkIndex := 0; chunkIndex < nChunks-1; chunkIndex++ {
indices := blockResult.ServiceEventIndicesForChunk(chunkIndex)
assert.Equal(t, make([]uint32, 0), indices)
}
// the system chunk should contain indices for all events
expected := slices.MakeRange[uint32](0, uint32(nServiceEvents))
assert.Equal(t, expected, blockResult.ServiceEventIndicesForChunk(nChunks-1))
})
t.Run("service events only outside system chunk", func(t *testing.T) {
nChunks := rand.Intn(10) + 2 // at least 2 chunks
// add 1 service event to all non-system chunks
serviceEventAllocation := slices.Fill(1, nChunks)
serviceEventAllocation[nChunks-1] = 0

blockResult := makeBlockExecutionResultFixture(serviceEventAllocation)
// all non-system chunks should yield a length-1 list of service event indices
for chunkIndex := 0; chunkIndex < nChunks-1; chunkIndex++ {
indices := blockResult.ServiceEventIndicesForChunk(chunkIndex)
// 1 service event per chunk => service event indices match chunk indices
expected := slices.Fill(uint32(chunkIndex), 1)
assert.Equal(t, expected, indices)
}
// the system chunk should contain indices for all events
assert.Equal(t, make([]uint32, 0), blockResult.ServiceEventIndicesForChunk(nChunks-1))
})
t.Run("service events in both system chunk and other chunks", func(t *testing.T) {
nChunks := rand.Intn(10) + 2 // at least 2 chunks
// add 1 service event to all chunks (including system chunk
serviceEventAllocation := slices.Fill(1, nChunks)

blockResult := makeBlockExecutionResultFixture(serviceEventAllocation)
// all chunks should yield a length-1 list of service event indices
for chunkIndex := 0; chunkIndex < nChunks; chunkIndex++ {
indices := blockResult.ServiceEventIndicesForChunk(chunkIndex)
// 1 service event per chunk => service event indices match chunk indices
expected := slices.Fill(uint32(chunkIndex), 1)
assert.Equal(t, expected, indices)
}
})
}
51 changes: 50 additions & 1 deletion model/flow/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/ipfs/go-cid"
"github.com/vmihailenco/msgpack/v4"

"github.com/onflow/flow-go/model/encoding/rlp"
)

var EmptyEventCollectionID Identifier
Expand All @@ -20,19 +22,61 @@ func init() {
}
}

// ChunkBodyV0 is the prior version of ChunkBody, used for backward compatibility and tests.
// Compared to ChunkBody, ChunkBodyV0 does not have the ServiceEventIndices field.
type ChunkBodyV0 struct {
CollectionIndex uint
StartState StateCommitment
EventCollection Identifier
BlockID Identifier
TotalComputationUsed uint64
NumberOfTransactions uint64
}

type ChunkBody struct {
CollectionIndex uint

// execution info
StartState StateCommitment // start state when starting executing this chunk
EventCollection Identifier // Events generated by executing results
BlockID Identifier // Block id of the execution result this chunk belongs to
// ServiceEventIndices is a list of indices defining which service events were emitted in this chunk.
// These indices index into the ExecutionResult.ServiceEvents field.
//
// BACKWARD COMPATIBILITY:
// (1) If ServiceEventIndices is nil, this indicates that this chunk was created by an older software version
// which did support specifying a mapping between chunks and service events.
// In this case, all service events are assumed to have been emitted in the system chunk (last chunk).
// (2) Otherwise, ServiceEventIndices must be non-nil. A chunk in which no service events were emitted
// is denoted with an empty list: []uint32{}.
// Within an ExecutionResult, all chunks must use either representation (1) or (2), not both.
ServiceEventIndices []uint32
BlockID Identifier // Block id of the execution result this chunk belongs to

// Computation consumption info
TotalComputationUsed uint64 // total amount of computation used by running all txs in this chunk
NumberOfTransactions uint64 // number of transactions inside the collection
}

// Fingerprint returns the unique binary representation for the receiver ChunkBody,
// used to compute the ID (hash).
// The fingerprint is backward-compatible with the prior data model for ChunkBody: ChunkBodyV0.
// - All new ChunkBody instances must have non-nil ServiceEventIndices
// - A nil ServiceEventIndices field indicates a v0 version of ChunkBody
// - when computing the ID of such a ChunkBody, the ServiceEventIndices field is omitted from the fingerprint
func (ch ChunkBody) Fingerprint() []byte {
if ch.ServiceEventIndices == nil {
return rlp.NewMarshaler().MustMarshal(ChunkBodyV0{
CollectionIndex: ch.CollectionIndex,
StartState: ch.StartState,
EventCollection: ch.EventCollection,
BlockID: ch.BlockID,
TotalComputationUsed: ch.TotalComputationUsed,
NumberOfTransactions: ch.NumberOfTransactions,
})
}
return rlp.NewMarshaler().MustMarshal(ch)
}

type Chunk struct {
ChunkBody

Expand All @@ -47,16 +91,21 @@ func NewChunk(
startState StateCommitment,
numberOfTransactions int,
eventCollection Identifier,
serviceEventIndices []uint32,
endState StateCommitment,
totalComputationUsed uint64,
) *Chunk {
if serviceEventIndices == nil {
serviceEventIndices = []uint32{}
}
return &Chunk{
ChunkBody: ChunkBody{
BlockID: blockID,
CollectionIndex: uint(collectionIndex),
StartState: startState,
NumberOfTransactions: uint64(numberOfTransactions),
EventCollection: eventCollection,
ServiceEventIndices: serviceEventIndices,
TotalComputationUsed: totalComputationUsed,
},
Index: uint64(collectionIndex),
Expand Down
140 changes: 140 additions & 0 deletions model/flow/chunk_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package flow_test

import (
"encoding/json"
"testing"

"github.com/fxamacker/cbor/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -117,6 +119,7 @@ func TestChunkIndexIsSet(t *testing.T) {
unittest.StateCommitmentFixture(),
21,
unittest.IdentifierFixture(),
[]uint32{},
unittest.StateCommitmentFixture(),
17995,
)
Expand All @@ -135,6 +138,7 @@ func TestChunkNumberOfTxsIsSet(t *testing.T) {
unittest.StateCommitmentFixture(),
int(i),
unittest.IdentifierFixture(),
[]uint32{},
unittest.StateCommitmentFixture(),
17995,
)
Expand All @@ -152,9 +156,145 @@ func TestChunkTotalComputationUsedIsSet(t *testing.T) {
unittest.StateCommitmentFixture(),
21,
unittest.IdentifierFixture(),
[]uint32{},
unittest.StateCommitmentFixture(),
i,
)

assert.Equal(t, i, chunk.TotalComputationUsed)
}

// TestChunkEncodeDecode test encoding and decoding properties.
// In particular, we want to demonstrate that nil-ness of the ServiceEventIndices field
// is preserved by the encoding schemes we use, because this difference is meaningful and
// important for backward compatibility (see [ChunkBody.ServiceEventIndices] for details).
func TestChunkEncodeDecode(t *testing.T) {
chunk := unittest.ChunkFixture(unittest.IdentifierFixture(), 0)

t.Run("encode/decode preserves nil ServiceEventIndices", func(t *testing.T) {
chunk.ServiceEventIndices = nil
t.Run("json", func(t *testing.T) {
bz, err := json.Marshal(chunk)
require.NoError(t, err)
unmarshaled := new(flow.Chunk)
err = json.Unmarshal(bz, unmarshaled)
require.NoError(t, err)
assert.Equal(t, chunk, unmarshaled)
assert.Nil(t, unmarshaled.ServiceEventIndices)
})
t.Run("cbor", func(t *testing.T) {
bz, err := cbor.Marshal(chunk)
require.NoError(t, err)
unmarshaled := new(flow.Chunk)
err = cbor.Unmarshal(bz, unmarshaled)
require.NoError(t, err)
assert.Equal(t, chunk, unmarshaled)
assert.Nil(t, unmarshaled.ServiceEventIndices)
})
})
t.Run("encode/decode preserves empty but non-nil ServiceEventIndices", func(t *testing.T) {
chunk.ServiceEventIndices = []uint32{}
t.Run("json", func(t *testing.T) {
bz, err := json.Marshal(chunk)
require.NoError(t, err)
unmarshaled := new(flow.Chunk)
err = json.Unmarshal(bz, unmarshaled)
require.NoError(t, err)
assert.Equal(t, chunk, unmarshaled)
assert.NotNil(t, unmarshaled.ServiceEventIndices)
})
t.Run("cbor", func(t *testing.T) {
bz, err := cbor.Marshal(chunk)
require.NoError(t, err)
unmarshaled := new(flow.Chunk)
err = cbor.Unmarshal(bz, unmarshaled)
require.NoError(t, err)
assert.Equal(t, chunk, unmarshaled)
assert.NotNil(t, unmarshaled.ServiceEventIndices)
})
})
}

// TestChunk_ModelVersions_EncodeDecode tests that encoding and decoding between
// supported versions works as expected.
func TestChunk_ModelVersions_EncodeDecode(t *testing.T) {
chunkFixture := unittest.ChunkFixture(unittest.IdentifierFixture(), 1)
chunkFixture.ServiceEventIndices = []uint32{1} // non-nil extra field

t.Run("writing v0 and reading v1 should yield nil for new field", func(t *testing.T) {
var chunkv0 flow.ChunkBodyV0
unittest.CopyStructure(t, chunkFixture.ChunkBody, &chunkv0)

t.Run("json", func(t *testing.T) {
bz, err := json.Marshal(chunkv0)
require.NoError(t, err)

var unmarshaled flow.ChunkBody
err = json.Unmarshal(bz, &unmarshaled)
require.NoError(t, err)
assert.Equal(t, chunkv0.EventCollection, unmarshaled.EventCollection)
assert.Equal(t, chunkv0.BlockID, unmarshaled.BlockID)
assert.Nil(t, unmarshaled.ServiceEventIndices)
})

t.Run("cbor", func(t *testing.T) {
bz, err := cbor.Marshal(chunkv0)
require.NoError(t, err)

var unmarshaled flow.ChunkBody
err = cbor.Unmarshal(bz, &unmarshaled)
require.NoError(t, err)
assert.Equal(t, chunkv0.EventCollection, unmarshaled.EventCollection)
assert.Equal(t, chunkv0.BlockID, unmarshaled.BlockID)
assert.Nil(t, unmarshaled.ServiceEventIndices)
})
})
t.Run("writing v1 and reading v0 does not error", func(t *testing.T) {
chunkv1 := chunkFixture.ChunkBody
chunkv1.ServiceEventIndices = []uint32{0} // ensure non-nil ServiceEventIndices field

t.Run("json", func(t *testing.T) {
bz, err := json.Marshal(chunkv1)
require.NoError(t, err)

var unmarshaled flow.ChunkBodyV0
err = json.Unmarshal(bz, &unmarshaled)
require.NoError(t, err)
assert.Equal(t, chunkv1.EventCollection, unmarshaled.EventCollection)
assert.Equal(t, chunkv1.BlockID, unmarshaled.BlockID)
})
t.Run("cbor", func(t *testing.T) {
bz, err := cbor.Marshal(chunkv1)
require.NoError(t, err)

var unmarshaled flow.ChunkBodyV0
err = cbor.Unmarshal(bz, &unmarshaled)
require.NoError(t, err)
assert.Equal(t, chunkv1.EventCollection, unmarshaled.EventCollection)
assert.Equal(t, chunkv1.BlockID, unmarshaled.BlockID)
})
})
}

// TestChunk_ModelVersions_IDConsistentAcrossVersions ensures that the ID function
// is backward compatible with old data model versions.
func TestChunk_ModelVersions_IDConsistentAcrossVersions(t *testing.T) {
chunk := unittest.ChunkFixture(unittest.IdentifierFixture(), 1)
chunkBody := chunk.ChunkBody
var chunkv0 flow.ChunkBodyV0
unittest.CopyStructure(t, chunkBody, &chunkv0)

// A nil ServiceEventIndices fields indicates a prior model version.
// The ID calculation for the old and new model version should be the same.
t.Run("nil ServiceEventIndices fields", func(t *testing.T) {
chunkBody.ServiceEventIndices = nil
assert.Equal(t, flow.MakeID(chunkv0), flow.MakeID(chunkBody))
})
// A non-nil ServiceEventIndices fields indicates an up-to-date model version.
// The ID calculation for the old and new model version should be different,
// because the new model should include the ServiceEventIndices field value.
t.Run("non-nil ServiceEventIndices fields", func(t *testing.T) {
chunkBody.ServiceEventIndices = []uint32{}
assert.NotEqual(t, flow.MakeID(chunkv0), flow.MakeID(chunkBody))
})
}
Loading
Loading