diff --git a/.github/workflows/benchmark-go.yml b/.github/workflows/benchmark-go.yml new file mode 100644 index 00000000..b07517c8 --- /dev/null +++ b/.github/workflows/benchmark-go.yml @@ -0,0 +1,36 @@ +name: Benchmark Go + +on: + push: + branches: + - main + +permissions: + # deployments permission to deploy GitHub pages website + deployments: write + # contents permission to update benchmark contents in gh-pages branch + contents: write + # allow posting comments to pull request + pull-requests: write + +jobs: + benchmark: + name: Performance regression check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v4 + with: + go-version-file: go.mod + - name: Run benchmark + run: set -o pipefail; go test ./... -benchmem -run=^$ -bench . | tee output.txt + - name: Store benchmark result + uses: benchmark-action/github-action-benchmark@v1.20.4 + with: + tool: 'go' + output-file-path: output.txt + github-token: ${{ secrets.GITHUB_TOKEN }} + auto-push: true + alert-threshold: "150%" + comment-on-alert: true + fail-on-alert: true diff --git a/README.md b/README.md index 346d739f..0e68d746 100644 --- a/README.md +++ b/README.md @@ -127,13 +127,18 @@ When writing your Tessera personality, the biggest decision you need to make fir * [POSIX](./storage/posix/) Each of these implementations has a very similar API, but they have different characteristics. + The easiest implementations to operate and to scale are the cloud implementations: GCP and AWS. -These are the recommended choice for the majority of users. +These are the recommended choice for the majority of users running in production. + +If you aren't using a cloud provider, then your options are MySQL and POSIX: +- POSIX is the simplest to get started with as it needs little in the way of extra infrastructure, and + if you already serve static files as part of your business/project this could be a good fit. +- Alternatively, if you are used to operating user-facing applications backed by a RDBMS, then MySQL could + be a natural fit. -If you aren't using a cloud provider, then your options are MySQL and POSIX. -POSIX is the more niche choice, intended to be lightweight and for logs that are infrequently updated. -If you already serve static files as part of your business this could be a good fit. -If you are more used to operating user-facing applications backed by a RDBMS, then MySQL will be a natural fit. +To get a sense of the rough performance you can expect from the different backends, take a look at +[docs/performance.md](/docs/performance.md). 
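The `Run benchmark` step in the workflow above emits standard `go test -bench` output, which `benchmark-action/github-action-benchmark` parses from `output.txt` and tracks across pushes to `main`. As a rough sketch (the function name and workload below are purely illustrative, not part of this change), any function of this shape in a `_test.go` file is picked up by `-bench .` and contributes one tracked result line:

```go
package example

import "testing"

// BenchmarkExample is an illustrative benchmark only: `-bench .` runs every
// Benchmark* function, and `-benchmem` adds allocation stats to each result
// line, which the action then compares against the configured 150% alert threshold.
func BenchmarkExample(b *testing.B) {
	buf := make([]byte, 0, 256)
	for i := 0; i < b.N; i++ {
		buf = append(buf[:0], "some representative work"...) // stand-in workload
	}
	_ = buf
}
```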
#### Setup diff --git a/api/layout/example_test.go b/api/layout/example_test.go index 87a90d49..db824a6e 100644 --- a/api/layout/example_test.go +++ b/api/layout/example_test.go @@ -28,19 +28,19 @@ func ExampleNodeCoordsToTileAddress() { } func ExampleTilePath() { - tilePath := layout.TilePath(0, 1234067, 315921160) + tilePath := layout.TilePath(0, 1234067, 8) fmt.Printf("tile path: %s", tilePath) // Output: tile path: tile/0/x001/x234/067.p/8 } func ExampleEntriesPath() { - entriesPath := layout.EntriesPath(1234067, 315921160) + entriesPath := layout.EntriesPath(1234067, 8) fmt.Printf("entries path: %s", entriesPath) // Output: entries path: tile/entries/x001/x234/067.p/8 } -func ExampleParseTileLevelIndexWidth() { - level, index, width, _ := layout.ParseTileLevelIndexWidth("0", "x001/x234/067.p/8") +func ExampleParseTileLevelIndexPartial() { + level, index, width, _ := layout.ParseTileLevelIndexPartial("0", "x001/x234/067.p/8") fmt.Printf("level: %d, index: %d, width: %d", level, index, width) // Output: level: 0, index: 1234067, width: 8 } diff --git a/api/layout/paths.go b/api/layout/paths.go index e8b4e5dc..07d2aa2b 100644 --- a/api/layout/paths.go +++ b/api/layout/paths.go @@ -37,13 +37,13 @@ const ( // The logSize is required so that a partial qualifier can be appended to tiles that // would contain fewer than 256 entries. func EntriesPathForLogIndex(seq, logSize uint64) string { - return EntriesPath(seq/256, logSize) + return EntriesPath(seq/EntryBundleWidth, PartialTileSize(0, seq, logSize)) } -// NWithSuffix returns a tiles-spec "N" path, with a partial suffix if applicable. -func NWithSuffix(l, n, logSize uint64) string { +// NWithSuffix returns a tiles-spec "N" path, with a partial suffix if p > 0. +func NWithSuffix(l, n uint64, p uint8) string { suffix := "" - if p := partialTileSize(l, n, logSize); p > 0 { + if p > 0 { suffix = fmt.Sprintf(".p/%d", p) } return fmt.Sprintf("%s%s", fmtN(n), suffix) @@ -51,13 +51,14 @@ func NWithSuffix(l, n, logSize uint64) string { // EntriesPath returns the local path for the nth entry bundle. p denotes the partial // tile size, or 0 if the tile is complete. -func EntriesPath(n, logSize uint64) string { - return fmt.Sprintf("tile/entries/%s", NWithSuffix(0, n, logSize)) +func EntriesPath(n uint64, p uint8) string { + return fmt.Sprintf("tile/entries/%s", NWithSuffix(0, n, p)) } // TilePath builds the path to the subtree tile with the given level and index in tile space. -func TilePath(tileLevel, tileIndex, logSize uint64) string { - return fmt.Sprintf("tile/%d/%s", tileLevel, NWithSuffix(tileLevel, tileIndex, logSize)) +// If p > 0 the path represents a partial tile. +func TilePath(tileLevel, tileIndex uint64, p uint8) string { + return fmt.Sprintf("tile/%d/%s", tileLevel, NWithSuffix(tileLevel, tileIndex, p)) } // fmtN returns the "N" part of a Tiles-spec path. @@ -83,13 +84,13 @@ func fmtN(N uint64) string { // Examples: // "/tile/0/x001/x234/067" means level 0 and index 1234067 of a full tile. // "/tile/0/x001/x234/067.p/8" means level 0, index 1234067 and width 8 of a partial tile. 
-func ParseTileLevelIndexWidth(level, index string) (uint64, uint64, uint64, error) { +func ParseTileLevelIndexPartial(level, index string) (uint64, uint64, uint8, error) { l, err := ParseTileLevel(level) if err != nil { return 0, 0, 0, err } - i, w, err := ParseTileIndexWidth(index) + i, w, err := ParseTileIndexPartial(index) if err != nil { return 0, 0, 0, err } @@ -107,17 +108,18 @@ func ParseTileLevel(level string) (uint64, error) { return l, err } -// ParseTileIndexWidth takes index in string, validates and returns the index and width in uint64. -func ParseTileIndexWidth(index string) (uint64, uint64, error) { - w := uint64(256) +// ParseTileIndexPartial takes index in string, validates and returns the index and width in uint64. +func ParseTileIndexPartial(index string) (uint64, uint8, error) { + w := uint8(0) indexPaths := strings.Split(index, "/") if strings.Contains(index, ".p") { var err error - w, err = strconv.ParseUint(indexPaths[len(indexPaths)-1], 10, 64) - if err != nil || w < 1 || w > 255 { - return 0, 0, fmt.Errorf("failed to parse tile index") + w64, err := strconv.ParseUint(indexPaths[len(indexPaths)-1], 10, 64) + if err != nil || w64 < 1 || w64 >= TileWidth { + return 0, 0, fmt.Errorf("failed to parse tile width") } + w = uint8(w64) indexPaths[len(indexPaths)-2] = strings.TrimSuffix(indexPaths[len(indexPaths)-2], ".p") indexPaths = indexPaths[:len(indexPaths)-1] } diff --git a/api/layout/paths_test.go b/api/layout/paths_test.go index 2ccb0b3f..093eab69 100644 --- a/api/layout/paths_test.go +++ b/api/layout/paths_test.go @@ -16,7 +16,6 @@ package layout import ( "fmt" - "math" "testing" ) @@ -65,39 +64,30 @@ func TestEntriesPathForLogIndex(t *testing.T) { func TestEntriesPath(t *testing.T) { for _, test := range []struct { N uint64 - logSize uint64 + p uint8 wantPath string + wantErr bool }{ { N: 0, - logSize: 289, wantPath: "tile/entries/000", }, { N: 0, - logSize: 8, + p: 8, wantPath: "tile/entries/000.p/8", }, { N: 255, - logSize: 256 * 256, wantPath: "tile/entries/255", }, { N: 255, - logSize: 255*256 - 3, + p: 253, wantPath: "tile/entries/255.p/253", - }, { - N: 256, - logSize: 257 * 256, - wantPath: "tile/entries/256", - }, { - N: 123456789000, - logSize: math.MaxUint64, - wantPath: "tile/entries/x123/x456/x789/000", }, } { desc := fmt.Sprintf("N %d", test.N) t.Run(desc, func(t *testing.T) { - gotPath := EntriesPath(test.N, test.logSize) + gotPath := EntriesPath(test.N, test.p) if gotPath != test.wantPath { t.Errorf("got file %q want %q", gotPath, test.wantPath) } @@ -109,59 +99,37 @@ func TestTilePath(t *testing.T) { for _, test := range []struct { level uint64 index uint64 - logSize uint64 + p uint8 wantPath string }{ { level: 0, index: 0, - logSize: 256, - wantPath: "tile/0/000", - }, { - level: 0, - index: 0, - logSize: 0, wantPath: "tile/0/000", }, { level: 0, index: 0, - logSize: 255, + p: 255, wantPath: "tile/0/000.p/255", }, { level: 1, index: 0, - logSize: math.MaxUint64, wantPath: "tile/1/000", - }, { - level: 1, - index: 0, - logSize: 256, - wantPath: "tile/1/000.p/1", - }, { - level: 1, - index: 0, - logSize: 1024, - wantPath: "tile/1/000.p/4", }, { level: 15, index: 455667, - logSize: math.MaxUint64, + p: 0, wantPath: "tile/15/x455/667", - }, { - level: 3, - index: 1234567, - logSize: math.MaxUint64, - wantPath: "tile/3/x001/x234/567", }, { level: 15, index: 123456789, - logSize: math.MaxUint64, - wantPath: "tile/15/x123/x456/789", + p: 41, + wantPath: "tile/15/x123/x456/789.p/41", }, } { desc := fmt.Sprintf("level %x index %x", test.level, test.index) 
t.Run(desc, func(t *testing.T) { - gotPath := TilePath(test.level, test.index, test.logSize) + gotPath := TilePath(test.level, test.index, test.p) if gotPath != test.wantPath { t.Errorf("Got path %q want %q", gotPath, test.wantPath) } @@ -173,59 +141,32 @@ func TestNWithSuffix(t *testing.T) { for _, test := range []struct { level uint64 index uint64 - logSize uint64 + p uint8 wantPath string }{ { level: 0, index: 0, - logSize: 256, - wantPath: "000", - }, { - level: 0, - index: 0, - logSize: 0, wantPath: "000", }, { level: 0, index: 0, - logSize: 255, + p: 255, wantPath: "000.p/255", - }, { - level: 1, - index: 0, - logSize: math.MaxUint64, - wantPath: "000", - }, { - level: 1, - index: 0, - logSize: 256, - wantPath: "000.p/1", - }, { - level: 1, - index: 0, - logSize: 1024, - wantPath: "000.p/4", }, { level: 15, index: 455667, - logSize: math.MaxUint64, wantPath: "x455/667", - }, { - level: 3, - index: 1234567, - logSize: math.MaxUint64, - wantPath: "x001/x234/567", }, { level: 15, index: 123456789, - logSize: math.MaxUint64, - wantPath: "x123/x456/789", + p: 65, + wantPath: "x123/x456/789.p/65", }, } { desc := fmt.Sprintf("level %x index %x", test.level, test.index) t.Run(desc, func(t *testing.T) { - gotPath := NWithSuffix(test.level, test.index, test.logSize) + gotPath := NWithSuffix(test.level, test.index, test.p) if gotPath != test.wantPath { t.Errorf("Got path %q want %q", gotPath, test.wantPath) } @@ -233,13 +174,13 @@ func TestNWithSuffix(t *testing.T) { } } -func TestParseTileLevelIndexWidth(t *testing.T) { +func TestParseTileLevelIndexPartial(t *testing.T) { for _, test := range []struct { pathLevel string pathIndex string wantLevel uint64 wantIndex uint64 - wantWidth uint64 + wantP uint8 wantErr bool }{ { @@ -247,28 +188,28 @@ func TestParseTileLevelIndexWidth(t *testing.T) { pathIndex: "x001/x234/067", wantLevel: 0, wantIndex: 1234067, - wantWidth: 256, + wantP: 0, }, { pathLevel: "0", pathIndex: "x001/x234/067.p/89", wantLevel: 0, wantIndex: 1234067, - wantWidth: 89, + wantP: 89, }, { pathLevel: "63", pathIndex: "x999/x999/x999/x999/x999/999.p/255", wantLevel: 63, wantIndex: 999999999999999999, - wantWidth: 255, + wantP: 255, }, { pathLevel: "0", pathIndex: "001", wantLevel: 0, wantIndex: 1, - wantWidth: 256, + wantP: 0, }, { pathLevel: "0", @@ -348,15 +289,15 @@ func TestParseTileLevelIndexWidth(t *testing.T) { } { desc := fmt.Sprintf("pathLevel: %q, pathIndex: %q", test.pathLevel, test.pathIndex) t.Run(desc, func(t *testing.T) { - gotLevel, gotIndex, gotWidth, err := ParseTileLevelIndexWidth(test.pathLevel, test.pathIndex) + gotLevel, gotIndex, gotWidth, err := ParseTileLevelIndexPartial(test.pathLevel, test.pathIndex) if gotLevel != test.wantLevel { t.Errorf("got level %d want %d", gotLevel, test.wantLevel) } if gotIndex != test.wantIndex { t.Errorf("got index %d want %d", gotIndex, test.wantIndex) } - if gotWidth != test.wantWidth { - t.Errorf("got width %d want %d", gotWidth, test.wantWidth) + if gotWidth != test.wantP { + t.Errorf("got width %d want %d", gotWidth, test.wantP) } gotErr := err != nil if gotErr != test.wantErr { diff --git a/api/layout/tile.go b/api/layout/tile.go index c6b31a97..ded1c6da 100644 --- a/api/layout/tile.go +++ b/api/layout/tile.go @@ -14,24 +14,35 @@ package layout -// partialTileSize returns the expected number of leaves in a tile at the given location within +const ( + // TileHeight is the maximum number of Merkle tree levels a tile represents. + // This is fixed at 8 by the tlog-tiles spec.
+ TileHeight = 8 + // TileWidth is the maximum number of hashes which can be present in the bottom row of a tile. + TileWidth = 1 << TileHeight + // EntryBundleWidth is the maximum number of entries which can be present in an EntryBundle. + // This is defined to be the same as the width of the node tiles by tlog-tile spec. + EntryBundleWidth = TileWidth +) + +// PartialTileSize returns the expected number of leaves in a tile at the given location within // a tree of the specified logSize, or 0 if the tile is expected to be fully populated. -func partialTileSize(level, index, logSize uint64) uint64 { - sizeAtLevel := logSize >> (level * 8) - fullTiles := sizeAtLevel / 256 +func PartialTileSize(level, index, logSize uint64) uint8 { + sizeAtLevel := logSize >> (level * TileHeight) + fullTiles := sizeAtLevel / TileWidth if index < fullTiles { return 0 } - return sizeAtLevel % 256 + return uint8(sizeAtLevel % TileWidth) } // NodeCoordsToTileAddress returns the (TileLevel, TileIndex) in tile-space, and the // (NodeLevel, NodeIndex) address within that tile of the specified tree node co-ordinates. func NodeCoordsToTileAddress(treeLevel, treeIndex uint64) (uint64, uint64, uint, uint64) { - tileRowWidth := uint64(1 << (8 - treeLevel%8)) - tileLevel := treeLevel / 8 + tileRowWidth := uint64(1 << (TileHeight - treeLevel%TileHeight)) + tileLevel := treeLevel / TileHeight tileIndex := treeIndex / tileRowWidth - nodeLevel := uint(treeLevel % 8) + nodeLevel := uint(treeLevel % TileHeight) nodeIndex := uint64(treeIndex % tileRowWidth) return tileLevel, tileIndex, nodeLevel, nodeIndex diff --git a/api/state.go b/api/state.go index 3c67af40..d86e67cc 100644 --- a/api/state.go +++ b/api/state.go @@ -22,6 +22,8 @@ import ( "crypto/sha256" "encoding/binary" "fmt" + + "github.com/transparency-dev/trillian-tessera/api/layout" ) // HashTile represents a tile within the Merkle hash tree. @@ -71,7 +73,7 @@ type EntryBundle struct { // UnmarshalText implements encoding/TextUnmarshaler and reads EntryBundles // which are encoded using the tlog-tiles spec. 
func (t *EntryBundle) UnmarshalText(raw []byte) error { - nodes := make([][]byte, 0, 256) + nodes := make([][]byte, 0, layout.EntryBundleWidth) for index := 0; index < len(raw); { dataIndex := index + 2 if dataIndex > len(raw) { diff --git a/api/state_test.go b/api/state_test.go index 7c3f5c47..e207da64 100644 --- a/api/state_test.go +++ b/api/state_test.go @@ -18,7 +18,9 @@ package api_test import ( "bytes" "crypto/rand" + "crypto/sha256" "fmt" + "strings" "testing" "github.com/google/go-cmp/cmp" @@ -44,7 +46,7 @@ func TestHashTile_MarshalTileRoundtrip(t *testing.T) { tile := api.HashTile{Nodes: make([][]byte, 0, test.size)} for i := 0; i < test.size; i++ { // Fill in the leaf index - tile.Nodes = append(tile.Nodes, make([]byte, 32)) + tile.Nodes = append(tile.Nodes, make([]byte, sha256.Size)) if _, err := rand.Read(tile.Nodes[i]); err != nil { t.Error(err) } @@ -143,3 +145,19 @@ func TestLeafBundle_UnmarshalText(t *testing.T) { }) } } + +func BenchmarkLeafBundle_UnmarshalText(b *testing.B) { + bs := bytes.Buffer{} + for i := range 222 { + // Create leaves of different lengths for interest in the parsing / memory allocation + leafStr := strings.Repeat(fmt.Sprintf("Leaf %d", i), i%20) + _, _ = bs.Write(tessera.NewEntry([]byte(leafStr)).MarshalBundleData(uint64(i))) + } + rawBundle := bs.Bytes() + for i := 0; i < b.N; i++ { + tile := api.EntryBundle{} + if err := tile.UnmarshalText(rawBundle); err != nil { + b.Fatal(err) + } + } +} diff --git a/await.go b/await.go index 8f0b6c55..59f08c18 100644 --- a/await.go +++ b/await.go @@ -16,7 +16,9 @@ package tessera import ( "context" + "errors" "fmt" + "os" "sync" "time" @@ -111,7 +113,7 @@ func (a *IntegrationAwaiter) pollLoop(ctx context.Context, readCheckpoint func(c // Note that for now, this releases all clients in the event of a single failure. // If this causes problems, this could be changed to attempt retries. rawCp, err := readCheckpoint(ctx) - if err != nil { + if err != nil && !errors.Is(err, os.ErrNotExist) { a.releaseClientsErr(fmt.Errorf("readCheckpoint: %v", err)) continue } diff --git a/client/client.go b/client/client.go index 564b280e..5bb5d92c 100644 --- a/client/client.go +++ b/client/client.go @@ -53,7 +53,7 @@ type CheckpointFetcherFunc func(ctx context.Context) ([]byte, error) // Note that the implementation of this MUST return (either directly or wrapped) // an os.ErrIsNotExist when the file referenced by path does not exist, e.g. a HTTP // based implementation MUST return this error when it receives a 404 StatusCode. -type TileFetcherFunc func(ctx context.Context, level, index, logSize uint64) ([]byte, error) +type TileFetcherFunc func(ctx context.Context, level, index uint64, p uint8) ([]byte, error) // EntryBundleFetcherFunc is the signature of a function which can fetch the raw data // for a given entry bundle. @@ -61,7 +61,7 @@ type TileFetcherFunc func(ctx context.Context, level, index, logSize uint64) ([] // Note that the implementation of this MUST return (either directly or wrapped) // an os.ErrIsNotExist when the file referenced by path does not exist, e.g. a HTTP // based implementation MUST return this error when it receives a 404 StatusCode. -type EntryBundleFetcherFunc func(ctx context.Context, bundleIndex, logSize uint64) ([]byte, error) +type EntryBundleFetcherFunc func(ctx context.Context, bundleIndex uint64, p uint8) ([]byte, error) // ConsensusCheckpointFunc is a function which returns the largest checkpoint known which is // signed by logSigV and satisfies some consensus algorithm. 
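With the new signatures, fetcher implementations are handed the partial tile size `p` directly instead of deriving it from a log size. A minimal sketch of an HTTP-backed `TileFetcherFunc` under these assumptions (the helper name and base-URL handling are illustrative; `HTTPFetcher` in `client/fetcher.go` is the real implementation) could look like:

```go
package example

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"os"

	"github.com/transparency-dev/trillian-tessera/api/layout"
	"github.com/transparency-dev/trillian-tessera/client"
)

// fetchTile sketches a client.TileFetcherFunc under the new signature: the caller
// supplies the partial tile size p (0 for a full tile), the fetcher builds the
// tlog-tiles path, and maps HTTP 404 onto a wrapped os.ErrNotExist as the
// contract above requires. Error handling and URL joining are simplified.
func fetchTile(base string) client.TileFetcherFunc {
	return func(ctx context.Context, level, index uint64, p uint8) ([]byte, error) {
		u := base + "/" + layout.TilePath(level, index, p)
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
		if err != nil {
			return nil, err
		}
		rsp, err := http.DefaultClient.Do(req)
		if err != nil {
			return nil, err
		}
		defer func() { _ = rsp.Body.Close() }()
		if rsp.StatusCode == http.StatusNotFound {
			return nil, fmt.Errorf("%q: %w", u, os.ErrNotExist)
		}
		if rsp.StatusCode != http.StatusOK {
			return nil, fmt.Errorf("GET %q: %s", u, rsp.Status)
		}
		return io.ReadAll(rsp.Body)
	}
}
```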
@@ -255,7 +255,7 @@ func (n *nodeCache) GetNode(ctx context.Context, id compact.NodeID) ([]byte, err tKey := tileKey{tileLevel, tileIndex} t, ok := n.tiles[tKey] if !ok { - tileRaw, err := n.getTile(ctx, tileLevel, tileIndex, n.logSize) + tileRaw, err := n.getTile(ctx, tileLevel, tileIndex, layout.PartialTileSize(tileLevel, tileIndex, n.logSize)) if err != nil { return nil, fmt.Errorf("failed to fetch tile: %w", err) } @@ -286,7 +286,7 @@ func (n *nodeCache) GetNode(ctx context.Context, id compact.NodeID) ([]byte, err // GetEntryBundle fetches the entry bundle at the given _tile index_. func GetEntryBundle(ctx context.Context, f EntryBundleFetcherFunc, i, logSize uint64) (api.EntryBundle, error) { bundle := api.EntryBundle{} - sRaw, err := f(ctx, i, logSize) + sRaw, err := f(ctx, i, layout.PartialTileSize(0, i, logSize)) if err != nil { if errors.Is(err, os.ErrNotExist) { return bundle, fmt.Errorf("leaf bundle at index %d not found: %v", i, err) diff --git a/client/client_test.go b/client/client_test.go index 9807af02..10e34b1b 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -76,8 +76,8 @@ func testLogFetcher(_ context.Context, p string) ([]byte, error) { return os.ReadFile(path) } -func testLogTileFetcher(ctx context.Context, l, i, s uint64) ([]byte, error) { - return testLogFetcher(ctx, layout.TilePath(l, i, s)) +func testLogTileFetcher(ctx context.Context, l, i uint64, p uint8) ([]byte, error) { + return testLogFetcher(ctx, layout.TilePath(l, i, p)) } // fetchCheckpointShim allows fetcher requests for checkpoints to be intercepted. @@ -309,7 +309,7 @@ func TestCheckConsistency(t *testing.T) { func TestNodeCacheHandlesInvalidRequest(t *testing.T) { ctx := context.Background() wantBytes := []byte("0123456789ABCDEF0123456789ABCDEF") - f := func(_ context.Context, _, _, _ uint64) ([]byte, error) { + f := func(_ context.Context, _, _ uint64, _ uint8) ([]byte, error) { h := &api.HashTile{ Nodes: [][]byte{wantBytes}, } @@ -343,3 +343,44 @@ func TestHandleZeroRoot(t *testing.T) { t.Fatalf("NewProofBuilder: %v", err) } } + +func TestGetEntryBundleAddressing(t *testing.T) { + for _, test := range []struct { + name string + idx, logSize uint64 + wantPartialTileSize uint8 + }{ + { + name: "works - partial tile", + idx: 0, + logSize: 34, + wantPartialTileSize: 34, + }, + { + name: "works - full tile", + idx: 1, + logSize: 256*2 + 45, + wantPartialTileSize: 0, + }, + } { + t.Run(test.name, func(t *testing.T) { + gotIdx := uint64(0) + gotTileSize := uint8(0) + f := func(_ context.Context, i uint64, sz uint8) ([]byte, error) { + gotIdx = i + gotTileSize = sz + return []byte{}, nil + } + _, err := GetEntryBundle(context.Background(), f, test.idx, test.logSize) + if err != nil { + t.Fatalf("GetEntryBundle: %v", err) + } + if gotIdx != test.idx { + t.Errorf("f got idx %d, want %d", gotIdx, test.idx) + } + if gotTileSize != test.wantPartialTileSize { + t.Errorf("f got tileSize %d, want %d", gotTileSize, test.wantPartialTileSize) + } + }) + } +} diff --git a/client/fetcher.go b/client/fetcher.go index bfa0e9eb..681e0b82 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -98,12 +98,12 @@ func (h HTTPFetcher) ReadCheckpoint(ctx context.Context) ([]byte, error) { return h.fetch(ctx, layout.CheckpointPath) } -func (h HTTPFetcher) ReadTile(ctx context.Context, l, i, sz uint64) ([]byte, error) { - return h.fetch(ctx, layout.TilePath(l, i, sz)) +func (h HTTPFetcher) ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error) { + return h.fetch(ctx, layout.TilePath(l, i, p)) } -func 
(h HTTPFetcher) ReadEntryBundle(ctx context.Context, i, sz uint64) ([]byte, error) { - return h.fetch(ctx, layout.EntriesPath(i, sz)) +func (h HTTPFetcher) ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error) { + return h.fetch(ctx, layout.EntriesPath(i, p)) } // FileFetcher knows how to fetch log artifacts from a filesystem rooted at Root. @@ -115,10 +115,10 @@ func (f FileFetcher) ReadCheckpoint(_ context.Context) ([]byte, error) { return os.ReadFile(path.Join(f.Root, layout.CheckpointPath)) } -func (f FileFetcher) ReadTile(_ context.Context, l, i, sz uint64) ([]byte, error) { - return os.ReadFile(path.Join(f.Root, layout.TilePath(l, i, sz))) +func (f FileFetcher) ReadTile(_ context.Context, l, i uint64, p uint8) ([]byte, error) { + return os.ReadFile(path.Join(f.Root, layout.TilePath(l, i, p))) } -func (f FileFetcher) ReadEntryBundle(_ context.Context, i, sz uint64) ([]byte, error) { - return os.ReadFile(path.Join(f.Root, layout.EntriesPath(i, sz))) +func (f FileFetcher) ReadEntryBundle(_ context.Context, i uint64, p uint8) ([]byte, error) { + return os.ReadFile(path.Join(f.Root, layout.EntriesPath(i, p))) } diff --git a/cmd/conformance/gcp/main.go b/cmd/conformance/gcp/main.go index 9bd9d9be..e34b08ed 100644 --- a/cmd/conformance/gcp/main.go +++ b/cmd/conformance/gcp/main.go @@ -37,6 +37,7 @@ var ( listen = flag.String("listen", ":2024", "Address:port to listen on") spanner = flag.String("spanner", "", "Spanner resource URI ('projects/.../...')") signer = flag.String("signer", "", "Note signer to use to sign checkpoints") + persistentDedup = flag.Bool("gcp_dedup", false, "EXPERIMENTAL: Set to true to enable persistent dedupe storage") additionalSigners = []string{} ) @@ -65,7 +66,18 @@ func main() { if err != nil { klog.Exitf("Failed to create new GCP storage: %v", err) } - dedupeAdd := tessera.InMemoryDedupe(storage.Add, 256) + + // Handle dedup configuration + addDelegate := storage.Add + + // PersistentDedup is currently experimental, so there's no terraform or documentation yet! + if *persistentDedup { + addDelegate, err = gcp.NewDedupe(ctx, fmt.Sprintf("%s_dedup", *spanner), addDelegate) + if err != nil { + klog.Exitf("Failed to create new GCP dedupe: %v", err) + } + } + dedupeAdd := tessera.InMemoryDedupe(addDelegate, 256) // Expose a HTTP handler for the conformance test writes. // This should accept arbitrary bytes POSTed to /add, and return an ascii diff --git a/cmd/conformance/mysql/main.go b/cmd/conformance/mysql/main.go index 7751597b..16eb7959 100644 --- a/cmd/conformance/mysql/main.go +++ b/cmd/conformance/mysql/main.go @@ -18,9 +18,11 @@ package main import ( "context" "database/sql" + "errors" "flag" "fmt" "io" + "io/fs" "net/http" "os" "time" @@ -141,15 +143,19 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { mux.HandleFunc("GET /checkpoint", func(w http.ResponseWriter, r *http.Request) { checkpoint, err := storage.ReadCheckpoint(r.Context()) if err != nil { + if errors.Is(err, fs.ErrNotExist) { + w.WriteHeader(http.StatusNotFound) + return + } klog.Errorf("/checkpoint: %v", err) w.WriteHeader(http.StatusInternalServerError) return } - if checkpoint == nil { - w.WriteHeader(http.StatusNotFound) - return - } + // Don't cache checkpoints as the endpoint refreshes regularly. + // A personality that wanted to _could_ set a small cache time here which was no higher + // than the checkpoint publish interval. 
+ w.Header().Set("Cache-Control", "no-cache") if _, err := w.Write(checkpoint); err != nil { klog.Errorf("/checkpoint: %v", err) return @@ -157,7 +163,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { }) mux.HandleFunc("GET /tile/{level}/{index...}", func(w http.ResponseWriter, r *http.Request) { - level, index, width, err := layout.ParseTileLevelIndexWidth(r.PathValue("level"), r.PathValue("index")) + level, index, p, err := layout.ParseTileLevelIndexPartial(r.PathValue("level"), r.PathValue("index")) if err != nil { w.WriteHeader(http.StatusBadRequest) if _, werr := w.Write([]byte(fmt.Sprintf("Malformed URL: %s", err.Error()))); werr != nil { @@ -165,17 +171,16 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { } return } - impliedSize := (index*256 + width) << (level * 8) - tile, err := storage.ReadTile(r.Context(), level, index, impliedSize) + tile, err := storage.ReadTile(r.Context(), level, index, p) if err != nil { + if errors.Is(err, fs.ErrNotExist) { + w.WriteHeader(http.StatusNotFound) + return + } klog.Errorf("/tile/{level}/{index...}: %v", err) w.WriteHeader(http.StatusInternalServerError) return } - if tile == nil { - w.WriteHeader(http.StatusNotFound) - return - } w.Header().Set("Cache-Control", "public, max-age=31536000, immutable") @@ -186,7 +191,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { }) mux.HandleFunc("GET /tile/entries/{index...}", func(w http.ResponseWriter, r *http.Request) { - index, width, err := layout.ParseTileIndexWidth(r.PathValue("index")) + index, p, err := layout.ParseTileIndexPartial(r.PathValue("index")) if err != nil { w.WriteHeader(http.StatusBadRequest) if _, werr := w.Write([]byte(fmt.Sprintf("Malformed URL: %s", err.Error()))); werr != nil { @@ -195,7 +200,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { return } - entryBundle, err := storage.ReadEntryBundle(r.Context(), index, width) + entryBundle, err := storage.ReadEntryBundle(r.Context(), index, p) if err != nil { klog.Errorf("/tile/entries/{index...}: %v", err) w.WriteHeader(http.StatusInternalServerError) @@ -206,7 +211,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { return } - // TODO: Add immutable Cache-Control header. + w.Header().Set("Cache-Control", "public, max-age=31536000, immutable") if _, err := w.Write(entryBundle); err != nil { klog.Errorf("/tile/entries/{index...}: %v", err) diff --git a/cmd/examples/posix-oneshot/main.go b/cmd/examples/posix-oneshot/main.go index c794fe0c..ad30279d 100644 --- a/cmd/examples/posix-oneshot/main.go +++ b/cmd/examples/posix-oneshot/main.go @@ -46,7 +46,7 @@ const ( // Since this is a short-lived command-line tool, we set this to a relatively low value so that // the tool can publish the new checkpoint and exit relatively quickly after integrating the entries // into the tree. 
- checkpointInterval = 500 * time.Millisecond + checkpointInterval = time.Second ) // entryInfo binds the actual bytes to be added as a leaf with a diff --git a/ct_only.go b/ct_only.go index 34ede327..e39a21a6 100644 --- a/ct_only.go +++ b/ct_only.go @@ -69,6 +69,6 @@ func WithCTLayout() func(*options.StorageOptions) { } } -func ctEntriesPath(n, logSize uint64) string { - return fmt.Sprintf("tile/data/%s", layout.NWithSuffix(0, n, logSize)) +func ctEntriesPath(n uint64, p uint8) string { + return fmt.Sprintf("tile/data/%s", layout.NWithSuffix(0, n, p)) } diff --git a/ct_only_test.go b/ct_only_test.go index 7048a466..d5817da3 100644 --- a/ct_only_test.go +++ b/ct_only_test.go @@ -16,46 +16,41 @@ package tessera import ( "fmt" - "math" "testing" ) func TestCTEntriesPath(t *testing.T) { for _, test := range []struct { N uint64 - logSize uint64 + p uint8 wantPath string }{ { N: 0, - logSize: 289, wantPath: "tile/data/000", }, { N: 0, - logSize: 8, + p: 8, wantPath: "tile/data/000.p/8", }, { N: 255, - logSize: 256 * 256, wantPath: "tile/data/255", }, { N: 255, - logSize: 255*256 - 3, + p: 253, wantPath: "tile/data/255.p/253", }, { N: 256, - logSize: 257 * 256, wantPath: "tile/data/256", }, { N: 123456789000, - logSize: math.MaxUint64, wantPath: "tile/data/x123/x456/x789/000", }, } { desc := fmt.Sprintf("N %d", test.N) t.Run(desc, func(t *testing.T) { - gotPath := ctEntriesPath(test.N, test.logSize) + gotPath := ctEntriesPath(test.N, test.p) if gotPath != test.wantPath { t.Errorf("got file %q want %q", gotPath, test.wantPath) } diff --git a/dedupe.go b/dedupe.go index 23b9b2fa..20a31917 100644 --- a/dedupe.go +++ b/dedupe.go @@ -16,9 +16,10 @@ package tessera import ( "context" + "fmt" "sync" - "github.com/hashicorp/golang-lru/v2/expirable" + lru "github.com/hashicorp/golang-lru/v2" ) // InMemoryDedupe wraps an Add function to prevent duplicate entries being written to the underlying @@ -35,30 +36,38 @@ import ( // InMemoryDedupe. This allows recent duplicates to be deduplicated in memory, reducing the need to // make calls to a persistent storage. func InMemoryDedupe(delegate func(ctx context.Context, e *Entry) IndexFuture, size uint) func(context.Context, *Entry) IndexFuture { + c, err := lru.New[string, func() IndexFuture](int(size)) + if err != nil { + panic(fmt.Errorf("lru.New(%d): %v", size, err)) + } dedupe := &inMemoryDedupe{ delegate: delegate, - cache: expirable.NewLRU[string, IndexFuture](int(size), nil, 0), + cache: c, } return dedupe.add } type inMemoryDedupe struct { delegate func(ctx context.Context, e *Entry) IndexFuture - mu sync.Mutex // cache is thread safe, but this mutex allows us to do conditional writes - cache *expirable.LRU[string, IndexFuture] + cache *lru.Cache[string, func() IndexFuture] } // Add adds the entry to the underlying delegate only if e hasn't been recently seen. In either case, // an IndexFuture will be returned that the client can use to get the sequence number of this entry. func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture { id := string(e.Identity()) - d.mu.Lock() - defer d.mu.Unlock() - f, ok := d.cache.Get(id) - if !ok { - f = d.delegate(ctx, e) - d.cache.Add(id, f) + // However many calls with the same entry come in and are deduped, we should only call delegate + // once for each unique entry: + f := sync.OnceValue(func() IndexFuture { + return d.delegate(ctx, e) + }) + + // if we've seen this entry before, discard our f and replace + // with the one we created last time, otherwise store f against id. 
+ if prev, ok, _ := d.cache.PeekOrAdd(id, f); ok { + f = prev } - return f + + return f() } diff --git a/dedupe_test.go b/dedupe_test.go index bdf12d5a..c29b0851 100644 --- a/dedupe_test.go +++ b/dedupe_test.go @@ -16,6 +16,7 @@ package tessera_test import ( "context" + "crypto/sha256" "fmt" "sync" "testing" @@ -59,13 +60,15 @@ func TestDedupe(t *testing.T) { dedupeAdd := tessera.InMemoryDedupe(delegate, 256) // Add foo, bar, baz to prime the cache to make things interesting - dedupeAdd(ctx, tessera.NewEntry([]byte("foo"))) - dedupeAdd(ctx, tessera.NewEntry([]byte("bar"))) - dedupeAdd(ctx, tessera.NewEntry([]byte("baz"))) + for _, s := range []string{"foo", "bar", "baz"} { + if _, err := dedupeAdd(ctx, tessera.NewEntry([]byte(s)))(); err != nil { + t.Fatalf("dedupeAdd(%q): %v", s, err) + } + } idx, err := dedupeAdd(ctx, tessera.NewEntry([]byte(tC.newValue)))() if err != nil { - t.Fatal(err) + t.Fatalf("dedupeAdd(%q): %v", tC.newValue, err) } if idx != tC.wantIdx { t.Errorf("got != want (%d != %d)", idx, tC.wantIdx) @@ -92,7 +95,7 @@ func BenchmarkDedupe(b *testing.B) { for leafIndex := range 1024 { wg.Add(1) go func(index int) { - _, err := dedupeAdd(ctx, tessera.NewEntry([]byte(fmt.Sprintf("leaf with value %d", index%32))))() + _, err := dedupeAdd(ctx, tessera.NewEntry([]byte(fmt.Sprintf("leaf with value %d", index%sha256.Size))))() if err != nil { b.Error(err) } diff --git a/deployment/live/gcp/conformance/README.md b/deployment/live/gcp/conformance/README.md index afe09aa3..f836e54e 100644 --- a/deployment/live/gcp/conformance/README.md +++ b/deployment/live/gcp/conformance/README.md @@ -21,6 +21,7 @@ need to be manually applied. You'll need the following tools installed: +- [`golang`](https://go.dev/doc/install) - [`docker`](https://docs.docker.com/engine/install/) - [`gcloud`](https://cloud.google.com/sdk/docs/install) - One of: @@ -30,7 +31,12 @@ You'll need the following tools installed: #### Google Cloud tooling -Ensure you've got already created a project you want to use, and have configured your local `gcloud` +> [!CAUTION] +> This example creates real Google Cloud resources running in your project. They will almost certainly +> cost you real money if left running. For the purposes of this demo it is strongly recommended that +> you create a new project so that you can easily clean up at the end. + +Once you've got a Google Cloud project you want to use, have configured your local `gcloud` tool use use it, and authenticated as a principle with sufficient ACLs for the project: ```bash @@ -47,8 +53,8 @@ export GOOGLE_PROJECT=$(gcloud config get project) # This should be a note signer string. # You can use the generate_keys tool to create a new signer & verifier pair: -# go run github.com/transparency-dev/serverless-log/cmd/generate_keys@HEAD --key_name="TestTessera" -export TESSERA_SIGNER={VALUE} +go run github.com/transparency-dev/serverless-log/cmd/generate_keys@HEAD --key_name="TestTessera" --out_priv=tessera.sec --out_pub=tessera.pub +export TESSERA_SIGNER=$(cat tessera.sec) # This is the name of the artifact registry docker repo to create/use. export DOCKER_REPO_NAME=tessera-docker @@ -126,3 +132,15 @@ Finally, apply the config using `terragrunt`: This should create all necessary infrastructure, and spin up a Cloud Run instance with the docker image you created above. + +### Clean up + +> [!IMPORTANT] +> You need to run this step on your project if you want to ensure you don't get charged into perpetuity +> for the resources we've setup. 
+ +**This will delete your project!** +Do not do this on a project that you didn't create expressly and exclusively to run this demo. +```bash +gcloud projects delete ${GOOGLE_PROJECT} +``` diff --git a/integration/storage_uniformity_test.go b/integration/storage_uniformity_test.go index 123240aa..aeb12af4 100644 --- a/integration/storage_uniformity_test.go +++ b/integration/storage_uniformity_test.go @@ -28,8 +28,8 @@ import ( type StorageContract interface { Add(ctx context.Context, entry *tessera.Entry) tessera.IndexFuture ReadCheckpoint(ctx context.Context) ([]byte, error) - ReadTile(ctx context.Context, level, index, treeSize uint64) ([]byte, error) - ReadEntryBundle(ctx context.Context, index, treeSize uint64) ([]byte, error) + ReadTile(ctx context.Context, level, index uint64, p uint8) ([]byte, error) + ReadEntryBundle(ctx context.Context, index uint64, p uint8) ([]byte, error) } var ( diff --git a/internal/hammer/client.go b/internal/hammer/client.go index 58429100..63a04028 100644 --- a/internal/hammer/client.go +++ b/internal/hammer/client.go @@ -35,8 +35,8 @@ var ErrRetry = errors.New("retry") type fetcher interface { ReadCheckpoint(ctx context.Context) ([]byte, error) - ReadTile(ctx context.Context, l, i, sz uint64) ([]byte, error) - ReadEntryBundle(ctx context.Context, i, sz uint64) ([]byte, error) + ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error) + ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error) } // newLogClientsFromFlags returns a fetcher and a writer that will read @@ -110,14 +110,14 @@ func (rr *roundRobinFetcher) ReadCheckpoint(ctx context.Context) ([]byte, error) return f.ReadCheckpoint(ctx) } -func (rr *roundRobinFetcher) ReadTile(ctx context.Context, l, i, sz uint64) ([]byte, error) { +func (rr *roundRobinFetcher) ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error) { f := rr.next() - return f.ReadTile(ctx, l, i, sz) + return f.ReadTile(ctx, l, i, p) } -func (rr *roundRobinFetcher) ReadEntryBundle(ctx context.Context, i, sz uint64) ([]byte, error) { +func (rr *roundRobinFetcher) ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error) { f := rr.next() - return f.ReadEntryBundle(ctx, i, sz) + return f.ReadEntryBundle(ctx, i, p) } func (rr *roundRobinFetcher) next() fetcher { diff --git a/internal/options/options.go b/internal/options/options.go index 2d60c543..5a87c5ca 100644 --- a/internal/options/options.go +++ b/internal/options/options.go @@ -27,7 +27,7 @@ type NewCPFunc func(size uint64, hash []byte) ([]byte, error) type ParseCPFunc func(raw []byte) (*f_log.Checkpoint, error) // EntriesPathFunc is the signature of a function which knows how to format entry bundle paths. -type EntriesPathFunc func(n, logSize uint64) string +type EntriesPathFunc func(n uint64, p uint8) string // StorageOptions holds optional settings for all storage implementations. type StorageOptions struct { diff --git a/storage/aws/README.md b/storage/aws/README.md index 19b4aab6..838eff14 100644 --- a/storage/aws/README.md +++ b/storage/aws/README.md @@ -29,34 +29,28 @@ A table with a single row which is used to keep track of the next assignable seq This holds batches of entries keyed by the sequence number assigned to the first entry in the batch. ### `IntCoord` -TODO: add the new checkpoint updater logic, and update the docstring in aws.go. - -This table is used to coordinate integration of sequenced batches in the `Seq` table. 
+This table is used to coordinate integration of sequenced batches in the `Seq` table, and keep track of the current tree state. ## Life of a leaf -TODO: add the new checkpoint updater logic. - 1. Leaves are submitted by the binary built using Tessera via a call the storage's `Add` func. -2. [Not implemented yet - Dupe squashing: look for existing `` object, read assigned sequence number if present and return.] -3. The storage library batches these entries up, and, after a configurable period of time has elapsed +1. The storage library batches these entries up, and, after a configurable period of time has elapsed or the batch reaches a configurable size threshold, the batch is written to the `Seq` table which effectively assigns a sequence numbers to the entries using the following algorithm: In a transaction: 1. selects next from `SeqCoord` with for update ← this blocks other FE from writing their pools, but only for a short duration. - 2. Inserts batch of entries into `Seq` with key `SeqCoord.next` - 3. Update `SeqCoord` with `next+=len(batch)` -4. Integrators periodically integrate new sequenced entries into the tree: + 1. Inserts batch of entries into `Seq` with key `SeqCoord.next` + 1. Update `SeqCoord` with `next+=len(batch)` +1. Integrators periodically integrate new sequenced entries into the tree: In a transaction: 1. select `seq` from `IntCoord` with for update ← this blocks other integrators from proceeding. - 2. Select one or more consecutive batches from `Seq` for update, starting at `IntCoord.seq` - 3. Write leaf bundles to S3 using batched entries - 4. Integrate in Merkle tree and write tiles to S3 - 5. Update checkpoint in S3 - 6. Delete consumed batches from `Seq` - 7. Update `IntCoord` with `seq+=num_entries_integrated` - 8. [Not implemented yet - Dupe detection: - 1. Writes out `` containing the leaf's sequence number] + 1. Select one or more consecutive batches from `Seq` for update, starting at `IntCoord.seq` + 1. Write leaf bundles to S3 using batched entries + 1. Integrate in Merkle tree and write tiles to S3 + 1. Update checkpoint in S3 + 1. Delete consumed batches from `Seq` + 1. Update `IntCoord` with `seq+=num_entries_integrated` and the latest `rootHash` +1. Checkpoints representing the latest state of the tree are published at the configured interval. ## Dedup @@ -75,12 +69,12 @@ operational overhead, code complexity, and so was selected. The alpha implementation was tested with entries of size 1KB each, at a write rate of 1500/s. This was done using the smallest possible Aurora instance -availalbe, `db.r5.large`, running `8.0.mysql_aurora.3.05.2`. +available, `db.r5.large`, running `8.0.mysql_aurora.3.05.2`. Aurora (Serverless v2) worked out well, but seems less cost effective than -provisionned Aurora for sustained traffic. For now, we decided not to explore this option further. +provisioned Aurora for sustained traffic. For now, we decided not to explore this option further. -RDS (MySQL) worked out well, but requires more admistrative overhead than +RDS (MySQL) worked out well, but requires more administrative overhead than Aurora. For now, we decided not to explore this option further. DynamoDB worked out to be less cost efficient than Aurora and RDS. 
It also has diff --git a/storage/aws/aws.go b/storage/aws/aws.go index c4766190..19e3b954 100644 --- a/storage/aws/aws.go +++ b/storage/aws/aws.go @@ -59,9 +59,10 @@ import ( ) const ( - entryBundleSize = 256 - logContType = "application/octet-stream" - ckptContType = "text/plain; charset=utf-8" + entryBundleSize = 256 + logContType = "application/octet-stream" + ckptContType = "text/plain; charset=utf-8" + minCheckpointInterval = time.Second DefaultPushbackMaxOutstanding = 4096 DefaultIntegrationSizeLimit = 5 * 4096 @@ -129,6 +130,9 @@ func New(ctx context.Context, cfg Config, opts ...func(*options.StorageOptions)) if opt.PushbackMaxOutstanding == 0 { opt.PushbackMaxOutstanding = DefaultPushbackMaxOutstanding } + if opt.CheckpointInterval < minCheckpointInterval { + return nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opt.CheckpointInterval, minCheckpointInterval) + } sdkConfig, err := config.LoadDefaultConfig(ctx) if err != nil { @@ -225,12 +229,12 @@ func (s *Storage) ReadCheckpoint(ctx context.Context) ([]byte, error) { return s.get(ctx, layout.CheckpointPath) } -func (s *Storage) ReadTile(ctx context.Context, l, i, sz uint64) ([]byte, error) { - return s.get(ctx, layout.TilePath(l, i, sz)) +func (s *Storage) ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error) { + return s.get(ctx, layout.TilePath(l, i, p)) } -func (s *Storage) ReadEntryBundle(ctx context.Context, i, sz uint64) ([]byte, error) { - return s.get(ctx, s.entriesPath(i, sz)) +func (s *Storage) ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error) { + return s.get(ctx, s.entriesPath(i, p)) } // get returns the requested object. @@ -303,7 +307,7 @@ func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, til if err != nil { return err } - tPath := layout.TilePath(level, index, logSize) + tPath := layout.TilePath(level, index, layout.PartialTileSize(level, index, logSize)) klog.V(2).Infof("StoreTile: %s (%d entries)", tPath, len(tile.Nodes)) return s.objStore.setObjectIfNoneMatch(ctx, tPath, data, logContType) @@ -319,7 +323,7 @@ func (s *Storage) getTiles(ctx context.Context, tileIDs []storage.TileID, logSiz i := i id := id errG.Go(func() error { - objName := layout.TilePath(id.Level, id.Index, logSize) + objName := layout.TilePath(id.Level, id.Index, layout.PartialTileSize(id.Level, id.Index, logSize)) data, err := s.objStore.getObject(ctx, objName) if err != nil { // Do not use errors.Is. Keep errors.As to compare by type and not by value. @@ -349,8 +353,8 @@ func (s *Storage) getTiles(ctx context.Context, tileIDs []storage.TileID, logSiz // getEntryBundle returns the serialised entry bundle at the location implied by the given index and treeSize. // // Returns a wrapped os.ErrNotExist if the bundle does not exist. -func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64) ([]byte, error) { - objName := s.entriesPath(bundleIndex, logSize) +func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, p uint8) ([]byte, error) { + objName := s.entriesPath(bundleIndex, p) data, err := s.objStore.getObject(ctx, objName) if err != nil { // Do not use errors.Is. Keep errors.As to compare by type and not by value. @@ -367,8 +371,8 @@ func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, logSiz } // setEntryBundle idempotently stores the serialised entry bundle at the location implied by the bundleIndex and treeSize. 
-func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64, bundleRaw []byte) error { - objName := s.entriesPath(bundleIndex, logSize) +func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, p uint8, bundleRaw []byte) error { + objName := s.entriesPath(bundleIndex, p) // Note that setObject does an idempotent interpretation of IfNoneMatch - it only // returns an error if the named object exists _and_ contains different data to what's // passed in here. @@ -433,11 +437,11 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie } numAdded := uint64(0) - bundleIndex, entriesInBundle := fromSeq/entryBundleSize, fromSeq%entryBundleSize + bundleIndex, entriesInBundle := fromSeq/layout.EntryBundleWidth, fromSeq%layout.EntryBundleWidth bundleWriter := &bytes.Buffer{} if entriesInBundle > 0 { // If the latest bundle is partial, we need to read the data it contains in for our newer, larger, bundle. - part, err := s.getEntryBundle(ctx, uint64(bundleIndex), uint64(entriesInBundle)) + part, err := s.getEntryBundle(ctx, uint64(bundleIndex), uint8(entriesInBundle)) if err != nil { return err } @@ -451,9 +455,9 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie // goSetEntryBundle is a function which uses seqErr to spin off a go-routine to write out an entry bundle. // It's used in the for loop below. - goSetEntryBundle := func(ctx context.Context, bundleIndex uint64, fromSeq uint64, bundleRaw []byte) { + goSetEntryBundle := func(ctx context.Context, bundleIndex uint64, p uint8, bundleRaw []byte) { seqErr.Go(func() error { - if err := s.setEntryBundle(ctx, bundleIndex, fromSeq, bundleRaw); err != nil { + if err := s.setEntryBundle(ctx, bundleIndex, p, bundleRaw); err != nil { return err } return nil @@ -468,10 +472,10 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie entriesInBundle++ fromSeq++ numAdded++ - if entriesInBundle == entryBundleSize { + if entriesInBundle == layout.EntryBundleWidth { // This bundle is full, so we need to write it out... klog.V(1).Infof("In-memory bundle idx %d is full, attempting write to S3", bundleIndex) - goSetEntryBundle(ctx, bundleIndex, fromSeq, bundleWriter.Bytes()) + goSetEntryBundle(ctx, bundleIndex, 0, bundleWriter.Bytes()) // ... and prepare the next entry bundle for any remaining entries in the batch bundleIndex++ entriesInBundle = 0 @@ -484,7 +488,7 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie // this needs writing out too. 
if entriesInBundle > 0 { klog.V(1).Infof("Attempting to write in-memory partial bundle idx %d.%d to S3", bundleIndex, entriesInBundle) - goSetEntryBundle(ctx, bundleIndex, fromSeq, bundleWriter.Bytes()) + goSetEntryBundle(ctx, bundleIndex, uint8(entriesInBundle), bundleWriter.Bytes()) } return seqErr.Wait() } diff --git a/storage/aws/aws_test.go b/storage/aws/aws_test.go index 5ce734be..7c866657 100644 --- a/storage/aws/aws_test.go +++ b/storage/aws/aws_test.go @@ -294,7 +294,7 @@ func TestTileRoundtrip(t *testing.T) { t.Fatalf("setTile: %v", err) } - expPath := layout.TilePath(test.level, test.index, test.logSize) + expPath := layout.TilePath(test.level, test.index, layout.PartialTileSize(test.level, test.index, test.logSize)) _, ok := m.mem[expPath] if !ok { t.Fatalf("want tile at %v but found none", expPath) @@ -334,29 +334,29 @@ func TestBundleRoundtrip(t *testing.T) { for _, test := range []struct { name string index uint64 - logSize uint64 + p uint8 bundleSize uint64 }{ { name: "ok", index: 3 * 256, - logSize: 3*256 + 20, + p: 20, bundleSize: 20, }, } { t.Run(test.name, func(t *testing.T) { wantBundle := makeBundle(t, test.bundleSize) - if err := s.setEntryBundle(ctx, test.index, test.logSize, wantBundle); err != nil { + if err := s.setEntryBundle(ctx, test.index, test.p, wantBundle); err != nil { t.Fatalf("setEntryBundle: %v", err) } - expPath := layout.EntriesPath(test.index, test.logSize) + expPath := layout.EntriesPath(test.index, test.p) _, ok := m.mem[expPath] if !ok { t.Fatalf("want bundle at %v but found none", expPath) } - got, err := s.getEntryBundle(ctx, test.index, test.logSize) + got, err := s.getEntryBundle(ctx, test.index, test.p) if err != nil { t.Fatalf("getEntryBundle: %v", err) } diff --git a/storage/gcp/README.md b/storage/gcp/README.md index bee47c1f..07fdb002 100644 --- a/storage/gcp/README.md +++ b/storage/gcp/README.md @@ -34,7 +34,6 @@ This table is used to coordinate integration of sequenced batches in the `Seq` t ## Life of a leaf 1. Leaves are submitted by the binary built using Tessera via a call the storage's `Add` func. -1. Dupe squashing (TODO): look for existing `` object, read assigned sequence number if present and return. 1. The storage library batches these entries up, and, after a configurable period of time has elapsed or the batch reaches a configurable size threshold, the batch is written to the `Seq` table which effectively assigns a sequence numbers to the entries using the following algorithm: @@ -48,11 +47,9 @@ This table is used to coordinate integration of sequenced batches in the `Seq` t 1. Select one or more consecutive batches from `Seq` for update, starting at `IntCoord.seq` 1. Write leaf bundles to GCS using batched entries 1. Integrate in Merkle tree and write tiles to GCS - 1. Update checkpoint in GCS 1. Delete consumed batches from `Seq` - 1. Update `IntCoord` with `seq+=num_entries_integrated` - 1. Dupe detection (TODO): - 1. Writes out `` containing the leaf's sequence number + 1. Update `IntCoord` with `seq+=num_entries_integrated` and the latest `rootHash` +1. Checkpoints representing the latest state of the tree are published at the configured interval. 
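Once a checkpoint for a given tree size is published, readers can derive every bundle address from that size alone using the plumbing introduced in this change. A small sketch of that read-side addressing (the function name here is illustrative):

```go
package example

import "github.com/transparency-dev/trillian-tessera/api/layout"

// bundlePaths sketches how a published tree size maps onto entry bundle paths:
// every bundle below the final one is full (p == 0), and the final bundle is
// partial only when treeSize is not a multiple of the bundle width.
func bundlePaths(treeSize uint64) []string {
	paths := []string{}
	numBundles := (treeSize + layout.EntryBundleWidth - 1) / layout.EntryBundleWidth
	for i := uint64(0); i < numBundles; i++ {
		p := layout.PartialTileSize(0, i, treeSize)
		paths = append(paths, layout.EntriesPath(i, p))
	}
	return paths
}
```

For example, a tree size of 600 yields `tile/entries/000`, `tile/entries/001`, and `tile/entries/002.p/88`.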
## Dedup diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 6b5fce2f..bf4e95a7 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -37,11 +37,13 @@ import ( "io" "net/http" "os" + "sync/atomic" "time" "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/apiv1/spannerpb" gcs "cloud.google.com/go/storage" + "github.com/globocom/go-buffer" "github.com/google/go-cmp/cmp" "github.com/transparency-dev/merkle/rfc6962" tessera "github.com/transparency-dev/trillian-tessera" @@ -57,9 +59,16 @@ import ( ) const ( - entryBundleSize = 256 - logContType = "application/octet-stream" - ckptContType = "text/plain; charset=utf-8" + // minCheckpointInterval is the shortest permitted interval between updating published checkpoints. + // GCS has a rate limit 1 update per second for individual objects, but we've observed that attempting + // to update at exactly that rate still results in the occasional refusal, so bake in a little wiggle + // room. + minCheckpointInterval = 1200 * time.Millisecond + + logContType = "application/octet-stream" + ckptContType = "text/plain; charset=utf-8" + logCacheControl = "max-age=604800,immutable" + ckptCacheControl = "no-cache" DefaultPushbackMaxOutstanding = 4096 DefaultIntegrationSizeLimit = 5 * 4096 @@ -81,7 +90,7 @@ type Storage struct { // objStore describes a type which can store and retrieve objects. type objStore interface { getObject(ctx context.Context, obj string) ([]byte, int64, error) - setObject(ctx context.Context, obj string, data []byte, cond *gcs.Conditions, contType string) error + setObject(ctx context.Context, obj string, data []byte, cond *gcs.Conditions, contType string, cacheCtl string) error lastModified(ctx context.Context, obj string) (time.Time, error) } @@ -119,6 +128,9 @@ func New(ctx context.Context, cfg Config, opts ...func(*options.StorageOptions)) if opt.PushbackMaxOutstanding == 0 { opt.PushbackMaxOutstanding = DefaultPushbackMaxOutstanding } + if opt.CheckpointInterval < minCheckpointInterval { + return nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opt.CheckpointInterval, minCheckpointInterval) + } c, err := gcs.NewClient(ctx, gcs.WithJSONReads()) if err != nil { @@ -201,12 +213,12 @@ func (s *Storage) ReadCheckpoint(ctx context.Context) ([]byte, error) { return s.get(ctx, layout.CheckpointPath) } -func (s *Storage) ReadTile(ctx context.Context, l, i, sz uint64) ([]byte, error) { - return s.get(ctx, layout.TilePath(l, i, sz)) +func (s *Storage) ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error) { + return s.get(ctx, layout.TilePath(l, i, p)) } -func (s *Storage) ReadEntryBundle(ctx context.Context, i, sz uint64) ([]byte, error) { - return s.get(ctx, s.entriesPath(i, sz)) +func (s *Storage) ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error) { + return s.get(ctx, s.entriesPath(i, p)) } // get returns the requested object. 
@@ -260,7 +272,7 @@ func (s *Storage) publishCheckpoint(ctx context.Context, minStaleness time.Durat return fmt.Errorf("newCP: %v", err) } - if err := s.objStore.setObject(ctx, layout.CheckpointPath, cpRaw, nil, ckptContType); err != nil { + if err := s.objStore.setObject(ctx, layout.CheckpointPath, cpRaw, nil, ckptContType, ckptCacheControl); err != nil { return fmt.Errorf("writeCheckpoint: %v", err) } return nil @@ -275,10 +287,10 @@ func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, til if err != nil { return err } - tPath := layout.TilePath(level, index, logSize) + tPath := layout.TilePath(level, index, layout.PartialTileSize(level, index, logSize)) klog.V(2).Infof("StoreTile: %s (%d entries)", tPath, len(tile.Nodes)) - return s.objStore.setObject(ctx, tPath, data, &gcs.Conditions{DoesNotExist: true}, logContType) + return s.objStore.setObject(ctx, tPath, data, &gcs.Conditions{DoesNotExist: true}, logContType, logCacheControl) } // getTiles returns the tiles with the given tile-coords for the specified log size. @@ -291,7 +303,7 @@ func (s *Storage) getTiles(ctx context.Context, tileIDs []storage.TileID, logSiz i := i id := id errG.Go(func() error { - objName := layout.TilePath(id.Level, id.Index, logSize) + objName := layout.TilePath(id.Level, id.Index, layout.PartialTileSize(id.Level, id.Index, logSize)) data, _, err := s.objStore.getObject(ctx, objName) if err != nil { if errors.Is(err, gcs.ErrObjectNotExist) { @@ -316,11 +328,12 @@ func (s *Storage) getTiles(ctx context.Context, tileIDs []storage.TileID, logSiz } -// getEntryBundle returns the serialised entry bundle at the location implied by the given index and treeSize. +// getEntryBundle returns the serialised entry bundle at the location described by the given index and partial size. +// A partial size of zero implies a full tile. // // Returns a wrapped os.ErrNotExist if the bundle does not exist. -func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64) ([]byte, error) { - objName := s.entriesPath(bundleIndex, logSize) +func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, p uint8) ([]byte, error) { + objName := s.entriesPath(bundleIndex, p) data, _, err := s.objStore.getObject(ctx, objName) if err != nil { if errors.Is(err, gcs.ErrObjectNotExist) { @@ -335,12 +348,12 @@ func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, logSiz } // setEntryBundle idempotently stores the serialised entry bundle at the location implied by the bundleIndex and treeSize. -func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64, bundleRaw []byte) error { - objName := s.entriesPath(bundleIndex, logSize) +func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, p uint8, bundleRaw []byte) error { + objName := s.entriesPath(bundleIndex, p) // Note that setObject does an idempotent interpretation of DoesNotExist - it only // returns an error if the named object exists _and_ contains different data to what's // passed in here. 
- if err := s.objStore.setObject(ctx, objName, bundleRaw, &gcs.Conditions{DoesNotExist: true}, logContType); err != nil { + if err := s.objStore.setObject(ctx, objName, bundleRaw, &gcs.Conditions{DoesNotExist: true}, logContType, logCacheControl); err != nil { return fmt.Errorf("setObject(%q): %v", objName, err) } @@ -398,11 +411,11 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie } numAdded := uint64(0) - bundleIndex, entriesInBundle := fromSeq/entryBundleSize, fromSeq%entryBundleSize + bundleIndex, entriesInBundle := fromSeq/layout.EntryBundleWidth, fromSeq%layout.EntryBundleWidth bundleWriter := &bytes.Buffer{} if entriesInBundle > 0 { // If the latest bundle is partial, we need to read the data it contains in for our newer, larger, bundle. - part, err := s.getEntryBundle(ctx, uint64(bundleIndex), uint64(entriesInBundle)) + part, err := s.getEntryBundle(ctx, uint64(bundleIndex), uint8(entriesInBundle)) if err != nil { return err } @@ -416,9 +429,9 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie // goSetEntryBundle is a function which uses seqErr to spin off a go-routine to write out an entry bundle. // It's used in the for loop below. - goSetEntryBundle := func(ctx context.Context, bundleIndex uint64, fromSeq uint64, bundleRaw []byte) { + goSetEntryBundle := func(ctx context.Context, bundleIndex uint64, p uint8, bundleRaw []byte) { seqErr.Go(func() error { - if err := s.setEntryBundle(ctx, bundleIndex, fromSeq, bundleRaw); err != nil { + if err := s.setEntryBundle(ctx, bundleIndex, p, bundleRaw); err != nil { return err } return nil @@ -433,10 +446,10 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie entriesInBundle++ fromSeq++ numAdded++ - if entriesInBundle == entryBundleSize { + if entriesInBundle == layout.EntryBundleWidth { // This bundle is full, so we need to write it out... klog.V(1).Infof("In-memory bundle idx %d is full, attempting write to GCS", bundleIndex) - goSetEntryBundle(ctx, bundleIndex, fromSeq, bundleWriter.Bytes()) + goSetEntryBundle(ctx, bundleIndex, 0, bundleWriter.Bytes()) // ... and prepare the next entry bundle for any remaining entries in the batch bundleIndex++ entriesInBundle = 0 @@ -449,7 +462,7 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie // this needs writing out too. if entriesInBundle > 0 { klog.V(1).Infof("Attempting to write in-memory partial bundle idx %d.%d to GCS", bundleIndex, entriesInBundle) - goSetEntryBundle(ctx, bundleIndex, fromSeq, bundleWriter.Bytes()) + goSetEntryBundle(ctx, bundleIndex, uint8(entriesInBundle), bundleWriter.Bytes()) } return seqErr.Wait() } @@ -737,7 +750,7 @@ func (s *gcsStorage) getObject(ctx context.Context, obj string) ([]byte, int64, // Note that when preconditions are specified and are not met, an error will be returned *unless* // the currently stored data is bit-for-bit identical to the data to-be-written. // This is intended to provide idempotentency for writes. 
-func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte, cond *gcs.Conditions, contType string) error { +func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte, cond *gcs.Conditions, contType string, cacheCtl string) error { bkt := s.gcsClient.Bucket(s.bucket) obj := bkt.Object(objName) @@ -749,6 +762,7 @@ func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte, w = obj.If(*cond).NewWriter(ctx) } w.ObjectAttrs.ContentType = contType + w.ObjectAttrs.CacheControl = cacheCtl if _, err := w.Write(data); err != nil { return fmt.Errorf("failed to write object %q to bucket %q: %w", objName, s.bucket, err) } @@ -783,3 +797,170 @@ func (s *gcsStorage) lastModified(ctx context.Context, obj string) (time.Time, e } return r.Attrs.LastModified, r.Close() } + +// NewDedupe returns a wrapped Add func which will use Spanner to maintain a mapping of +// previously seen entries and their assigned indices. Future calls with the same entry +// will return the previously assigned index; as-yet-unseen entries will be passed to the provided +// delegate function to have an index assigned. +// +// For performance reasons, the ID -> index associations returned by the delegate are buffered before +// being flushed to Spanner. This can result in duplicates occurring in some circumstances, but in +// general this should not be a problem. +// +// Note that the storage for this mapping is entirely separate and unconnected to the storage used for +// maintaining the Merkle tree. +// +// This functionality is experimental! +func NewDedupe(ctx context.Context, spannerDB string, delegate func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture) (func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture, error) { + /* + Schema for reference: + + CREATE TABLE IDSeq ( + id INT64 NOT NULL, + h BYTES(MAX) NOT NULL, + idx INT64 NOT NULL, + ) PRIMARY KEY (id, h); + */ + dedupDB, err := spanner.NewClient(ctx, spannerDB) + if err != nil { + return nil, fmt.Errorf("failed to connect to Spanner: %v", err) + } + + r := &dedupStorage{ + ctx: ctx, + dbPool: dedupDB, + delegate: delegate, + } + + // TODO(al): Make these configurable + r.buf = buffer.New( + buffer.WithSize(64), + buffer.WithFlushInterval(200*time.Millisecond), + buffer.WithFlusher(buffer.FlusherFunc(r.flush)), + buffer.WithPushTimeout(15*time.Second), + ) + go func(ctx context.Context) { + t := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + return + case <-t.C: + klog.V(1).Infof("DEDUP: # Writes %d, # Lookups %d, # DB hits %v, # buffer Push discards %d", r.numWrites.Load(), r.numLookups.Load(), r.numDBDedups.Load(), r.numPushErrs.Load()) + } + } + }(ctx) + return r.add, nil +} + +type dedupStorage struct { + ctx context.Context + dbPool *spanner.Client + delegate func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture + + numLookups atomic.Uint64 + numWrites atomic.Uint64 + numDBDedups atomic.Uint64 + numPushErrs atomic.Uint64 + + buf *buffer.Buffer +} + +// index returns the index (if any) previously associated with the provided hash +func (d *dedupStorage) index(ctx context.Context, h []byte) (*uint64, error) { + d.numLookups.Add(1) + var idx int64 + if row, err := d.dbPool.Single().ReadRow(ctx, "IDSeq", spanner.Key{0, h}, []string{"idx"}); err != nil { + if c := spanner.ErrCode(err); c == codes.NotFound { + return nil, nil + } + return nil, err + } else { + if err := row.Column(0, &idx); err != nil { + return nil, fmt.Errorf("failed to read dedup index: %v", err) + } + idx := uint64(idx) + d.numDBDedups.Add(1) + return &idx, nil + } +} + +// storeMappings stores the associations between the entry IDs and their assigned indices in a non-atomic fashion +// (i.e. it does not store all or none in a transactional sense). +// +// Returns an error if one or more mappings cannot be stored. +func (d *dedupStorage) storeMappings(ctx context.Context, entries []dedupeMapping) error { + m := make([]*spanner.MutationGroup, 0, len(entries)) + for _, e := range entries { + m = append(m, &spanner.MutationGroup{ + Mutations: []*spanner.Mutation{spanner.Insert("IDSeq", []string{"id", "h", "idx"}, []interface{}{0, e.ID, int64(e.Idx)})}, + }) + } + + i := d.dbPool.BatchWrite(ctx, m) + return i.Do(func(r *spannerpb.BatchWriteResponse) error { + s := r.GetStatus() + if c := codes.Code(s.Code); c != codes.OK && c != codes.AlreadyExists { + return fmt.Errorf("failed to write dedup record: %v (%v)", s.GetMessage(), c) + } + return nil + }) +} + +// dedupeMapping represents an ID -> index mapping. +type dedupeMapping struct { + ID []byte + Idx uint64 +} + +// add adds the entry to the underlying delegate only if e isn't already known. In either case, +// an IndexFuture will be returned that the client can use to get the sequence number of this entry. +func (d *dedupStorage) add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture { + idx, err := d.index(ctx, e.Identity()) + if err != nil { + return func() (uint64, error) { return 0, err } + } + if idx != nil { + return func() (uint64, error) { return *idx, nil } + } + + i, err := d.delegate(ctx, e)() + if err != nil { + return func() (uint64, error) { return 0, err } + } + + err = d.enqueueMapping(ctx, e.Identity(), i) + return func() (uint64, error) { + return i, err + } +} + +// enqueueMapping buffers the provided ID -> index mapping ready to be flushed to storage. +func (d *dedupStorage) enqueueMapping(_ context.Context, h []byte, idx uint64) error { + err := d.buf.Push(dedupeMapping{ID: h, Idx: idx}) + if err != nil { + d.numPushErrs.Add(1) + // This means there's pressure flushing dedup writes out, so discard this write. + if err != buffer.ErrTimeout { + return err + } + } + return nil +} + +// flush writes enqueued mappings to storage.
+func (d *dedupStorage) flush(items []interface{}) { + entries := make([]dedupeMapping, len(items)) + for i := range items { + entries[i] = items[i].(dedupeMapping) + } + + ctx, c := context.WithTimeout(d.ctx, 15*time.Second) + defer c() + + if err := d.storeMappings(ctx, entries); err != nil { + klog.Infof("Failed to flush dedup entries: %v", err) + return + } + d.numWrites.Add(uint64(len(entries))) +} diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index 2210301b..ed44df86 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -232,7 +232,7 @@ func TestTileRoundtrip(t *testing.T) { t.Fatalf("setTile: %v", err) } - expPath := layout.TilePath(test.level, test.index, test.logSize) + expPath := layout.TilePath(test.level, test.index, layout.PartialTileSize(test.level, test.index, test.logSize)) _, ok := m.mem[expPath] if !ok { t.Fatalf("want tile at %v but found none", expPath) @@ -284,17 +284,17 @@ func TestBundleRoundtrip(t *testing.T) { } { t.Run(test.name, func(t *testing.T) { wantBundle := makeBundle(t, test.bundleSize) - if err := s.setEntryBundle(ctx, test.index, test.logSize, wantBundle); err != nil { + if err := s.setEntryBundle(ctx, test.index, uint8(test.bundleSize), wantBundle); err != nil { t.Fatalf("setEntryBundle: %v", err) } - expPath := layout.EntriesPath(test.index, test.logSize) + expPath := layout.EntriesPath(test.index, layout.PartialTileSize(0, test.index, test.logSize)) _, ok := m.mem[expPath] if !ok { t.Fatalf("want bundle at %v but found none", expPath) } - got, err := s.getEntryBundle(ctx, test.index, test.logSize) + got, err := s.getEntryBundle(ctx, test.index, layout.PartialTileSize(0, test.index, test.logSize)) if err != nil { t.Fatalf("getEntryBundle: %v", err) } @@ -347,7 +347,7 @@ func TestPublishCheckpoint(t *testing.T) { t.Fatalf("storage.init: %v", err) } cpOld := []byte("bananas") - if err := m.setObject(ctx, layout.CheckpointPath, cpOld, nil, ""); err != nil { + if err := m.setObject(ctx, layout.CheckpointPath, cpOld, nil, "", ""); err != nil { t.Fatalf("setObject(bananas): %v", err) } m.lMod = test.cpModifiedAt @@ -394,7 +394,7 @@ func (m *memObjStore) getObject(_ context.Context, obj string) ([]byte, int64, e } // TODO(phboneff): add content type tests -func (m *memObjStore) setObject(_ context.Context, obj string, data []byte, cond *gcs.Conditions, _ string) error { +func (m *memObjStore) setObject(_ context.Context, obj string, data []byte, cond *gcs.Conditions, _, _ string) error { m.Lock() defer m.Unlock() diff --git a/storage/internal/integrate.go b/storage/internal/integrate.go index d04ac27b..275016c8 100644 --- a/storage/internal/integrate.go +++ b/storage/internal/integrate.go @@ -177,7 +177,7 @@ func newTileReadCache(getTiles func(ctx context.Context, tileIDs []TileID, treeS // Get returns a previously set tile and true, or, if no such tile is in the cache, attempt to fetch it. 
func (r *tileReadCache) Get(ctx context.Context, tileID TileID, treeSize uint64) (*populatedTile, error) { - k := layout.TilePath(uint64(tileID.Level), tileID.Index, treeSize) + k := layout.TilePath(uint64(tileID.Level), tileID.Index, layout.PartialTileSize(tileID.Level, tileID.Index, treeSize)) e, ok := r.entries[k] if !ok { klog.V(1).Infof("Readcache miss: %q", k) @@ -207,7 +207,7 @@ func (r *tileReadCache) Prewarm(ctx context.Context, tileIDs []TileID, treeSize if err != nil { return fmt.Errorf("failed to create fulltile: %v", err) } - k := layout.TilePath(uint64(tileIDs[i].Level), tileIDs[i].Index, treeSize) + k := layout.TilePath(uint64(tileIDs[i].Level), tileIDs[i].Index, layout.PartialTileSize(tileIDs[i].Level, tileIDs[i].Index, treeSize)) r.entries[k] = e } return nil diff --git a/storage/internal/integrate_test.go b/storage/internal/integrate_test.go index 6a12aafd..07a6c737 100644 --- a/storage/internal/integrate_test.go +++ b/storage/internal/integrate_test.go @@ -170,9 +170,8 @@ func BenchmarkIntegrate(b *testing.B) { m := newMemTileStore[api.HashTile]() chunkSize := 200 - numChunks := 256 seq := uint64(0) - for chunk := 0; chunk < numChunks; chunk++ { + for chunk := 0; chunk < b.N; chunk++ { oldSeq := seq c := make([]SequencedEntry, chunkSize) for i := range c { @@ -222,7 +221,7 @@ func (m *memTileStore[T]) getTile(_ context.Context, id TileID, treeSize uint64) m.RLock() defer m.RUnlock() - k := layout.TilePath(id.Level, id.Index, treeSize) + k := layout.TilePath(id.Level, id.Index, layout.PartialTileSize(id.Level, id.Index, treeSize)) d := m.mem[k] return d, nil } @@ -233,7 +232,7 @@ func (m *memTileStore[T]) getTiles(_ context.Context, ids []TileID, treeSize uin r := make([]*T, len(ids)) for i, id := range ids { - k := layout.TilePath(id.Level, id.Index, treeSize) + k := layout.TilePath(id.Level, id.Index, layout.PartialTileSize(id.Level, id.Index, treeSize)) klog.V(1).Infof("mem.getTile(%q, %d)", k, treeSize) d, ok := m.mem[k] if !ok { @@ -248,7 +247,7 @@ func (m *memTileStore[T]) setTile(_ context.Context, id TileID, treeSize uint64, m.Lock() defer m.Unlock() - k := layout.TilePath(id.Level, id.Index, treeSize) + k := layout.TilePath(id.Level, id.Index, layout.PartialTileSize(id.Level, id.Index, treeSize)) klog.V(1).Infof("mem.setTile(%q, %d)", k, treeSize) _, ok := m.mem[k] if ok { diff --git a/storage/mysql/DESIGN.md b/storage/mysql/DESIGN.md index 60feeff7..6f67e203 100644 --- a/storage/mysql/DESIGN.md +++ b/storage/mysql/DESIGN.md @@ -17,7 +17,11 @@ The DB layout has been designed such that serving any read request is a point lo #### `Checkpoint` -A single row that records the current state of the log. Updated after every sequence + integration. +A single row that records the current published checkpoint. + +#### `TreeState` + +A single row that records the current state of the tree. Updated after every integration. #### `Subtree` @@ -51,12 +55,13 @@ Sequence pool: Sequence & integrate (DB integration starts here): 1. Takes a batch of entries to sequence and integrate -1. Starts a transaction, which first takes a write lock on the checkpoint row to ensure that: +1. Starts a transaction, which first takes a write lock on the `TreeState` row to ensure that: 1. No other processes will be competing with this work. - 1. That the next index to sequence is known (this is the same as the current checkpoint size) + 1. That the next index to sequence is known (this is the same as the current tree size) 1. Update the required TiledLeaves rows -1. 
Perform an integration operation to update the Merkle tree, updating/adding Subtree rows as needed, and eventually updating the Checkpoint row +1. Perform an integration operation to update the Merkle tree, updating/adding Subtree rows as needed, and eventually updating the `TreeState` row 1. Commit the transaction +1. Checkpoints representing the latest state of the tree are published at the configured interval. ## Costs diff --git a/storage/mysql/mysql.go b/storage/mysql/mysql.go index 15e236af..537db2db 100644 --- a/storage/mysql/mysql.go +++ b/storage/mysql/mysql.go @@ -18,9 +18,12 @@ package mysql import ( "bytes" "context" + "crypto/sha256" "database/sql" "errors" "fmt" + "io/fs" + "os" "strings" "time" @@ -28,6 +31,7 @@ import ( "github.com/transparency-dev/merkle/rfc6962" tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/api" + "github.com/transparency-dev/trillian-tessera/api/layout" options "github.com/transparency-dev/trillian-tessera/internal/options" storage "github.com/transparency-dev/trillian-tessera/storage/internal" "k8s.io/klog/v2" @@ -42,12 +46,14 @@ const ( replaceTreeStateSQL = "REPLACE INTO `TreeState` (`id`, `size`, `root`) VALUES (?, ?, ?)" selectSubtreeByLevelAndIndexSQL = "SELECT `nodes` FROM `Subtree` WHERE `level` = ? AND `index` = ?" replaceSubtreeSQL = "REPLACE INTO `Subtree` (`level`, `index`, `nodes`) VALUES (?, ?, ?)" - selectTiledLeavesSQL = "SELECT `data` FROM `TiledLeaves` WHERE `tile_index` = ?" - replaceTiledLeavesSQL = "REPLACE INTO `TiledLeaves` (`tile_index`, `data`) VALUES (?, ?)" + selectTiledLeavesSQL = "SELECT `size`, `data` FROM `TiledLeaves` WHERE `tile_index` = ?" + replaceTiledLeavesSQL = "REPLACE INTO `TiledLeaves` (`tile_index`, `size`, `data`) VALUES (?, ?, ?)" checkpointID = 0 treeStateID = 0 entryBundleSize = 256 + + minCheckpointInterval = time.Second ) // Storage is a MySQL-based storage implementation for Tessera. @@ -64,6 +70,10 @@ type Storage struct { // Note that `tessera.WithCheckpointSigner()` is mandatory in the `opts` argument. func New(ctx context.Context, db *sql.DB, opts ...func(*options.StorageOptions)) (*Storage, error) { opt := storage.ResolveStorageOptions(opts...) + if opt.CheckpointInterval < minCheckpointInterval { + return nil, fmt.Errorf("requested CheckpointInterval too low - %v < %v", opt.CheckpointInterval, minCheckpointInterval) + } + s := &Storage{ db: db, newCheckpoint: opt.NewCP, @@ -121,7 +131,7 @@ func (s *Storage) maybeInitTree(ctx context.Context) error { }() treeState, err := s.readTreeState(ctx, tx) - if err != nil { + if err != nil && !errors.Is(err, fs.ErrNotExist) { klog.Errorf("Failed to read tree state: %v", err) return err } @@ -142,7 +152,7 @@ func (s *Storage) maybeInitTree(ctx context.Context) error { } // ReadCheckpoint returns the latest stored checkpoint. -// If the checkpoint is not found, nil is returned with no error. +// If the checkpoint is not found, it returns os.ErrNotExist. 
func (s *Storage) ReadCheckpoint(ctx context.Context) ([]byte, error) { row := s.db.QueryRowContext(ctx, selectCheckpointByIDSQL, checkpointID) if err := row.Err(); err != nil { @@ -153,7 +163,7 @@ func (s *Storage) ReadCheckpoint(ctx context.Context) ([]byte, error) { var at int64 if err := row.Scan(&checkpoint, &at); err != nil { if err == sql.ErrNoRows { - return nil, nil + return nil, os.ErrNotExist } return nil, fmt.Errorf("scan checkpoint: %v", err) } @@ -207,7 +217,7 @@ type treeState struct { } // readTreeState returns the currently stored tree state information. -// If there is no stored tree state, nil is returned with no error. +// If there is no stored tree state, it returns os.ErrNotExist. func (s *Storage) readTreeState(ctx context.Context, tx *sql.Tx) (*treeState, error) { row := tx.QueryRowContext(ctx, selectTreeStateByIDForUpdateSQL, treeStateID) if err := row.Err(); err != nil { @@ -217,7 +227,7 @@ func (s *Storage) readTreeState(ctx context.Context, tx *sql.Tx) (*treeState, er r := &treeState{} if err := row.Scan(&r.size, &r.root); err != nil { if err == sql.ErrNoRows { - return nil, nil + return nil, os.ErrNotExist } return nil, fmt.Errorf("scan tree state: %v", err) } @@ -234,16 +244,13 @@ func (s *Storage) writeTreeState(ctx context.Context, tx *sql.Tx, size uint64, r return nil } -// ReadTile returns a full tile or a partial tile at the given level, index and width. -// If the tile is not found, nil is returned with no error. +// ReadTile returns a full tile or a partial tile at the given level, index and partial tile size p (0 means a full tile). +// If the tile is not found, it returns os.ErrNotExist. // -// TODO: Handle the following scenarios: -// 1. Full tile request with full tile output: Return full tile. -// 2. Full tile request with partial tile output: Return error. -// 3. Partial tile request with full/larger partial tile output: Return trimmed partial tile with correct tile width. -// 4. Partial tile request with partial tile (same width) output: Return partial tile. -// 5. Partial tile request with smaller partial tile output: Return error. -func (s *Storage) ReadTile(ctx context.Context, level, index, width uint64) ([]byte, error) { +// Note that if a partial tile is requested, but a larger tile is available, this +// will return the largest tile available. This could be trimmed to return only the +// number of entries specifically requested if this behaviour becomes problematic. +func (s *Storage) ReadTile(ctx context.Context, level, index uint64, p uint8) ([]byte, error) { row := s.db.QueryRowContext(ctx, selectSubtreeByLevelAndIndexSQL, level, index) if err := row.Err(); err != nil { return nil, err @@ -252,15 +259,20 @@ func (s *Storage) ReadTile(ctx context.Context, level, index, width uint64) ([]b var tile []byte if err := row.Scan(&tile); err != nil { if err == sql.ErrNoRows { - return nil, nil + return nil, os.ErrNotExist } return nil, fmt.Errorf("scan tile: %v", err) } - // Return nil when returning a partial tile on a full tile request. - if width == 256 && uint64(len(tile)/32) != width { - return nil, nil + numEntries := uint64(len(tile) / sha256.Size) + requestedEntries := uint64(p) + if requestedEntries == 0 { + requestedEntries = 256 + } + if requestedEntries > numEntries { + // If the user has requested a size larger than we have, they can't have it + return nil, os.ErrNotExist } return tile, nil @@ -277,34 +289,40 @@ func (s *Storage) writeTile(ctx context.Context, tx *sql.Tx, level, index uint64 } // ReadEntryBundle returns the log entries at the given index.
-// If the entry bundle is not found, nil is returned with no error. +// If the entry bundle is not found, it returns os.ErrNotExist. // -// TODO: Handle the following scenarios: -// 1. Full tile request with full tile output: Return full tile. -// 2. Full tile request with partial tile output: Return error. -// 3. Partial tile request with full/larger partial tile output: Return trimmed partial tile with correct tile width. -// 4. Partial tile request with partial tile (same width) output: Return partial tile. -// 5. Partial tile request with smaller partial tile output: Return error. -func (s *Storage) ReadEntryBundle(ctx context.Context, index, treeSize uint64) ([]byte, error) { +// Note that if a partial tile is requested, but a larger tile is available, this +// will return the largest tile available. This could be trimmed to return only the +// number of entries specifically requested if this behaviour becomes problematic. +func (s *Storage) ReadEntryBundle(ctx context.Context, index uint64, p uint8) ([]byte, error) { row := s.db.QueryRowContext(ctx, selectTiledLeavesSQL, index) if err := row.Err(); err != nil { return nil, err } + var size uint32 var entryBundle []byte - if err := row.Scan(&entryBundle); err != nil { + if err := row.Scan(&size, &entryBundle); err != nil { if err == sql.ErrNoRows { - return nil, nil + return nil, os.ErrNotExist } - return nil, fmt.Errorf("scan entry bundle: %v", err) } + requestedSize := uint32(p) + if requestedSize == 0 { + requestedSize = layout.EntryBundleWidth + } + + if requestedSize > size { + return nil, fmt.Errorf("bundle with %d entries requested, but only %d available: %w", requestedSize, size, os.ErrNotExist) + } + return entryBundle, nil } -func (s *Storage) writeEntryBundle(ctx context.Context, tx *sql.Tx, index uint64, entryBundle []byte) error { - if _, err := tx.ExecContext(ctx, replaceTiledLeavesSQL, index, entryBundle); err != nil { +func (s *Storage) writeEntryBundle(ctx context.Context, tx *sql.Tx, index uint64, size uint32, entryBundle []byte) error { + if _, err := tx.ExecContext(ctx, replaceTiledLeavesSQL, index, size, entryBundle); err != nil { klog.Errorf("Failed to execute replaceTiledLeavesSQL: %v", err) return err } @@ -442,10 +460,14 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent return fmt.Errorf("query tiled leaves: %v", err) } + var size uint32 var partialEntryBundle []byte - if err := row.Scan(&partialEntryBundle); err != nil { + if err := row.Scan(&size, &partialEntryBundle); err != nil { return fmt.Errorf("scan partial entry bundle: %w", err) } + if size != uint32(entriesInBundle) { + return fmt.Errorf("expected %d entries in storage but found %d", entriesInBundle, size) + } if _, err := bundleWriter.Write(partialEntryBundle); err != nil { return fmt.Errorf("write partial entry bundle: %w", err) @@ -461,7 +483,7 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent // This bundle is full, so we need to write it out. if entriesInBundle == entryBundleSize { - if err := s.writeEntryBundle(ctx, tx, bundleIndex, bundleWriter.Bytes()); err != nil { + if err := s.writeEntryBundle(ctx, tx, bundleIndex, uint32(entriesInBundle), bundleWriter.Bytes()); err != nil { return fmt.Errorf("writeEntryBundle: %w", err) } @@ -475,7 +497,7 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent // If we have a partial bundle remaining once we've added all the entries from the batch, // this needs writing out too. 
if entriesInBundle > 0 { - if err := s.writeEntryBundle(ctx, tx, bundleIndex, bundleWriter.Bytes()); err != nil { + if err := s.writeEntryBundle(ctx, tx, bundleIndex, uint32(entriesInBundle), bundleWriter.Bytes()); err != nil { return fmt.Errorf("writeEntryBundle: %w", err) } } diff --git a/storage/mysql/mysql_test.go b/storage/mysql/mysql_test.go index 6d4d98ef..60c0aae4 100644 --- a/storage/mysql/mysql_test.go +++ b/storage/mysql/mysql_test.go @@ -22,8 +22,12 @@ package mysql_test import ( "bytes" "context" + "crypto/sha256" "database/sql" + "errors" "flag" + "fmt" + "io/fs" "os" "testing" "time" @@ -48,8 +52,8 @@ var ( ) const ( - // Matching public key: "transparency.dev/tessera/example+ae330e15+ASf4/L1zE859VqlfQgGzKy34l91Gl8W6wfwp+vKP62DW" testPrivateKey = "PRIVATE+KEY+transparency.dev/tessera/example+ae330e15+AXEwZQ2L6Ga3NX70ITObzyfEIketMr2o9Kc+ed/rt/QR" + testPublicKey = "transparency.dev/tessera/example+ae330e15+ASf4/L1zE859VqlfQgGzKy34l91Gl8W6wfwp+vKP62DW" ) // TestMain checks whether the test MySQL database is available and starts the tests including database schema initialization. @@ -163,26 +167,121 @@ func TestNew(t *testing.T) { } } +func TestGetTile(t *testing.T) { + ctx := context.Background() + s := newTestMySQLStorage(t, ctx) + + awaiter := tessera.NewIntegrationAwaiter(ctx, s.ReadCheckpoint, 10*time.Millisecond) + + treeSize := 258 + + wg := errgroup.Group{} + for i := range treeSize { + wg.Go( + func() error { + _, _, err := awaiter.Await(ctx, s.Add(ctx, tessera.NewEntry([]byte(fmt.Sprintf("TestGetTile %d", i))))) + if err != nil { + return fmt.Errorf("failed to prep test with entry: %v", err) + } + return nil + }) + } + if err := wg.Wait(); err != nil { + t.Fatalf("Failed to set up database with required leaves: %v", err) + } + + for _, test := range []struct { + name string + level, index uint64 + p uint8 + wantEntries int + wantNotFound bool + }{ + { + name: "requested partial tile for a complete tile", + level: 0, index: 0, p: 10, + wantEntries: 256, + wantNotFound: false, + }, + { + name: "too small but that's ok", + level: 0, index: 1, p: layout.PartialTileSize(0, 1, uint64(treeSize-1)), + wantEntries: 2, + wantNotFound: false, + }, + { + name: "just right", + level: 0, index: 1, p: layout.PartialTileSize(0, 1, uint64(treeSize)), + wantEntries: 2, + wantNotFound: false, + }, + { + name: "too big", + level: 0, index: 1, p: layout.PartialTileSize(0, 1, uint64(treeSize+1)), + wantNotFound: true, + }, + { + name: "level 1 too small", + level: 1, index: 0, p: layout.PartialTileSize(1, 0, uint64(treeSize-1)), + wantEntries: 1, + wantNotFound: false, + }, + { + name: "level 1 just right", + level: 1, index: 0, p: layout.PartialTileSize(1, 0, uint64(treeSize)), + wantEntries: 1, + wantNotFound: false, + }, + { + name: "level 1 too big", + level: 1, index: 0, p: layout.PartialTileSize(1, 0, 550), + wantNotFound: true, + }, + } { + t.Run(test.name, func(t *testing.T) { + tile, err := s.ReadTile(ctx, test.level, test.index, test.p) + if err != nil { + if notFound, wantNotFound := errors.Is(err, fs.ErrNotExist), test.wantNotFound; notFound != wantNotFound { + t.Errorf("wantNotFound %v but notFound %v", wantNotFound, notFound) + } + if test.wantNotFound { + return + } + t.Errorf("got err: %v", err) + } + numEntries := len(tile) / sha256.Size + if got, want := numEntries, test.wantEntries; got != want { + t.Errorf("got %d entries, but want %d", got, want) + } + }) + } +} + func TestReadMissingTile(t *testing.T) { ctx := context.Background() s := newTestMySQLStorage(t, ctx) for 
_, test := range []struct { - name string - level, index, width uint64 + name string + level, index uint64 + p uint8 }{ { name: "0/0/0", - level: 0, index: 0, width: 0, + level: 0, index: 0, p: 0, }, { name: "123/456/789", - level: 123, index: 456, width: 789, + level: 123, index: 456, p: 789 % layout.TileWidth, }, } { t.Run(test.name, func(t *testing.T) { - tile, err := s.ReadTile(ctx, test.level, test.index, test.width) + tile, err := s.ReadTile(ctx, test.level, test.index, test.p) if err != nil { + if errors.Is(err, fs.ErrNotExist) { + // this is success for this test + return + } t.Errorf("got err: %v", err) } if tile != nil { @@ -210,8 +309,12 @@ func TestReadMissingEntryBundle(t *testing.T) { }, } { t.Run(test.name, func(t *testing.T) { - entryBundle, err := s.ReadEntryBundle(ctx, test.index, test.index) + entryBundle, err := s.ReadEntryBundle(ctx, test.index, uint8(test.index%layout.TileWidth)) if err != nil { + if errors.Is(err, fs.ErrNotExist) { + // this is success for this test + return + } t.Errorf("got err: %v", err) } if entryBundle != nil { @@ -286,7 +389,7 @@ func TestTileRoundTrip(t *testing.T) { } tileLevel, tileIndex, _, nodeIndex := layout.NodeCoordsToTileAddress(0, entryIndex) - tileRaw, err := s.ReadTile(ctx, tileLevel, tileIndex, nodeIndex) + tileRaw, err := s.ReadTile(ctx, tileLevel, tileIndex, uint8(nodeIndex+1)) if err != nil { t.Errorf("ReadTile got err: %v", err) } @@ -335,9 +438,9 @@ func TestEntryBundleRoundTrip(t *testing.T) { if err != nil { t.Errorf("Add got err: %v", err) } - entryBundleRaw, err := s.ReadEntryBundle(ctx, entryIndex/256, entryIndex) + entryBundleRaw, err := s.ReadEntryBundle(ctx, entryIndex/256, layout.PartialTileSize(0, entryIndex, entryIndex+1)) if err != nil { - t.Errorf("ReadEntryBundle got err: %v", err) + t.Fatalf("ReadEntryBundle got err: %v", err) } bundle := api.EntryBundle{} @@ -358,10 +461,14 @@ func TestEntryBundleRoundTrip(t *testing.T) { func newTestMySQLStorage(t *testing.T, ctx context.Context) *mysql.Storage { t.Helper() + initDatabaseSchema(ctx) - s, err := mysql.New(ctx, testDB, tessera.WithCheckpointSigner(noteSigner)) + s, err := mysql.New(ctx, testDB, + tessera.WithCheckpointSigner(noteSigner), + tessera.WithCheckpointInterval(time.Second), + tessera.WithBatching(128, 100*time.Millisecond)) if err != nil { - t.Errorf("Failed to create mysql.Storage: %v", err) + t.Fatalf("Failed to create mysql.Storage: %v", err) } return s diff --git a/storage/mysql/schema.sql b/storage/mysql/schema.sql index 955c5422..82c281c9 100644 --- a/storage/mysql/schema.sql +++ b/storage/mysql/schema.sql @@ -19,7 +19,7 @@ -- on an indepentent timeframe to the internal updating of state. CREATE TABLE IF NOT EXISTS `Checkpoint` ( -- id is expected to be always 0 to maintain a maximum of a single row. - `id` INT UNSIGNED NOT NULL, + `id` TINYINT UNSIGNED NOT NULL, -- note is the text signed by one or more keys in the checkpoint format. See https://c2sp.org/tlog-checkpoint and https://c2sp.org/signed-note. `note` MEDIUMBLOB NOT NULL, -- published_at is the millisecond UNIX timestamp of when this row was written. @@ -31,7 +31,7 @@ CREATE TABLE IF NOT EXISTS `Checkpoint` ( -- This is not the same thing as a Checkpoint, which is a signed commitment to such a state. CREATE TABLE IF NOT EXISTS `TreeState` ( -- id is expected to be always 0 to maintain a maximum of a single row. - `id` INT UNSIGNED NOT NULL, + `id` TINYINT UNSIGNED NOT NULL, -- size is the extent of the currently integrated tree. 
`size` BIGINT UNSIGNED NOT NULL, -- root is the root hash of the tree at the size stored in `size`. @@ -42,7 +42,7 @@ CREATE TABLE IF NOT EXISTS `TreeState` ( -- "Subtree" table is an internal tile consisting of hashes. There is one row for each internal tile, and this is updated until it is completed, at which point it is immutable. CREATE TABLE IF NOT EXISTS `Subtree` ( -- level is the level of the tile. - `level` INT UNSIGNED NOT NULL, + `level` TINYINT UNSIGNED NOT NULL, -- index is the index of the tile. `index` BIGINT UNSIGNED NOT NULL, -- nodes stores the hashes of the leaves. @@ -53,6 +53,8 @@ CREATE TABLE IF NOT EXISTS `Subtree` ( -- "TiledLeaves" table stores the data committed to by the leaves of the tree. Follows the same evolution as Subtree. CREATE TABLE IF NOT EXISTS `TiledLeaves` ( `tile_index` BIGINT UNSIGNED NOT NULL, + -- size is the number of entries serialized into this leaf bundle. + `size` SMALLINT UNSIGNED NOT NULL, `data` LONGBLOB NOT NULL, PRIMARY KEY(`tile_index`) ); diff --git a/storage/posix/files.go b/storage/posix/files.go index 5e78658f..e2981ebb 100644 --- a/storage/posix/files.go +++ b/storage/posix/files.go @@ -39,6 +39,8 @@ const ( dirPerm = 0o755 filePerm = 0o644 stateDir = ".state" + + minCheckpointInterval = time.Second ) // Storage implements storage functions for a POSIX filesystem. @@ -64,6 +66,9 @@ type NewTreeFunc func(size uint64, root []byte) error // - create must only be set when first creating the log, and will create the directory structure and an empty checkpoint func New(ctx context.Context, path string, create bool, opts ...func(*options.StorageOptions)) (*Storage, error) { opt := storage.ResolveStorageOptions(opts...) + if opt.CheckpointInterval < minCheckpointInterval { + return nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opt.CheckpointInterval, minCheckpointInterval) + } r := &Storage{ path: path, @@ -148,12 +153,12 @@ func (s *Storage) ReadCheckpoint(_ context.Context) ([]byte, error) { } // ReadEntryBundle retrieves the Nth entries bundle for a log of the given size. -func (s *Storage) ReadEntryBundle(_ context.Context, index, logSize uint64) ([]byte, error) { - return os.ReadFile(filepath.Join(s.path, s.entriesPath(index, logSize))) +func (s *Storage) ReadEntryBundle(_ context.Context, index uint64, p uint8) ([]byte, error) { + return os.ReadFile(filepath.Join(s.path, s.entriesPath(index, p))) } -func (s *Storage) ReadTile(_ context.Context, level, index, logSize uint64) ([]byte, error) { - return os.ReadFile(filepath.Join(s.path, layout.TilePath(level, index, logSize))) +func (s *Storage) ReadTile(_ context.Context, level, index uint64, p uint8) ([]byte, error) { + return os.ReadFile(filepath.Join(s.path, layout.TilePath(level, index, p))) } // sequenceBatch writes the entries from the provided batch into the entry bundle files of the log. @@ -194,7 +199,7 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e bundleIndex, entriesInBundle := seq/uint64(256), seq%uint64(256) if entriesInBundle > 0 { // If the latest bundle is partial, we need to read the data it contains in for our newer, larger, bundle. 
- part, err := s.ReadEntryBundle(ctx, bundleIndex, s.curSize) + part, err := s.ReadEntryBundle(ctx, bundleIndex, uint8(s.curSize%layout.EntryBundleWidth)) if err != nil { return err } @@ -203,7 +208,7 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e } } writeBundle := func(bundleIndex uint64) error { - bf := filepath.Join(s.path, s.entriesPath(bundleIndex, newSize)) + bf := filepath.Join(s.path, s.entriesPath(bundleIndex, uint8(newSize%layout.EntryBundleWidth))) if err := os.MkdirAll(filepath.Dir(bf), dirPerm); err != nil { return fmt.Errorf("failed to make entries directory structure: %w", err) } @@ -287,7 +292,7 @@ func (s *Storage) doIntegrate(ctx context.Context, fromSeq uint64, entries []sto func (s *Storage) readTiles(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) { r := make([]*api.HashTile, 0, len(tileIDs)) for _, id := range tileIDs { - t, err := s.readTile(ctx, id.Level, id.Index, treeSize) + t, err := s.readTile(ctx, id.Level, id.Index, layout.PartialTileSize(id.Level, id.Index, treeSize)) if err != nil { return nil, err } @@ -299,8 +304,8 @@ func (s *Storage) readTiles(ctx context.Context, tileIDs []storage.TileID, treeS // readTile returns the parsed tile at the given tile-level and tile-index. // If no complete tile exists at that location, it will attempt to find a // partial tile for the given tree size at that location. -func (s *Storage) readTile(ctx context.Context, level, index, logSize uint64) (*api.HashTile, error) { - t, err := s.ReadTile(ctx, level, index, logSize) +func (s *Storage) readTile(ctx context.Context, level, index uint64, p uint8) (*api.HashTile, error) { + t, err := s.ReadTile(ctx, level, index, p) if err != nil { if errors.Is(err, os.ErrNotExist) { // We'll signal to higher levels that it wasn't found by retuning a nil for this tile. @@ -331,7 +336,7 @@ func (s *Storage) storeTile(_ context.Context, level, index, logSize uint64, til return fmt.Errorf("failed to marshal tile: %w", err) } - tPath := filepath.Join(s.path, layout.TilePath(level, index, logSize)) + tPath := filepath.Join(s.path, layout.TilePath(level, index, layout.PartialTileSize(level, index, logSize))) tDir := filepath.Dir(tPath) if err := os.MkdirAll(tDir, dirPerm); err != nil { return fmt.Errorf("failed to create directory %q: %w", tDir, err)