From c57fd57b255699abdf337cc1e1cd2384034ac536 Mon Sep 17 00:00:00 2001 From: Martin Hutchinson Date: Thu, 5 Dec 2024 14:25:00 +0000 Subject: [PATCH 01/17] [MySQL Conformance] fix tree size bug (#382) Major changes: - MySQL storage read methods return os.ErrNotExist when values aren't found - ReadTile returns an error if the user requests more data than we have available - Added tests for writing and reading data from tiles - Made tests hermetic (though slower) by resetting the DB for each test case This got a bit bigger than intended. This fixes #364. --- cmd/conformance/mysql/main.go | 15 +++-- storage/mysql/mysql.go | 54 ++++++++++------- storage/mysql/mysql_test.go | 107 +++++++++++++++++++++++++++++++++- 3 files changed, 146 insertions(+), 30 deletions(-) diff --git a/cmd/conformance/mysql/main.go b/cmd/conformance/mysql/main.go index 7751597b..2fe58d08 100644 --- a/cmd/conformance/mysql/main.go +++ b/cmd/conformance/mysql/main.go @@ -165,17 +165,17 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { } return } - impliedSize := (index*256 + width) << (level * 8) - tile, err := storage.ReadTile(r.Context(), level, index, impliedSize) + inferredMinTreeSize := (index*256 + width) << (level * 8) + tile, err := storage.ReadTile(r.Context(), level, index, inferredMinTreeSize) if err != nil { + if os.IsNotExist(err) { + w.WriteHeader(http.StatusNotFound) + return + } klog.Errorf("/tile/{level}/{index...}: %v", err) w.WriteHeader(http.StatusInternalServerError) return } - if tile == nil { - w.WriteHeader(http.StatusNotFound) - return - } w.Header().Set("Cache-Control", "public, max-age=31536000, immutable") @@ -207,6 +207,9 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { } // TODO: Add immutable Cache-Control header. + // Only do this once we're sure we're returning the right number of entries + // Currently a user can request a full tile and we can return a partial tile. + // If cache headers were set then this could cause caches to be poisoned. if _, err := w.Write(entryBundle); err != nil { klog.Errorf("/tile/entries/{index...}: %v", err) diff --git a/storage/mysql/mysql.go b/storage/mysql/mysql.go index 15e236af..a6d62dd2 100644 --- a/storage/mysql/mysql.go +++ b/storage/mysql/mysql.go @@ -18,9 +18,11 @@ package mysql import ( "bytes" "context" + "crypto/sha256" "database/sql" "errors" "fmt" + "os" "strings" "time" @@ -121,7 +123,7 @@ func (s *Storage) maybeInitTree(ctx context.Context) error { }() treeState, err := s.readTreeState(ctx, tx) - if err != nil { + if err != nil && !os.IsNotExist(err) { klog.Errorf("Failed to read tree state: %v", err) return err } @@ -142,7 +144,7 @@ func (s *Storage) maybeInitTree(ctx context.Context) error { } // ReadCheckpoint returns the latest stored checkpoint. -// If the checkpoint is not found, nil is returned with no error. +// If the checkpoint is not found, it returns os.ErrNotExist. func (s *Storage) ReadCheckpoint(ctx context.Context) ([]byte, error) { row := s.db.QueryRowContext(ctx, selectCheckpointByIDSQL, checkpointID) if err := row.Err(); err != nil { @@ -153,7 +155,7 @@ func (s *Storage) ReadCheckpoint(ctx context.Context) ([]byte, error) { var at int64 if err := row.Scan(&checkpoint, &at); err != nil { if err == sql.ErrNoRows { - return nil, nil + return nil, os.ErrNotExist } return nil, fmt.Errorf("scan checkpoint: %v", err) } @@ -207,7 +209,7 @@ type treeState struct { } // readTreeState returns the currently stored tree state information. 
-// If there is no stored tree state, nil is returned with no error.
+// If there is no stored tree state, it returns os.ErrNotExist.
 func (s *Storage) readTreeState(ctx context.Context, tx *sql.Tx) (*treeState, error) {
 	row := tx.QueryRowContext(ctx, selectTreeStateByIDForUpdateSQL, treeStateID)
 	if err := row.Err(); err != nil {
@@ -217,7 +219,7 @@ func (s *Storage) readTreeState(ctx context.Context, tx *sql.Tx) (*treeState, er
 	r := &treeState{}
 	if err := row.Scan(&r.size, &r.root); err != nil {
 		if err == sql.ErrNoRows {
-			return nil, nil
+			return nil, os.ErrNotExist
 		}
 		return nil, fmt.Errorf("scan tree state: %v", err)
 	}
@@ -234,16 +236,13 @@ func (s *Storage) writeTreeState(ctx context.Context, tx *sql.Tx, size uint64, r
 	return nil
 }
 
-// ReadTile returns a full tile or a partial tile at the given level, index and width.
-// If the tile is not found, nil is returned with no error.
+// ReadTile returns a full tile or a partial tile at the given level, index and minTreeSize.
+// If the tile is not found, it returns os.ErrNotExist.
 //
-// TODO: Handle the following scenarios:
-// 1. Full tile request with full tile output: Return full tile.
-// 2. Full tile request with partial tile output: Return error.
-// 3. Partial tile request with full/larger partial tile output: Return trimmed partial tile with correct tile width.
-// 4. Partial tile request with partial tile (same width) output: Return partial tile.
-// 5. Partial tile request with smaller partial tile output: Return error.
-func (s *Storage) ReadTile(ctx context.Context, level, index, width uint64) ([]byte, error) {
+// Note that if a partial tile is requested, but a larger tile is available, this
+// will return the largest tile available. This could be trimmed to return only the
+// number of entries specifically requested if this behaviour becomes problematic.
+func (s *Storage) ReadTile(ctx context.Context, level, index, minTreeSize uint64) ([]byte, error) {
 	row := s.db.QueryRowContext(ctx, selectSubtreeByLevelAndIndexSQL, level, index)
 	if err := row.Err(); err != nil {
 		return nil, err
@@ -252,20 +251,34 @@ func (s *Storage) ReadTile(ctx context.Context, level, index, width uint64) ([]b
 	var tile []byte
 	if err := row.Scan(&tile); err != nil {
 		if err == sql.ErrNoRows {
-			return nil, nil
+			return nil, os.ErrNotExist
 		}
 		return nil, fmt.Errorf("scan tile: %v", err)
 	}
 
-	// Return nil when returning a partial tile on a full tile request.
-	if width == 256 && uint64(len(tile)/32) != width {
-		return nil, nil
+	requestedWidth := partialTileSize(level, index, minTreeSize)
+	numEntries := uint64(len(tile) / sha256.Size)
+
+	if requestedWidth > numEntries {
+		// If the user has requested a size larger than we have, they can't have it.
+		return nil, os.ErrNotExist
	}
 
 	return tile, nil
 }
 
+// partialTileSize returns the expected number of leaves in a tile at the given location within
+// a tree of the specified logSize, or 256 if the tile is expected to be fully populated.
+func partialTileSize(level, index, logSize uint64) uint64 {
+	sizeAtLevel := logSize >> (level * 8)
+	fullTiles := sizeAtLevel / 256
+	if index < fullTiles {
+		return 256
+	}
+	return sizeAtLevel % 256
+}
+
 // writeTile replaces the tile nodes at the given level and index. 
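To make the width computation above concrete, here is a small worked example (an editor's illustration, not part of the patch): with 258 leaves in the log, tile 0 at level 0 must be full, tile 1 at level 0 is expected to hold two hashes, and level 1 has a single node so far, matching the `TestGetTile` cases below.

```go
package main

import "fmt"

// partialTileSize mirrors the helper introduced in this patch: 256 for tiles
// which must be full at the given tree size, otherwise the expected number of
// hashes in the rightmost (partial) tile.
func partialTileSize(level, index, logSize uint64) uint64 {
	sizeAtLevel := logSize >> (level * 8)
	fullTiles := sizeAtLevel / 256
	if index < fullTiles {
		return 256
	}
	return sizeAtLevel % 256
}

func main() {
	fmt.Println(partialTileSize(0, 0, 258)) // 256: leaves 0..255 make tile 0 full
	fmt.Println(partialTileSize(0, 1, 258)) // 2: tile 1 holds leaves 256 and 257
	fmt.Println(partialTileSize(1, 0, 258)) // 1: only one level-1 subtree hash exists so far
}
```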
func (s *Storage) writeTile(ctx context.Context, tx *sql.Tx, level, index uint64, nodes []byte) error { if _, err := tx.ExecContext(ctx, replaceSubtreeSQL, level, index, nodes); err != nil { @@ -277,7 +290,7 @@ func (s *Storage) writeTile(ctx context.Context, tx *sql.Tx, level, index uint64 } // ReadEntryBundle returns the log entries at the given index. -// If the entry bundle is not found, nil is returned with no error. +// If the entry bundle is not found, it returns os.ErrNotExist. // // TODO: Handle the following scenarios: // 1. Full tile request with full tile output: Return full tile. @@ -294,9 +307,8 @@ func (s *Storage) ReadEntryBundle(ctx context.Context, index, treeSize uint64) ( var entryBundle []byte if err := row.Scan(&entryBundle); err != nil { if err == sql.ErrNoRows { - return nil, nil + return nil, os.ErrNotExist } - return nil, fmt.Errorf("scan entry bundle: %v", err) } diff --git a/storage/mysql/mysql_test.go b/storage/mysql/mysql_test.go index 6d4d98ef..5989a499 100644 --- a/storage/mysql/mysql_test.go +++ b/storage/mysql/mysql_test.go @@ -22,8 +22,10 @@ package mysql_test import ( "bytes" "context" + "crypto/sha256" "database/sql" "flag" + "fmt" "os" "testing" "time" @@ -48,8 +50,8 @@ var ( ) const ( - // Matching public key: "transparency.dev/tessera/example+ae330e15+ASf4/L1zE859VqlfQgGzKy34l91Gl8W6wfwp+vKP62DW" testPrivateKey = "PRIVATE+KEY+transparency.dev/tessera/example+ae330e15+AXEwZQ2L6Ga3NX70ITObzyfEIketMr2o9Kc+ed/rt/QR" + testPublicKey = "transparency.dev/tessera/example+ae330e15+ASf4/L1zE859VqlfQgGzKy34l91Gl8W6wfwp+vKP62DW" ) // TestMain checks whether the test MySQL database is available and starts the tests including database schema initialization. @@ -163,6 +165,93 @@ func TestNew(t *testing.T) { } } +func TestGetTile(t *testing.T) { + ctx := context.Background() + s := newTestMySQLStorage(t, ctx) + + awaiter := tessera.NewIntegrationAwaiter(ctx, s.ReadCheckpoint, 10*time.Millisecond) + + treeSize := 258 + var lastIndex uint64 + for i := range treeSize { + idx, _, err := awaiter.Await(ctx, s.Add(ctx, tessera.NewEntry([]byte(fmt.Sprintf("TestGetTile %d", i))))) + if err != nil { + t.Fatalf("Failed to prep test with entry: %v", err) + } + if idx > lastIndex { + lastIndex = idx + } + } + if got, want := lastIndex, uint64(treeSize-1); got != want { + t.Fatalf("expected only newly created entries in database; tests are not hermetic (got %d, want %d)", got, want) + } + + for _, test := range []struct { + name string + level, index, treeSize uint64 + wantEntries int + wantNotFound bool + }{ + { + name: "requested partial tile for a complete tile", + level: 0, index: 0, treeSize: 10, + wantEntries: 256, + wantNotFound: false, + }, + { + name: "too small but that's ok", + level: 0, index: 1, treeSize: uint64(treeSize) - 1, + wantEntries: 2, + wantNotFound: false, + }, + { + name: "just right", + level: 0, index: 1, treeSize: uint64(treeSize), + wantEntries: 2, + wantNotFound: false, + }, + { + name: "too big", + level: 0, index: 1, treeSize: uint64(treeSize + 1), + wantNotFound: true, + }, + { + name: "level 1 too small", + level: 1, index: 0, treeSize: uint64(treeSize - 1), + wantEntries: 1, + wantNotFound: false, + }, + { + name: "level 1 just right", + level: 1, index: 0, treeSize: uint64(treeSize), + wantEntries: 1, + wantNotFound: false, + }, + { + name: "level 1 too big", + level: 1, index: 0, treeSize: 550, + wantNotFound: true, + }, + } { + t.Run(test.name, func(t *testing.T) { + tile, err := s.ReadTile(ctx, test.level, test.index, test.treeSize) + if err 
!= nil { + if notFound, wantNotFound := os.IsNotExist(err), test.wantNotFound; notFound != wantNotFound { + t.Errorf("wantNotFound %v but notFound %v", wantNotFound, notFound) + } + if test.wantNotFound { + return + } + t.Errorf("got err: %v", err) + } + numEntries := len(tile) / sha256.Size + if got, want := numEntries, test.wantEntries; got != want { + t.Errorf("got %d entries, but want %d", got, want) + } + }) + } +} + func TestReadMissingTile(t *testing.T) { ctx := context.Background() s := newTestMySQLStorage(t, ctx) @@ -183,6 +272,10 @@ func TestReadMissingTile(t *testing.T) { t.Run(test.name, func(t *testing.T) { tile, err := s.ReadTile(ctx, test.level, test.index, test.width) if err != nil { + if os.IsNotExist(err) { + // this is success for this test + return + } t.Errorf("got err: %v", err) } if tile != nil { @@ -212,6 +305,10 @@ func TestReadMissingEntryBundle(t *testing.T) { t.Run(test.name, func(t *testing.T) { entryBundle, err := s.ReadEntryBundle(ctx, test.index, test.index) if err != nil { + if os.IsNotExist(err) { + // this is success for this test + return + } t.Errorf("got err: %v", err) } if entryBundle != nil { @@ -286,7 +383,7 @@ func TestTileRoundTrip(t *testing.T) { } tileLevel, tileIndex, _, nodeIndex := layout.NodeCoordsToTileAddress(0, entryIndex) - tileRaw, err := s.ReadTile(ctx, tileLevel, tileIndex, nodeIndex) + tileRaw, err := s.ReadTile(ctx, tileLevel, tileIndex, nodeIndex+1) if err != nil { t.Errorf("ReadTile got err: %v", err) } @@ -358,8 +455,12 @@ func TestEntryBundleRoundTrip(t *testing.T) { func newTestMySQLStorage(t *testing.T, ctx context.Context) *mysql.Storage { t.Helper() + initDatabaseSchema(ctx) - s, err := mysql.New(ctx, testDB, tessera.WithCheckpointSigner(noteSigner)) + s, err := mysql.New(ctx, testDB, + tessera.WithCheckpointSigner(noteSigner), + tessera.WithCheckpointInterval(200*time.Millisecond), + tessera.WithBatching(128, 100*time.Millisecond)) if err != nil { t.Errorf("Failed to create mysql.Storage: %v", err) } From b637d9f965c64894c7f874f2923d2446c1eca834 Mon Sep 17 00:00:00 2001 From: Patrick Flynn Date: Thu, 5 Dec 2024 09:27:47 -0500 Subject: [PATCH 02/17] polish gcp getting started doc (#359) * polish gcp getting started doc * fix quote * add extra serious warning on delete * Update deployment/live/gcp/conformance/README.md Co-authored-by: Roger Ng * Update deployment/live/gcp/conformance/README.md Co-authored-by: Roger Ng * Update deployment/live/gcp/conformance/README.md * Update deployment/live/gcp/conformance/README.md * Update deployment/live/gcp/conformance/README.md --------- Co-authored-by: Roger Ng Co-authored-by: Philippe Boneff --- deployment/live/gcp/conformance/README.md | 24 ++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/deployment/live/gcp/conformance/README.md b/deployment/live/gcp/conformance/README.md index afe09aa3..f836e54e 100644 --- a/deployment/live/gcp/conformance/README.md +++ b/deployment/live/gcp/conformance/README.md @@ -21,6 +21,7 @@ need to be manually applied. You'll need the following tools installed: +- [`golang`](https://go.dev/doc/install) - [`docker`](https://docs.docker.com/engine/install/) - [`gcloud`](https://cloud.google.com/sdk/docs/install) - One of: @@ -30,7 +31,12 @@ You'll need the following tools installed: #### Google Cloud tooling -Ensure you've got already created a project you want to use, and have configured your local `gcloud` +> [!CAUTION] +> This example creates real Google Cloud resources running in your project. 
They will almost certainly
+> cost you real money if left running. For the purposes of this demo it is strongly recommended that
+> you create a new project so that you can easily clean up at the end.
+
+Once you've got a Google Cloud project you want to use, have configured your local `gcloud`
 tool to use it, and authenticated as a principal with sufficient ACLs for the project:
 
 ```bash
@@ -47,8 +53,8 @@ export GOOGLE_PROJECT=$(gcloud config get project)
 
 # This should be a note signer string.
 # You can use the generate_keys tool to create a new signer & verifier pair:
-# go run github.com/transparency-dev/serverless-log/cmd/generate_keys@HEAD --key_name="TestTessera"
-export TESSERA_SIGNER={VALUE}
+go run github.com/transparency-dev/serverless-log/cmd/generate_keys@HEAD --key_name="TestTessera" --out_priv=tessera.sec --out_pub=tessera.pub
+export TESSERA_SIGNER=$(cat tessera.sec)
 
 # This is the name of the artifact registry docker repo to create/use.
 export DOCKER_REPO_NAME=tessera-docker
@@ -126,3 +132,15 @@ Finally, apply the config using `terragrunt`:
 
 This should create all necessary infrastructure, and spin up a Cloud Run instance
 with the docker image you created above.
+
+### Clean up
+
+> [!IMPORTANT]
+> You need to run this step on your project if you want to ensure you don't get charged into perpetuity
+> for the resources we've set up.
+
+**This will delete your project!**
+Do not do this on a project that you didn't create expressly and exclusively to run this demo.
+```bash
+gcloud projects delete ${GOOGLE_PROJECT}
+```

From b3345f02231c7ed222b2bbdcd4c43a3b94bbb460 Mon Sep 17 00:00:00 2001
From: Roger Ng
Date: Thu, 5 Dec 2024 14:40:00 +0000
Subject: [PATCH 03/17] Replace `32` with `sha256.Size` const (#389)

---
 api/state_test.go | 3 ++-
 dedupe_test.go | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/api/state_test.go b/api/state_test.go
index 7c3f5c47..5650e424 100644
--- a/api/state_test.go
+++ b/api/state_test.go
@@ -18,6 +18,7 @@ package api_test
 import (
 	"bytes"
 	"crypto/rand"
+	"crypto/sha256"
 	"fmt"
 	"testing"
 
@@ -44,7 +45,7 @@ func TestHashTile_MarshalTileRoundtrip(t *testing.T) {
 			tile := api.HashTile{Nodes: make([][]byte, 0, test.size)}
 			for i := 0; i < test.size; i++ {
 				// Fill in the leaf index
-				tile.Nodes = append(tile.Nodes, make([]byte, 32))
+				tile.Nodes = append(tile.Nodes, make([]byte, sha256.Size))
 				if _, err := rand.Read(tile.Nodes[i]); err != nil {
 					t.Error(err)
 				}
diff --git a/dedupe_test.go b/dedupe_test.go
index bdf12d5a..8bc2aa1d 100644
--- a/dedupe_test.go
+++ b/dedupe_test.go
@@ -16,6 +16,7 @@ package tessera_test
 import (
 	"context"
+	"crypto/sha256"
 	"fmt"
 	"sync"
 	"testing"
@@ -92,7 +93,7 @@ func BenchmarkDedupe(b *testing.B) {
 		for leafIndex := range 1024 {
 			wg.Add(1)
 			go func(index int) {
-				_, err := dedupeAdd(ctx, tessera.NewEntry([]byte(fmt.Sprintf("leaf with value %d", index%32))))()
+				_, err := dedupeAdd(ctx, tessera.NewEntry([]byte(fmt.Sprintf("leaf with value %d", index%sha256.Size))))()
 				if err != nil {
 					b.Error(err)
 				}

From f3f351819614791f0164dc4b2635c47e76482c2f Mon Sep 17 00:00:00 2001
From: Al Cutter
Date: Thu, 5 Dec 2024 14:59:35 +0000
Subject: [PATCH 04/17] Tweak wording in getting started (#377)

---
 README.md | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 346d739f..f54ad0db 100644
--- a/README.md
+++ b/README.md
@@ -127,13 +127,18 @@ When writing your Tessera personality, the biggest decision you need to make fir
 * [POSIX](./storage/posix/)
 
 Each of these 
implementations has a very similar API, but they have different characteristics.
+
 The easiest implementations to operate and to scale are the cloud implementations: GCP and AWS.
-These are the recommended choice for the majority of users.
+These are the recommended choice for the majority of users running in production.
+
+If you aren't using a cloud provider, then your options are MySQL and POSIX:
+- POSIX is the simplest to get started with as it needs little in the way of extra infrastructure, and
+  if you already serve static files as part of your business/project this could be a good fit.
+- Alternatively, if you are used to operating user-facing applications backed by an RDBMS, then MySQL could
+  be a natural fit.
 
-If you aren't using a cloud provider, then your options are MySQL and POSIX.
-POSIX is the more niche choice, intended to be lightweight and for logs that are infrequently updated.
-If you already serve static files as part of your business this could be a good fit.
-If you are more used to operating user-facing applications backed by a RDBMS, then MySQL will be a natural fit.
+To get a sense of the rough performance you can expect from the different backends, take a look at
+[docs/performance.md](/docs/performance).
 
 #### Setup

From 6883819f1e5585da4e56e8af3c672dcdd4652a50 Mon Sep 17 00:00:00 2001
From: Roger Ng
Date: Thu, 5 Dec 2024 15:16:53 +0000
Subject: [PATCH 05/17] Fix broken link in README.md (#390)

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index f54ad0db..0e68d746 100644
--- a/README.md
+++ b/README.md
@@ -138,7 +138,7 @@ If you aren't using a cloud provider, then your options are MySQL and POSIX:
 be a natural fit.
 
 To get a sense of the rough performance you can expect from the different backends, take a look at
-[docs/performance.md](/docs/performance).
+[docs/performance.md](/docs/performance.md).

From 4b60f5993ce6e5ce5c955773dd58c97a5d1ed84a Mon Sep 17 00:00:00 2001
From: Al Cutter
Date: Thu, 5 Dec 2024 17:09:37 +0000
Subject: [PATCH 06/17] Update storage design docs to include the publish step (#391)

---
 storage/aws/README.md | 36 +++++++++++++++---------------------
 storage/gcp/README.md | 7 ++-----
 storage/mysql/DESIGN.md | 13 +++++++++----
 3 files changed, 26 insertions(+), 30 deletions(-)

diff --git a/storage/aws/README.md b/storage/aws/README.md
index 19b4aab6..838eff14 100644
--- a/storage/aws/README.md
+++ b/storage/aws/README.md
@@ -29,34 +29,28 @@ A table with a single row which is used to keep track of the next assignable seq
 This holds batches of entries keyed by the sequence number assigned to the first entry in the batch.
 
 ### `IntCoord`
-TODO: add the new checkpoint updater logic, and update the docstring in aws.go.
-
-This table is used to coordinate integration of sequenced batches in the `Seq` table.
+This table is used to coordinate integration of sequenced batches in the `Seq` table, and keep track of the current tree state.
 
 ## Life of a leaf
-TODO: add the new checkpoint updater logic.
-
 1. Leaves are submitted by the binary built using Tessera via a call to the storage's `Add` func.
-2. [Not implemented yet - Dupe squashing: look for existing `` object, read assigned sequence number if present and return.]
-3. The storage library batches these entries up, and, after a configurable period of time has elapsed
+1. 
The storage library batches these entries up, and, after a configurable period of time has elapsed
    or the batch reaches a configurable size threshold, the batch is written to the `Seq` table which
    effectively assigns sequence numbers to the entries using the following algorithm:
    In a transaction:
    1. selects next from `SeqCoord` with for update ← this blocks other FE from writing their pools, but only for a short duration.
-   2. Inserts batch of entries into `Seq` with key `SeqCoord.next`
-   3. Update `SeqCoord` with `next+=len(batch)`
-4. Integrators periodically integrate new sequenced entries into the tree:
+   1. Inserts batch of entries into `Seq` with key `SeqCoord.next`
+   1. Update `SeqCoord` with `next+=len(batch)`
+1. Integrators periodically integrate new sequenced entries into the tree:
    In a transaction:
    1. select `seq` from `IntCoord` with for update ← this blocks other integrators from proceeding.
-   2. Select one or more consecutive batches from `Seq` for update, starting at `IntCoord.seq`
-   3. Write leaf bundles to S3 using batched entries
-   4. Integrate in Merkle tree and write tiles to S3
-   5. Update checkpoint in S3
-   6. Delete consumed batches from `Seq`
-   7. Update `IntCoord` with `seq+=num_entries_integrated`
-   8. [Not implemented yet - Dupe detection:
-      1. Writes out `` containing the leaf's sequence number]
+   1. Select one or more consecutive batches from `Seq` for update, starting at `IntCoord.seq`
+   1. Write leaf bundles to S3 using batched entries
+   1. Integrate in Merkle tree and write tiles to S3
+   1. Update checkpoint in S3
+   1. Delete consumed batches from `Seq`
+   1. Update `IntCoord` with `seq+=num_entries_integrated` and the latest `rootHash`
+1. Checkpoints representing the latest state of the tree are published at the configured interval.
 
 ## Dedup
 
@@ -75,12 +69,12 @@ operational overhead, code complexity, and so was selected.
 
 The alpha implementation was tested with entries of size 1KB each, at a write
 rate of 1500/s. This was done using the smallest possible Aurora instance
-availalbe, `db.r5.large`, running `8.0.mysql_aurora.3.05.2`.
+available, `db.r5.large`, running `8.0.mysql_aurora.3.05.2`.
 
 Aurora (Serverless v2) worked out well, but seems less cost effective than
-provisionned Aurora for sustained traffic. For now, we decided not to explore this option further.
+provisioned Aurora for sustained traffic. For now, we decided not to explore this option further.
 
-RDS (MySQL) worked out well, but requires more admistrative overhead than
+RDS (MySQL) worked out well, but requires more administrative overhead than
 Aurora. For now, we decided not to explore this option further.
 
 DynamoDB worked out to be less cost efficient than Aurora and RDS. It also has
diff --git a/storage/gcp/README.md b/storage/gcp/README.md
index bee47c1f..07fdb002 100644
--- a/storage/gcp/README.md
+++ b/storage/gcp/README.md
@@ -34,7 +34,6 @@ This table is used to coordinate integration of sequenced batches in the `Seq` t
 ## Life of a leaf
 1. Leaves are submitted by the binary built using Tessera via a call to the storage's `Add` func.
-1. Dupe squashing (TODO): look for existing `` object, read assigned sequence number if present and return.
 1. 
The storage library batches these entries up, and, after a configurable period of time has elapsed
    or the batch reaches a configurable size threshold, the batch is written to the `Seq` table which
    effectively assigns sequence numbers to the entries using the following algorithm:
@@ -48,11 +47,9 @@ This table is used to coordinate integration of sequenced batches in the `Seq` t
    1. Select one or more consecutive batches from `Seq` for update, starting at `IntCoord.seq`
    1. Write leaf bundles to GCS using batched entries
    1. Integrate in Merkle tree and write tiles to GCS
-   1. Update checkpoint in GCS
    1. Delete consumed batches from `Seq`
-   1. Update `IntCoord` with `seq+=num_entries_integrated`
-   1. Dupe detection (TODO):
-      1. Writes out `` containing the leaf's sequence number
+   1. Update `IntCoord` with `seq+=num_entries_integrated` and the latest `rootHash`
+1. Checkpoints representing the latest state of the tree are published at the configured interval.
 
 ## Dedup
 
diff --git a/storage/mysql/DESIGN.md b/storage/mysql/DESIGN.md
index 60feeff7..6f67e203 100644
--- a/storage/mysql/DESIGN.md
+++ b/storage/mysql/DESIGN.md
@@ -17,7 +17,11 @@ The DB layout has been designed such that serving any read request is a point lo
 
 #### `Checkpoint`
 
-A single row that records the current state of the log. Updated after every sequence + integration.
+A single row that records the current published checkpoint.
+
+#### `TreeState`
+
+A single row that records the current state of the tree. Updated after every integration.
 
 #### `Subtree`
 
@@ -51,12 +55,13 @@ Sequence pool:
 
 Sequence & integrate (DB integration starts here):
 1. Takes a batch of entries to sequence and integrate
-1. Starts a transaction, which first takes a write lock on the checkpoint row to ensure that:
+1. Starts a transaction, which first takes a write lock on the `TreeState` row to ensure that:
    1. No other processes will be competing with this work.
-   1. That the next index to sequence is known (this is the same as the current checkpoint size)
+   1. That the next index to sequence is known (this is the same as the current tree size)
 1. Update the required TiledLeaves rows
-1. Perform an integration operation to update the Merkle tree, updating/adding Subtree rows as needed, and eventually updating the Checkpoint row
+1. Perform an integration operation to update the Merkle tree, updating/adding Subtree rows as needed, and eventually updating the `TreeState` row
 1. Commit the transaction
+1. Checkpoints representing the latest state of the tree are published at the configured interval.
 
 ## Costs

From c6816de85e7e66cc69577e7b23fdc48723f793aa Mon Sep 17 00:00:00 2001
From: Martin Hutchinson
Date: Thu, 5 Dec 2024 17:56:54 +0000
Subject: [PATCH 07/17] [MySQL] make the tests go faster (#392)

Awaiting in the loop was causing this to take ages, as it was
integrating one leaf at a time. The awaits now happen in parallel, and
we make sure they're all done before we allow the read parts of the
test to start. 
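The shape of that change, distilled (an editor's sketch, not part of the patch; `addAndAwait` is a placeholder for the Add/Await pairing used in the test, and the full diff follows below):

```go
// Kick off all Add calls up front so their integration can be batched,
// then wait for every entry to be sequenced before reading the tree.
wg := errgroup.Group{} // golang.org/x/sync/errgroup
for i := range treeSize {
	wg.Go(func() error {
		return addAndAwait(ctx, fmt.Sprintf("TestGetTile %d", i))
	})
}
if err := wg.Wait(); err != nil {
	t.Fatalf("Failed to set up database with required leaves: %v", err)
}
```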
--- storage/mysql/mysql_test.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/storage/mysql/mysql_test.go b/storage/mysql/mysql_test.go index 5989a499..d84d733e 100644 --- a/storage/mysql/mysql_test.go +++ b/storage/mysql/mysql_test.go @@ -172,18 +172,20 @@ func TestGetTile(t *testing.T) { awaiter := tessera.NewIntegrationAwaiter(ctx, s.ReadCheckpoint, 10*time.Millisecond) treeSize := 258 - var lastIndex uint64 + + wg := errgroup.Group{} for i := range treeSize { - idx, _, err := awaiter.Await(ctx, s.Add(ctx, tessera.NewEntry([]byte(fmt.Sprintf("TestGetTile %d", i))))) - if err != nil { - t.Fatalf("Failed to prep test with entry: %v", err) - } - if idx > lastIndex { - lastIndex = idx - } + wg.Go( + func() error { + _, _, err := awaiter.Await(ctx, s.Add(ctx, tessera.NewEntry([]byte(fmt.Sprintf("TestGetTile %d", i))))) + if err != nil { + return fmt.Errorf("failed to prep test with entry: %v", err) + } + return nil + }) } - if got, want := lastIndex, uint64(treeSize-1); got != want { - t.Fatalf("expected only newly created entries in database; tests are not hermetic (got %d, want %d)", got, want) + if err := wg.Wait(); err != nil { + t.Fatalf("Failed to set up database with required leaves: %v", err) } for _, test := range []struct { From afd61eaccb2d24feb56cd657e5375829a3437568 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Thu, 5 Dec 2024 18:05:50 +0000 Subject: [PATCH 08/17] [GCP] Dedup storage experiment (#363) --- cmd/conformance/gcp/main.go | 14 ++- dedupe.go | 31 ++++--- dedupe_test.go | 10 ++- storage/gcp/gcp.go | 169 ++++++++++++++++++++++++++++++++++++ 4 files changed, 208 insertions(+), 16 deletions(-) diff --git a/cmd/conformance/gcp/main.go b/cmd/conformance/gcp/main.go index 9bd9d9be..e34b08ed 100644 --- a/cmd/conformance/gcp/main.go +++ b/cmd/conformance/gcp/main.go @@ -37,6 +37,7 @@ var ( listen = flag.String("listen", ":2024", "Address:port to listen on") spanner = flag.String("spanner", "", "Spanner resource URI ('projects/.../...')") signer = flag.String("signer", "", "Note signer to use to sign checkpoints") + persistentDedup = flag.Bool("gcp_dedup", false, "EXPERIMENTAL: Set to true to enable persistent dedupe storage") additionalSigners = []string{} ) @@ -65,7 +66,18 @@ func main() { if err != nil { klog.Exitf("Failed to create new GCP storage: %v", err) } - dedupeAdd := tessera.InMemoryDedupe(storage.Add, 256) + + // Handle dedup configuration + addDelegate := storage.Add + + // PersistentDedup is currently experimental, so there's no terraform or documentation yet! + if *persistentDedup { + addDelegate, err = gcp.NewDedupe(ctx, fmt.Sprintf("%s_dedup", *spanner), addDelegate) + if err != nil { + klog.Exitf("Failed to create new GCP dedupe: %v", err) + } + } + dedupeAdd := tessera.InMemoryDedupe(addDelegate, 256) // Expose a HTTP handler for the conformance test writes. // This should accept arbitrary bytes POSTed to /add, and return an ascii diff --git a/dedupe.go b/dedupe.go index 23b9b2fa..20a31917 100644 --- a/dedupe.go +++ b/dedupe.go @@ -16,9 +16,10 @@ package tessera import ( "context" + "fmt" "sync" - "github.com/hashicorp/golang-lru/v2/expirable" + lru "github.com/hashicorp/golang-lru/v2" ) // InMemoryDedupe wraps an Add function to prevent duplicate entries being written to the underlying @@ -35,30 +36,38 @@ import ( // InMemoryDedupe. This allows recent duplicates to be deduplicated in memory, reducing the need to // make calls to a persistent storage. 
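The rewritten implementation below leans on `sync.OnceValue`; as background (an editor's aside, not part of the diff), it wraps a function so that only the first call runs it and every later call returns the cached result, which is what lets concurrent duplicate entries share a single delegate call:

```go
f := sync.OnceValue(func() int {
	fmt.Println("computed once") // printed a single time, however often f is called
	return 42
})
fmt.Println(f(), f()) // prints "computed once", then "42 42"
```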
func InMemoryDedupe(delegate func(ctx context.Context, e *Entry) IndexFuture, size uint) func(context.Context, *Entry) IndexFuture { + c, err := lru.New[string, func() IndexFuture](int(size)) + if err != nil { + panic(fmt.Errorf("lru.New(%d): %v", size, err)) + } dedupe := &inMemoryDedupe{ delegate: delegate, - cache: expirable.NewLRU[string, IndexFuture](int(size), nil, 0), + cache: c, } return dedupe.add } type inMemoryDedupe struct { delegate func(ctx context.Context, e *Entry) IndexFuture - mu sync.Mutex // cache is thread safe, but this mutex allows us to do conditional writes - cache *expirable.LRU[string, IndexFuture] + cache *lru.Cache[string, func() IndexFuture] } // Add adds the entry to the underlying delegate only if e hasn't been recently seen. In either case, // an IndexFuture will be returned that the client can use to get the sequence number of this entry. func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture { id := string(e.Identity()) - d.mu.Lock() - defer d.mu.Unlock() - f, ok := d.cache.Get(id) - if !ok { - f = d.delegate(ctx, e) - d.cache.Add(id, f) + // However many calls with the same entry come in and are deduped, we should only call delegate + // once for each unique entry: + f := sync.OnceValue(func() IndexFuture { + return d.delegate(ctx, e) + }) + + // if we've seen this entry before, discard our f and replace + // with the one we created last time, otherwise store f against id. + if prev, ok, _ := d.cache.PeekOrAdd(id, f); ok { + f = prev } - return f + + return f() } diff --git a/dedupe_test.go b/dedupe_test.go index 8bc2aa1d..c29b0851 100644 --- a/dedupe_test.go +++ b/dedupe_test.go @@ -60,13 +60,15 @@ func TestDedupe(t *testing.T) { dedupeAdd := tessera.InMemoryDedupe(delegate, 256) // Add foo, bar, baz to prime the cache to make things interesting - dedupeAdd(ctx, tessera.NewEntry([]byte("foo"))) - dedupeAdd(ctx, tessera.NewEntry([]byte("bar"))) - dedupeAdd(ctx, tessera.NewEntry([]byte("baz"))) + for _, s := range []string{"foo", "bar", "baz"} { + if _, err := dedupeAdd(ctx, tessera.NewEntry([]byte(s)))(); err != nil { + t.Fatalf("dedupeAdd(%q): %v", s, err) + } + } idx, err := dedupeAdd(ctx, tessera.NewEntry([]byte(tC.newValue)))() if err != nil { - t.Fatal(err) + t.Fatalf("dedupeAdd(%q): %v", tC.newValue, err) } if idx != tC.wantIdx { t.Errorf("got != want (%d != %d)", idx, tC.wantIdx) diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 6b5fce2f..aa4a374a 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -37,11 +37,13 @@ import ( "io" "net/http" "os" + "sync/atomic" "time" "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/apiv1/spannerpb" gcs "cloud.google.com/go/storage" + "github.com/globocom/go-buffer" "github.com/google/go-cmp/cmp" "github.com/transparency-dev/merkle/rfc6962" tessera "github.com/transparency-dev/trillian-tessera" @@ -783,3 +785,170 @@ func (s *gcsStorage) lastModified(ctx context.Context, obj string) (time.Time, e } return r.Attrs.LastModified, r.Close() } + +// NewDedupe returns wrapped Add func which will use Spanner to maintain a mapping of +// previously seen entries and their assigned indices. Future calls with the same entry +// will return the previously assigned index, as yet unseen entries will be passed to the provided +// delegate function to have an index assigned. +// +// For performance reasons, the ID -> index associations returned by the delegate are buffered before +// being flushed to Spanner. 
This can result in duplicates occurring in some circumstances, but in
+// general this should not be a problem.
+//
+// Note that the storage for this mapping is entirely separate and unconnected to the storage used for
+// maintaining the Merkle tree.
+//
+// This functionality is experimental!
+func NewDedupe(ctx context.Context, spannerDB string, delegate func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture) (func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture, error) {
+	/*
+	   Schema for reference:
+
+	   CREATE TABLE IDSeq (
+	    id INT64 NOT NULL,
+	    h BYTES(MAX) NOT NULL,
+	    idx INT64 NOT NULL,
+	   ) PRIMARY KEY (id, h);
+	*/
+	dedupDB, err := spanner.NewClient(ctx, spannerDB)
+	if err != nil {
+		return nil, fmt.Errorf("failed to connect to Spanner: %v", err)
+	}
+
+	r := &dedupStorage{
+		ctx:      ctx,
+		dbPool:   dedupDB,
+		delegate: delegate,
+	}
+
+	// TODO(al): Make these configurable
+	r.buf = buffer.New(
+		buffer.WithSize(64),
+		buffer.WithFlushInterval(200*time.Millisecond),
+		buffer.WithFlusher(buffer.FlusherFunc(r.flush)),
+		buffer.WithPushTimeout(15*time.Second),
+	)
+	go func(ctx context.Context) {
+		t := time.NewTicker(time.Second)
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-t.C:
+				klog.V(1).Infof("DEDUP: # Writes %d, # Lookups %d, # DB hits %v, # buffer Push discards %d", r.numWrites.Load(), r.numLookups.Load(), r.numDBDedups.Load(), r.numPushErrs.Load())
+			}
+		}
+	}(ctx)
+	return r.add, nil
+}
+
+type dedupStorage struct {
+	ctx      context.Context
+	dbPool   *spanner.Client
+	delegate func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture
+
+	numLookups  atomic.Uint64
+	numWrites   atomic.Uint64
+	numDBDedups atomic.Uint64
+	numPushErrs atomic.Uint64
+
+	buf *buffer.Buffer
+}
+
+// index returns the index (if any) previously associated with the provided hash.
+func (d *dedupStorage) index(ctx context.Context, h []byte) (*uint64, error) {
+	d.numLookups.Add(1)
+	var idx int64
+	if row, err := d.dbPool.Single().ReadRow(ctx, "IDSeq", spanner.Key{0, h}, []string{"idx"}); err != nil {
+		if c := spanner.ErrCode(err); c == codes.NotFound {
+			return nil, nil
+		}
+		return nil, err
+	} else {
+		if err := row.Column(0, &idx); err != nil {
+			return nil, fmt.Errorf("failed to read dedup index: %v", err)
+		}
+		idx := uint64(idx)
+		d.numDBDedups.Add(1)
+		return &idx, nil
+	}
+}
+
+// storeMappings stores the associations between the IDs and their assigned indices in a
+// non-atomic fashion (i.e. it does not store all or none in a transactional sense).
+//
+// Returns an error if one or more mappings cannot be stored.
+func (d *dedupStorage) storeMappings(ctx context.Context, entries []dedupeMapping) error {
+	m := make([]*spanner.MutationGroup, 0, len(entries))
+	for _, e := range entries {
+		m = append(m, &spanner.MutationGroup{
+			Mutations: []*spanner.Mutation{spanner.Insert("IDSeq", []string{"id", "h", "idx"}, []interface{}{0, e.ID, int64(e.Idx)})},
+		})
+	}
+
+	i := d.dbPool.BatchWrite(ctx, m)
+	return i.Do(func(r *spannerpb.BatchWriteResponse) error {
+		s := r.GetStatus()
+		if c := codes.Code(s.Code); c != codes.OK && c != codes.AlreadyExists {
+			return fmt.Errorf("failed to write dedup record: %v (%v)", s.GetMessage(), c)
+		}
+		return nil
+	})
+}
+
+// dedupeMapping represents an ID -> index mapping.
+type dedupeMapping struct {
+	ID  []byte
+	Idx uint64
+}
+
+// add adds the entry to the underlying delegate only if e isn't already known. In either case,
+// an IndexFuture will be returned that the client can use to get the sequence number of this entry. 
+func (d *dedupStorage) add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture { + idx, err := d.index(ctx, e.Identity()) + if err != nil { + return func() (uint64, error) { return 0, err } + } + if idx != nil { + return func() (uint64, error) { return *idx, nil } + } + + i, err := d.delegate(ctx, e)() + if err != nil { + return func() (uint64, error) { return 0, err } + } + + err = d.enqueueMapping(ctx, e.Identity(), i) + return func() (uint64, error) { + return i, err + } +} + +// enqueueMapping buffers the provided ID -> index mapping ready to be flushed to storage. +func (d *dedupStorage) enqueueMapping(_ context.Context, h []byte, idx uint64) error { + err := d.buf.Push(dedupeMapping{ID: h, Idx: idx}) + if err != nil { + d.numPushErrs.Add(1) + // This means there's pressure flushing dedup writes out, so discard this write. + if err != buffer.ErrTimeout { + return err + } + } + return nil +} + +// flush writes enqueued mappings to storage. +func (d *dedupStorage) flush(items []interface{}) { + entries := make([]dedupeMapping, len(items)) + for i := range items { + entries[i] = items[i].(dedupeMapping) + } + + ctx, c := context.WithTimeout(d.ctx, 15*time.Second) + defer c() + + if err := d.storeMappings(ctx, entries); err != nil { + klog.Infof("Failed to flush dedup entries: %v", err) + return + } + d.numWrites.Add(uint64(len(entries))) +} From 61061560bcf255ebc9b14283d8a357f61f3d418a Mon Sep 17 00:00:00 2001 From: Roger Ng Date: Mon, 9 Dec 2024 09:52:49 +0000 Subject: [PATCH 09/17] Replace `os.IsNotExist(err)` with `errors.Is(err, fs.ErrNotExist)` (#396) --- cmd/conformance/mysql/main.go | 4 +++- storage/mysql/mysql.go | 3 ++- storage/mysql/mysql_test.go | 8 +++++--- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/cmd/conformance/mysql/main.go b/cmd/conformance/mysql/main.go index 2fe58d08..d8f7a4c6 100644 --- a/cmd/conformance/mysql/main.go +++ b/cmd/conformance/mysql/main.go @@ -18,9 +18,11 @@ package main import ( "context" "database/sql" + "errors" "flag" "fmt" "io" + "io/fs" "net/http" "os" "time" @@ -168,7 +170,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { inferredMinTreeSize := (index*256 + width) << (level * 8) tile, err := storage.ReadTile(r.Context(), level, index, inferredMinTreeSize) if err != nil { - if os.IsNotExist(err) { + if errors.Is(err, fs.ErrNotExist) { w.WriteHeader(http.StatusNotFound) return } diff --git a/storage/mysql/mysql.go b/storage/mysql/mysql.go index a6d62dd2..8ac67862 100644 --- a/storage/mysql/mysql.go +++ b/storage/mysql/mysql.go @@ -22,6 +22,7 @@ import ( "database/sql" "errors" "fmt" + "io/fs" "os" "strings" "time" @@ -123,7 +124,7 @@ func (s *Storage) maybeInitTree(ctx context.Context) error { }() treeState, err := s.readTreeState(ctx, tx) - if err != nil && !os.IsNotExist(err) { + if err != nil && !errors.Is(err, fs.ErrNotExist) { klog.Errorf("Failed to read tree state: %v", err) return err } diff --git a/storage/mysql/mysql_test.go b/storage/mysql/mysql_test.go index d84d733e..a9da561c 100644 --- a/storage/mysql/mysql_test.go +++ b/storage/mysql/mysql_test.go @@ -24,8 +24,10 @@ import ( "context" "crypto/sha256" "database/sql" + "errors" "flag" "fmt" + "io/fs" "os" "testing" "time" @@ -238,7 +240,7 @@ func TestGetTile(t *testing.T) { t.Run(test.name, func(t *testing.T) { tile, err := s.ReadTile(ctx, test.level, test.index, test.treeSize) if err != nil { - if notFound, wantNotFound := os.IsNotExist(err), test.wantNotFound; notFound != wantNotFound { + if notFound, wantNotFound := 
errors.Is(err, fs.ErrNotExist), test.wantNotFound; notFound != wantNotFound {
 					t.Errorf("wantNotFound %v but notFound %v", wantNotFound, notFound)
 				}
 				if test.wantNotFound {
 					return
 				}
@@ -274,7 +276,7 @@ func TestReadMissingTile(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			tile, err := s.ReadTile(ctx, test.level, test.index, test.width)
 			if err != nil {
-				if os.IsNotExist(err) {
+				if errors.Is(err, fs.ErrNotExist) {
 					// this is success for this test
 					return
 				}
@@ -307,7 +309,7 @@ func TestReadMissingEntryBundle(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			entryBundle, err := s.ReadEntryBundle(ctx, test.index, test.index)
 			if err != nil {
-				if os.IsNotExist(err) {
+				if errors.Is(err, fs.ErrNotExist) {
 					// this is success for this test
 					return
 				}

From 7adba0d0324c15f804f3ce2b68dff5621417927b Mon Sep 17 00:00:00 2001
From: Martin Hutchinson
Date: Mon, 9 Dec 2024 11:40:51 +0000
Subject: [PATCH 10/17] Added and updated benchmarks (#397)

Parsing entry bundles needs to be fast, so locked that in with a
benchmark. While looking at other benchmarks, I noticed integrate wasn't
using b.N, so wired that into the test instead of the constant we
already had to control the number of loops.
---
 api/state_test.go | 17 +++++++++++++++++
 storage/internal/integrate_test.go | 3 +--
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/api/state_test.go b/api/state_test.go
index 5650e424..e207da64 100644
--- a/api/state_test.go
+++ b/api/state_test.go
@@ -20,6 +20,7 @@ import (
 	"crypto/rand"
 	"crypto/sha256"
 	"fmt"
+	"strings"
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
@@ -144,3 +145,19 @@ func TestLeafBundle_UnmarshalText(t *testing.T) {
 		})
 	}
 }
+
+func BenchmarkLeafBundle_UnmarshalText(b *testing.B) {
+	bs := bytes.Buffer{}
+	for i := range 222 {
+		// Create leaves of different lengths for interest in the parsing / memory allocation
+		leafStr := strings.Repeat(fmt.Sprintf("Leaf %d", i), i%20)
+		_, _ = bs.Write(tessera.NewEntry([]byte(leafStr)).MarshalBundleData(uint64(i)))
+	}
+	rawBundle := bs.Bytes()
+	for i := 0; i < b.N; i++ {
+		tile := api.EntryBundle{}
+		if err := tile.UnmarshalText(rawBundle); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
diff --git a/storage/internal/integrate_test.go b/storage/internal/integrate_test.go
index 6a12aafd..d29ae01b 100644
--- a/storage/internal/integrate_test.go
+++ b/storage/internal/integrate_test.go
@@ -170,9 +170,8 @@ func BenchmarkIntegrate(b *testing.B) {
 	m := newMemTileStore[api.HashTile]()
 
 	chunkSize := 200
-	numChunks := 256
 	seq := uint64(0)
-	for chunk := 0; chunk < numChunks; chunk++ {
+	for chunk := 0; chunk < b.N; chunk++ {
 		oldSeq := seq
 		c := make([]SequencedEntry, chunkSize)
 		for i := range c {

From 61061560bcf255ebc9b14283d8a357f61f3d418a Mon Sep 17 00:00:00 2001
From: Al Cutter
Date: Mon, 9 Dec 2024 12:11:40 +0000
Subject: [PATCH 11/17] Storage API uses partial width rather than logSize (#393)

---
 api/layout/example_test.go | 8 +-
 api/layout/paths.go | 34 ++++----
 api/layout/paths_test.go | 109 ++++++------------------
 api/layout/tile.go | 27 ++++--
 api/state.go | 4 +-
 client/client.go | 8 +-
 client/client_test.go | 6 +-
 client/fetcher.go | 16 ++--
 cmd/conformance/mysql/main.go | 9 +-
 ct_only.go | 4 +-
 ct_only_test.go | 13 +--
 integration/storage_uniformity_test.go | 4 +-
 internal/hammer/client.go | 12 +--
 internal/options/options.go | 2 +-
 storage/aws/aws.go | 39 +++++----
 storage/aws/aws_test.go | 12 +--
 storage/gcp/gcp.go | 42 +++++-----
 storage/gcp/gcp_test.go | 8 +-
 storage/internal/integrate.go | 4 +-
 storage/internal/integrate_test.go | 6 +-
storage/mysql/mysql.go | 23 ++---- storage/mysql/mysql_test.go | 42 +++++----- storage/posix/files.go | 20 ++--- 23 files changed, 197 insertions(+), 255 deletions(-) diff --git a/api/layout/example_test.go b/api/layout/example_test.go index 87a90d49..db824a6e 100644 --- a/api/layout/example_test.go +++ b/api/layout/example_test.go @@ -28,19 +28,19 @@ func ExampleNodeCoordsToTileAddress() { } func ExampleTilePath() { - tilePath := layout.TilePath(0, 1234067, 315921160) + tilePath := layout.TilePath(0, 1234067, 8) fmt.Printf("tile path: %s", tilePath) // Output: tile path: tile/0/x001/x234/067.p/8 } func ExampleEntriesPath() { - entriesPath := layout.EntriesPath(1234067, 315921160) + entriesPath := layout.EntriesPath(1234067, 8) fmt.Printf("entries path: %s", entriesPath) // Output: entries path: tile/entries/x001/x234/067.p/8 } -func ExampleParseTileLevelIndexWidth() { - level, index, width, _ := layout.ParseTileLevelIndexWidth("0", "x001/x234/067.p/8") +func ExampleParseTileLevelIndexPartial() { + level, index, width, _ := layout.ParseTileLevelIndexPartial("0", "x001/x234/067.p/8") fmt.Printf("level: %d, index: %d, width: %d", level, index, width) // Output: level: 0, index: 1234067, width: 8 } diff --git a/api/layout/paths.go b/api/layout/paths.go index e8b4e5dc..07d2aa2b 100644 --- a/api/layout/paths.go +++ b/api/layout/paths.go @@ -37,13 +37,13 @@ const ( // The logSize is required so that a partial qualifier can be appended to tiles that // would contain fewer than 256 entries. func EntriesPathForLogIndex(seq, logSize uint64) string { - return EntriesPath(seq/256, logSize) + return EntriesPath(seq/EntryBundleWidth, PartialTileSize(0, seq, logSize)) } -// NWithSuffix returns a tiles-spec "N" path, with a partial suffix if applicable. -func NWithSuffix(l, n, logSize uint64) string { +// NWithSuffix returns a tiles-spec "N" path, with a partial suffix if p > 0. +func NWithSuffix(l, n uint64, p uint8) string { suffix := "" - if p := partialTileSize(l, n, logSize); p > 0 { + if p > 0 { suffix = fmt.Sprintf(".p/%d", p) } return fmt.Sprintf("%s%s", fmtN(n), suffix) @@ -51,13 +51,14 @@ func NWithSuffix(l, n, logSize uint64) string { // EntriesPath returns the local path for the nth entry bundle. p denotes the partial // tile size, or 0 if the tile is complete. -func EntriesPath(n, logSize uint64) string { - return fmt.Sprintf("tile/entries/%s", NWithSuffix(0, n, logSize)) +func EntriesPath(n uint64, p uint8) string { + return fmt.Sprintf("tile/entries/%s", NWithSuffix(0, n, p)) } // TilePath builds the path to the subtree tile with the given level and index in tile space. -func TilePath(tileLevel, tileIndex, logSize uint64) string { - return fmt.Sprintf("tile/%d/%s", tileLevel, NWithSuffix(tileLevel, tileIndex, logSize)) +// If p > 0 the path represents a partial tile. +func TilePath(tileLevel, tileIndex uint64, p uint8) string { + return fmt.Sprintf("tile/%d/%s", tileLevel, NWithSuffix(tileLevel, tileIndex, p)) } // fmtN returns the "N" part of a Tiles-spec path. @@ -83,13 +84,13 @@ func fmtN(N uint64) string { // Examples: // "/tile/0/x001/x234/067" means level 0 and index 1234067 of a full tile. // "/tile/0/x001/x234/067.p/8" means level 0, index 1234067 and width 8 of a partial tile. 
-func ParseTileLevelIndexWidth(level, index string) (uint64, uint64, uint64, error) { +func ParseTileLevelIndexPartial(level, index string) (uint64, uint64, uint8, error) { l, err := ParseTileLevel(level) if err != nil { return 0, 0, 0, err } - i, w, err := ParseTileIndexWidth(index) + i, w, err := ParseTileIndexPartial(index) if err != nil { return 0, 0, 0, err } @@ -107,17 +108,18 @@ func ParseTileLevel(level string) (uint64, error) { return l, err } -// ParseTileIndexWidth takes index in string, validates and returns the index and width in uint64. -func ParseTileIndexWidth(index string) (uint64, uint64, error) { - w := uint64(256) +// ParseTileIndexPartial takes index in string, validates and returns the index and width in uint64. +func ParseTileIndexPartial(index string) (uint64, uint8, error) { + w := uint8(0) indexPaths := strings.Split(index, "/") if strings.Contains(index, ".p") { var err error - w, err = strconv.ParseUint(indexPaths[len(indexPaths)-1], 10, 64) - if err != nil || w < 1 || w > 255 { - return 0, 0, fmt.Errorf("failed to parse tile index") + w64, err := strconv.ParseUint(indexPaths[len(indexPaths)-1], 10, 64) + if err != nil || w64 < 1 || w64 >= TileWidth { + return 0, 0, fmt.Errorf("failed to parse tile width") } + w = uint8(w64) indexPaths[len(indexPaths)-2] = strings.TrimSuffix(indexPaths[len(indexPaths)-2], ".p") indexPaths = indexPaths[:len(indexPaths)-1] } diff --git a/api/layout/paths_test.go b/api/layout/paths_test.go index 2ccb0b3f..093eab69 100644 --- a/api/layout/paths_test.go +++ b/api/layout/paths_test.go @@ -16,7 +16,6 @@ package layout import ( "fmt" - "math" "testing" ) @@ -65,39 +64,30 @@ func TestEntriesPathForLogIndex(t *testing.T) { func TestEntriesPath(t *testing.T) { for _, test := range []struct { N uint64 - logSize uint64 + p uint8 wantPath string + wantErr bool }{ { N: 0, - logSize: 289, wantPath: "tile/entries/000", }, { N: 0, - logSize: 8, + p: 8, wantPath: "tile/entries/000.p/8", }, { N: 255, - logSize: 256 * 256, wantPath: "tile/entries/255", }, { N: 255, - logSize: 255*256 - 3, + p: 253, wantPath: "tile/entries/255.p/253", - }, { - N: 256, - logSize: 257 * 256, - wantPath: "tile/entries/256", - }, { - N: 123456789000, - logSize: math.MaxUint64, - wantPath: "tile/entries/x123/x456/x789/000", }, } { desc := fmt.Sprintf("N %d", test.N) t.Run(desc, func(t *testing.T) { - gotPath := EntriesPath(test.N, test.logSize) + gotPath := EntriesPath(test.N, test.p) if gotPath != test.wantPath { t.Errorf("got file %q want %q", gotPath, test.wantPath) } @@ -109,59 +99,37 @@ func TestTilePath(t *testing.T) { for _, test := range []struct { level uint64 index uint64 - logSize uint64 + p uint8 wantPath string }{ { level: 0, index: 0, - logSize: 256, - wantPath: "tile/0/000", - }, { - level: 0, - index: 0, - logSize: 0, wantPath: "tile/0/000", }, { level: 0, index: 0, - logSize: 255, + p: 255, wantPath: "tile/0/000.p/255", }, { level: 1, index: 0, - logSize: math.MaxUint64, wantPath: "tile/1/000", - }, { - level: 1, - index: 0, - logSize: 256, - wantPath: "tile/1/000.p/1", - }, { - level: 1, - index: 0, - logSize: 1024, - wantPath: "tile/1/000.p/4", }, { level: 15, index: 455667, - logSize: math.MaxUint64, + p: 0, wantPath: "tile/15/x455/667", - }, { - level: 3, - index: 1234567, - logSize: math.MaxUint64, - wantPath: "tile/3/x001/x234/567", }, { level: 15, index: 123456789, - logSize: math.MaxUint64, - wantPath: "tile/15/x123/x456/789", + p: 41, + wantPath: "tile/15/x123/x456/789.p/41", }, } { desc := fmt.Sprintf("level %x index %x", test.level, test.index) 
t.Run(desc, func(t *testing.T) {
-			gotPath := TilePath(test.level, test.index, test.logSize)
+			gotPath := TilePath(test.level, test.index, test.p)
 			if gotPath != test.wantPath {
 				t.Errorf("Got path %q want %q", gotPath, test.wantPath)
 			}
@@ -173,59 +141,32 @@ func TestNWithSuffix(t *testing.T) {
 	for _, test := range []struct {
 		level uint64
 		index uint64
-		logSize uint64
+		p uint8
 		wantPath string
 	}{
 		{
 			level: 0,
 			index: 0,
-			logSize: 256,
-			wantPath: "000",
-		}, {
-			level: 0,
-			index: 0,
-			logSize: 0,
 			wantPath: "000",
 		}, {
 			level: 0,
 			index: 0,
-			logSize: 255,
+			p: 255,
 			wantPath: "000.p/255",
-		}, {
-			level: 1,
-			index: 0,
-			logSize: math.MaxUint64,
-			wantPath: "000",
-		}, {
-			level: 1,
-			index: 0,
-			logSize: 256,
-			wantPath: "000.p/1",
-		}, {
-			level: 1,
-			index: 0,
-			logSize: 1024,
-			wantPath: "000.p/4",
 		}, {
 			level: 15,
 			index: 455667,
-			logSize: math.MaxUint64,
 			wantPath: "x455/667",
-		}, {
-			level: 3,
-			index: 1234567,
-			logSize: math.MaxUint64,
-			wantPath: "x001/x234/567",
 		}, {
 			level: 15,
 			index: 123456789,
-			logSize: math.MaxUint64,
-			wantPath: "x123/x456/789",
+			p: 65,
+			wantPath: "x123/x456/789.p/65",
 		},
 	} {
 		desc := fmt.Sprintf("level %x index %x", test.level, test.index)
 		t.Run(desc, func(t *testing.T) {
-			gotPath := NWithSuffix(test.level, test.index, test.logSize)
+			gotPath := NWithSuffix(test.level, test.index, test.p)
 			if gotPath != test.wantPath {
 				t.Errorf("Got path %q want %q", gotPath, test.wantPath)
 			}
 		}
 	}
 }
 
-func TestParseTileLevelIndexWidth(t *testing.T) {
+func TestParseTileLevelIndexPartial(t *testing.T) {
 	for _, test := range []struct {
 		pathLevel string
 		pathIndex string
 		wantLevel uint64
 		wantIndex uint64
-		wantWidth uint64
+		wantP uint8
 		wantErr bool
 	}{
 		{
 			pathLevel: "0",
 			pathIndex: "x001/x234/067",
 			wantLevel: 0,
 			wantIndex: 1234067,
-			wantWidth: 256,
+			wantP: 0,
 		},
 		{
 			pathLevel: "0",
 			pathIndex: "x001/x234/067.p/89",
 			wantLevel: 0,
 			wantIndex: 1234067,
-			wantWidth: 89,
+			wantP: 89,
 		},
 		{
 			pathLevel: "63",
 			pathIndex: "x999/x999/x999/x999/x999/999.p/255",
 			wantLevel: 63,
 			wantIndex: 999999999999999999,
-			wantWidth: 255,
+			wantP: 255,
 		},
 		{
 			pathLevel: "0",
 			pathIndex: "001",
 			wantLevel: 0,
 			wantIndex: 1,
-			wantWidth: 256,
+			wantP: 0,
 		},
 		{
 			pathLevel: "0",
@@ -348,15 +289,15 @@
 	} {
 		desc := fmt.Sprintf("pathLevel: %q, pathIndex: %q", test.pathLevel, test.pathIndex)
 		t.Run(desc, func(t *testing.T) {
-			gotLevel, gotIndex, gotWidth, err := ParseTileLevelIndexWidth(test.pathLevel, test.pathIndex)
+			gotLevel, gotIndex, gotWidth, err := ParseTileLevelIndexPartial(test.pathLevel, test.pathIndex)
 			if gotLevel != test.wantLevel {
 				t.Errorf("got level %d want %d", gotLevel, test.wantLevel)
 			}
 			if gotIndex != test.wantIndex {
 				t.Errorf("got index %d want %d", gotIndex, test.wantIndex)
 			}
-			if gotWidth != test.wantWidth {
-				t.Errorf("got width %d want %d", gotWidth, test.wantWidth)
+			if gotWidth != test.wantP {
+				t.Errorf("got width %d want %d", gotWidth, test.wantP)
 			}
 			gotErr := err != nil
 			if gotErr != test.wantErr {
diff --git a/api/layout/tile.go b/api/layout/tile.go
index c6b31a97..ded1c6da 100644
--- a/api/layout/tile.go
+++ b/api/layout/tile.go
@@ -14,24 +14,35 @@
 package layout
 
-// partialTileSize returns the expected number of leaves in a tile at the given location within
+const (
+	// TileHeight is the maximum number of Merkle tree levels a tile represents.
+	// This is fixed at 8 by tlog-tile spec. 
+ TileHeight = 8 + // TileWidth is the maximum number of hashes which can be present in the bottom row of a tile. + TileWidth = 1 << TileHeight + // EntryBundleWidth is the maximum number of entries which can be present in an EntryBundle. + // This is defined to be the same as the width of the node tiles by tlog-tile spec. + EntryBundleWidth = TileWidth +) + +// PartialTileSize returns the expected number of leaves in a tile at the given location within // a tree of the specified logSize, or 0 if the tile is expected to be fully populated. -func partialTileSize(level, index, logSize uint64) uint64 { - sizeAtLevel := logSize >> (level * 8) - fullTiles := sizeAtLevel / 256 +func PartialTileSize(level, index, logSize uint64) uint8 { + sizeAtLevel := logSize >> (level * TileHeight) + fullTiles := sizeAtLevel / TileWidth if index < fullTiles { return 0 } - return sizeAtLevel % 256 + return uint8(sizeAtLevel % TileWidth) } // NodeCoordsToTileAddress returns the (TileLevel, TileIndex) in tile-space, and the // (NodeLevel, NodeIndex) address within that tile of the specified tree node co-ordinates. func NodeCoordsToTileAddress(treeLevel, treeIndex uint64) (uint64, uint64, uint, uint64) { - tileRowWidth := uint64(1 << (8 - treeLevel%8)) - tileLevel := treeLevel / 8 + tileRowWidth := uint64(1 << (TileHeight - treeLevel%TileHeight)) + tileLevel := treeLevel / TileHeight tileIndex := treeIndex / tileRowWidth - nodeLevel := uint(treeLevel % 8) + nodeLevel := uint(treeLevel % TileHeight) nodeIndex := uint64(treeIndex % tileRowWidth) return tileLevel, tileIndex, nodeLevel, nodeIndex diff --git a/api/state.go b/api/state.go index 3c67af40..d86e67cc 100644 --- a/api/state.go +++ b/api/state.go @@ -22,6 +22,8 @@ import ( "crypto/sha256" "encoding/binary" "fmt" + + "github.com/transparency-dev/trillian-tessera/api/layout" ) // HashTile represents a tile within the Merkle hash tree. @@ -71,7 +73,7 @@ type EntryBundle struct { // UnmarshalText implements encoding/TextUnmarshaler and reads EntryBundles // which are encoded using the tlog-tiles spec. func (t *EntryBundle) UnmarshalText(raw []byte) error { - nodes := make([][]byte, 0, 256) + nodes := make([][]byte, 0, layout.EntryBundleWidth) for index := 0; index < len(raw); { dataIndex := index + 2 if dataIndex > len(raw) { diff --git a/client/client.go b/client/client.go index 564b280e..b3347a4f 100644 --- a/client/client.go +++ b/client/client.go @@ -53,7 +53,7 @@ type CheckpointFetcherFunc func(ctx context.Context) ([]byte, error) // Note that the implementation of this MUST return (either directly or wrapped) // an os.ErrIsNotExist when the file referenced by path does not exist, e.g. a HTTP // based implementation MUST return this error when it receives a 404 StatusCode. -type TileFetcherFunc func(ctx context.Context, level, index, logSize uint64) ([]byte, error) +type TileFetcherFunc func(ctx context.Context, level, index uint64, p uint8) ([]byte, error) // EntryBundleFetcherFunc is the signature of a function which can fetch the raw data // for a given entry bundle. @@ -61,7 +61,7 @@ type TileFetcherFunc func(ctx context.Context, level, index, logSize uint64) ([] // Note that the implementation of this MUST return (either directly or wrapped) // an os.ErrIsNotExist when the file referenced by path does not exist, e.g. a HTTP // based implementation MUST return this error when it receives a 404 StatusCode. 
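A sketch of a fetcher satisfying this contract under the new signatures (an editor's illustration, not part of the diff; `logRoot` is an assumed base URL, and the real implementations appear in `client/fetcher.go` below):

```go
// fetchTile is a hypothetical TileFetcherFunc: it resolves the tlog-tiles path
// for the (possibly partial) tile, and maps HTTP 404 onto os.ErrNotExist as
// the contract above requires.
func fetchTile(ctx context.Context, level, index uint64, p uint8) ([]byte, error) {
	url := logRoot + layout.TilePath(level, index, p)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	rsp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rsp.Body.Close() }()
	switch rsp.StatusCode {
	case http.StatusOK:
		return io.ReadAll(rsp.Body)
	case http.StatusNotFound:
		return nil, fmt.Errorf("%s: %w", url, os.ErrNotExist)
	default:
		return nil, fmt.Errorf("GET %s: %s", url, rsp.Status)
	}
}
```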
-type EntryBundleFetcherFunc func(ctx context.Context, bundleIndex, logSize uint64) ([]byte, error) +type EntryBundleFetcherFunc func(ctx context.Context, bundleIndex uint64, p uint8) ([]byte, error) // ConsensusCheckpointFunc is a function which returns the largest checkpoint known which is // signed by logSigV and satisfies some consensus algorithm. @@ -255,7 +255,7 @@ func (n *nodeCache) GetNode(ctx context.Context, id compact.NodeID) ([]byte, err tKey := tileKey{tileLevel, tileIndex} t, ok := n.tiles[tKey] if !ok { - tileRaw, err := n.getTile(ctx, tileLevel, tileIndex, n.logSize) + tileRaw, err := n.getTile(ctx, tileLevel, tileIndex, layout.PartialTileSize(tileLevel, tileIndex, n.logSize)) if err != nil { return nil, fmt.Errorf("failed to fetch tile: %w", err) } @@ -286,7 +286,7 @@ func (n *nodeCache) GetNode(ctx context.Context, id compact.NodeID) ([]byte, err // GetEntryBundle fetches the entry bundle at the given _tile index_. func GetEntryBundle(ctx context.Context, f EntryBundleFetcherFunc, i, logSize uint64) (api.EntryBundle, error) { bundle := api.EntryBundle{} - sRaw, err := f(ctx, i, logSize) + sRaw, err := f(ctx, i, uint8(logSize%layout.EntryBundleWidth)) if err != nil { if errors.Is(err, os.ErrNotExist) { return bundle, fmt.Errorf("leaf bundle at index %d not found: %v", i, err) diff --git a/client/client_test.go b/client/client_test.go index 9807af02..eb67b2a8 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -76,8 +76,8 @@ func testLogFetcher(_ context.Context, p string) ([]byte, error) { return os.ReadFile(path) } -func testLogTileFetcher(ctx context.Context, l, i, s uint64) ([]byte, error) { - return testLogFetcher(ctx, layout.TilePath(l, i, s)) +func testLogTileFetcher(ctx context.Context, l, i uint64, p uint8) ([]byte, error) { + return testLogFetcher(ctx, layout.TilePath(l, i, p)) } // fetchCheckpointShim allows fetcher requests for checkpoints to be intercepted. @@ -309,7 +309,7 @@ func TestCheckConsistency(t *testing.T) { func TestNodeCacheHandlesInvalidRequest(t *testing.T) { ctx := context.Background() wantBytes := []byte("0123456789ABCDEF0123456789ABCDEF") - f := func(_ context.Context, _, _, _ uint64) ([]byte, error) { + f := func(_ context.Context, _, _ uint64, _ uint8) ([]byte, error) { h := &api.HashTile{ Nodes: [][]byte{wantBytes}, } diff --git a/client/fetcher.go b/client/fetcher.go index bfa0e9eb..681e0b82 100644 --- a/client/fetcher.go +++ b/client/fetcher.go @@ -98,12 +98,12 @@ func (h HTTPFetcher) ReadCheckpoint(ctx context.Context) ([]byte, error) { return h.fetch(ctx, layout.CheckpointPath) } -func (h HTTPFetcher) ReadTile(ctx context.Context, l, i, sz uint64) ([]byte, error) { - return h.fetch(ctx, layout.TilePath(l, i, sz)) +func (h HTTPFetcher) ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error) { + return h.fetch(ctx, layout.TilePath(l, i, p)) } -func (h HTTPFetcher) ReadEntryBundle(ctx context.Context, i, sz uint64) ([]byte, error) { - return h.fetch(ctx, layout.EntriesPath(i, sz)) +func (h HTTPFetcher) ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error) { + return h.fetch(ctx, layout.EntriesPath(i, p)) } // FileFetcher knows how to fetch log artifacts from a filesystem rooted at Root. 
@@ -115,10 +115,10 @@ func (f FileFetcher) ReadCheckpoint(_ context.Context) ([]byte, error) { return os.ReadFile(path.Join(f.Root, layout.CheckpointPath)) } -func (f FileFetcher) ReadTile(_ context.Context, l, i, sz uint64) ([]byte, error) { - return os.ReadFile(path.Join(f.Root, layout.TilePath(l, i, sz))) +func (f FileFetcher) ReadTile(_ context.Context, l, i uint64, p uint8) ([]byte, error) { + return os.ReadFile(path.Join(f.Root, layout.TilePath(l, i, p))) } -func (f FileFetcher) ReadEntryBundle(_ context.Context, i, sz uint64) ([]byte, error) { - return os.ReadFile(path.Join(f.Root, layout.EntriesPath(i, sz))) +func (f FileFetcher) ReadEntryBundle(_ context.Context, i uint64, p uint8) ([]byte, error) { + return os.ReadFile(path.Join(f.Root, layout.EntriesPath(i, p))) } diff --git a/cmd/conformance/mysql/main.go b/cmd/conformance/mysql/main.go index d8f7a4c6..00e24b29 100644 --- a/cmd/conformance/mysql/main.go +++ b/cmd/conformance/mysql/main.go @@ -159,7 +159,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { }) mux.HandleFunc("GET /tile/{level}/{index...}", func(w http.ResponseWriter, r *http.Request) { - level, index, width, err := layout.ParseTileLevelIndexWidth(r.PathValue("level"), r.PathValue("index")) + level, index, p, err := layout.ParseTileLevelIndexPartial(r.PathValue("level"), r.PathValue("index")) if err != nil { w.WriteHeader(http.StatusBadRequest) if _, werr := w.Write([]byte(fmt.Sprintf("Malformed URL: %s", err.Error()))); werr != nil { @@ -167,8 +167,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { } return } - inferredMinTreeSize := (index*256 + width) << (level * 8) - tile, err := storage.ReadTile(r.Context(), level, index, inferredMinTreeSize) + tile, err := storage.ReadTile(r.Context(), level, index, p) if err != nil { if errors.Is(err, fs.ErrNotExist) { w.WriteHeader(http.StatusNotFound) @@ -188,7 +187,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { }) mux.HandleFunc("GET /tile/entries/{index...}", func(w http.ResponseWriter, r *http.Request) { - index, width, err := layout.ParseTileIndexWidth(r.PathValue("index")) + index, p, err := layout.ParseTileIndexPartial(r.PathValue("index")) if err != nil { w.WriteHeader(http.StatusBadRequest) if _, werr := w.Write([]byte(fmt.Sprintf("Malformed URL: %s", err.Error()))); werr != nil { @@ -197,7 +196,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { return } - entryBundle, err := storage.ReadEntryBundle(r.Context(), index, width) + entryBundle, err := storage.ReadEntryBundle(r.Context(), index, p) if err != nil { klog.Errorf("/tile/entries/{index...}: %v", err) w.WriteHeader(http.StatusInternalServerError) diff --git a/ct_only.go b/ct_only.go index 34ede327..e39a21a6 100644 --- a/ct_only.go +++ b/ct_only.go @@ -69,6 +69,6 @@ func WithCTLayout() func(*options.StorageOptions) { } } -func ctEntriesPath(n, logSize uint64) string { - return fmt.Sprintf("tile/data/%s", layout.NWithSuffix(0, n, logSize)) +func ctEntriesPath(n uint64, p uint8) string { + return fmt.Sprintf("tile/data/%s", layout.NWithSuffix(0, n, p)) } diff --git a/ct_only_test.go b/ct_only_test.go index 7048a466..d5817da3 100644 --- a/ct_only_test.go +++ b/ct_only_test.go @@ -16,46 +16,41 @@ package tessera import ( "fmt" - "math" "testing" ) func TestCTEntriesPath(t *testing.T) { for _, test := range []struct { N uint64 - logSize uint64 + p uint8 wantPath string }{ { N: 0, - logSize: 289, wantPath: "tile/data/000", }, { N: 0, - logSize: 8, + p: 8, 
wantPath: "tile/data/000.p/8", }, { N: 255, - logSize: 256 * 256, wantPath: "tile/data/255", }, { N: 255, - logSize: 255*256 - 3, + p: 253, wantPath: "tile/data/255.p/253", }, { N: 256, - logSize: 257 * 256, wantPath: "tile/data/256", }, { N: 123456789000, - logSize: math.MaxUint64, wantPath: "tile/data/x123/x456/x789/000", }, } { desc := fmt.Sprintf("N %d", test.N) t.Run(desc, func(t *testing.T) { - gotPath := ctEntriesPath(test.N, test.logSize) + gotPath := ctEntriesPath(test.N, test.p) if gotPath != test.wantPath { t.Errorf("got file %q want %q", gotPath, test.wantPath) } diff --git a/integration/storage_uniformity_test.go b/integration/storage_uniformity_test.go index 123240aa..aeb12af4 100644 --- a/integration/storage_uniformity_test.go +++ b/integration/storage_uniformity_test.go @@ -28,8 +28,8 @@ import ( type StorageContract interface { Add(ctx context.Context, entry *tessera.Entry) tessera.IndexFuture ReadCheckpoint(ctx context.Context) ([]byte, error) - ReadTile(ctx context.Context, level, index, treeSize uint64) ([]byte, error) - ReadEntryBundle(ctx context.Context, index, treeSize uint64) ([]byte, error) + ReadTile(ctx context.Context, level, index uint64, p uint8) ([]byte, error) + ReadEntryBundle(ctx context.Context, index uint64, p uint8) ([]byte, error) } var ( diff --git a/internal/hammer/client.go b/internal/hammer/client.go index 58429100..63a04028 100644 --- a/internal/hammer/client.go +++ b/internal/hammer/client.go @@ -35,8 +35,8 @@ var ErrRetry = errors.New("retry") type fetcher interface { ReadCheckpoint(ctx context.Context) ([]byte, error) - ReadTile(ctx context.Context, l, i, sz uint64) ([]byte, error) - ReadEntryBundle(ctx context.Context, i, sz uint64) ([]byte, error) + ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error) + ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error) } // newLogClientsFromFlags returns a fetcher and a writer that will read @@ -110,14 +110,14 @@ func (rr *roundRobinFetcher) ReadCheckpoint(ctx context.Context) ([]byte, error) return f.ReadCheckpoint(ctx) } -func (rr *roundRobinFetcher) ReadTile(ctx context.Context, l, i, sz uint64) ([]byte, error) { +func (rr *roundRobinFetcher) ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error) { f := rr.next() - return f.ReadTile(ctx, l, i, sz) + return f.ReadTile(ctx, l, i, p) } -func (rr *roundRobinFetcher) ReadEntryBundle(ctx context.Context, i, sz uint64) ([]byte, error) { +func (rr *roundRobinFetcher) ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error) { f := rr.next() - return f.ReadEntryBundle(ctx, i, sz) + return f.ReadEntryBundle(ctx, i, p) } func (rr *roundRobinFetcher) next() fetcher { diff --git a/internal/options/options.go b/internal/options/options.go index 2d60c543..5a87c5ca 100644 --- a/internal/options/options.go +++ b/internal/options/options.go @@ -27,7 +27,7 @@ type NewCPFunc func(size uint64, hash []byte) ([]byte, error) type ParseCPFunc func(raw []byte) (*f_log.Checkpoint, error) // EntriesPathFunc is the signature of a function which knows how to format entry bundle paths. -type EntriesPathFunc func(n, logSize uint64) string +type EntriesPathFunc func(n uint64, p uint8) string // StorageOptions holds optional settings for all storage implementations. 
type StorageOptions struct { diff --git a/storage/aws/aws.go b/storage/aws/aws.go index c4766190..6950db1b 100644 --- a/storage/aws/aws.go +++ b/storage/aws/aws.go @@ -59,9 +59,8 @@ import ( ) const ( - entryBundleSize = 256 - logContType = "application/octet-stream" - ckptContType = "text/plain; charset=utf-8" + logContType = "application/octet-stream" + ckptContType = "text/plain; charset=utf-8" DefaultPushbackMaxOutstanding = 4096 DefaultIntegrationSizeLimit = 5 * 4096 @@ -225,12 +224,12 @@ func (s *Storage) ReadCheckpoint(ctx context.Context) ([]byte, error) { return s.get(ctx, layout.CheckpointPath) } -func (s *Storage) ReadTile(ctx context.Context, l, i, sz uint64) ([]byte, error) { - return s.get(ctx, layout.TilePath(l, i, sz)) +func (s *Storage) ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error) { + return s.get(ctx, layout.TilePath(l, i, p)) } -func (s *Storage) ReadEntryBundle(ctx context.Context, i, sz uint64) ([]byte, error) { - return s.get(ctx, s.entriesPath(i, sz)) +func (s *Storage) ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error) { + return s.get(ctx, s.entriesPath(i, p)) } // get returns the requested object. @@ -303,7 +302,7 @@ func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, til if err != nil { return err } - tPath := layout.TilePath(level, index, logSize) + tPath := layout.TilePath(level, index, layout.PartialTileSize(level, index, logSize)) klog.V(2).Infof("StoreTile: %s (%d entries)", tPath, len(tile.Nodes)) return s.objStore.setObjectIfNoneMatch(ctx, tPath, data, logContType) @@ -319,7 +318,7 @@ func (s *Storage) getTiles(ctx context.Context, tileIDs []storage.TileID, logSiz i := i id := id errG.Go(func() error { - objName := layout.TilePath(id.Level, id.Index, logSize) + objName := layout.TilePath(id.Level, id.Index, layout.PartialTileSize(id.Level, id.Index, logSize)) data, err := s.objStore.getObject(ctx, objName) if err != nil { // Do not use errors.Is. Keep errors.As to compare by type and not by value. @@ -349,8 +348,8 @@ func (s *Storage) getTiles(ctx context.Context, tileIDs []storage.TileID, logSiz // getEntryBundle returns the serialised entry bundle at the location implied by the given index and treeSize. // // Returns a wrapped os.ErrNotExist if the bundle does not exist. -func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64) ([]byte, error) { - objName := s.entriesPath(bundleIndex, logSize) +func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, p uint8) ([]byte, error) { + objName := s.entriesPath(bundleIndex, p) data, err := s.objStore.getObject(ctx, objName) if err != nil { // Do not use errors.Is. Keep errors.As to compare by type and not by value. @@ -367,8 +366,8 @@ func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, logSiz } // setEntryBundle idempotently stores the serialised entry bundle at the location implied by the bundleIndex and treeSize. -func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64, bundleRaw []byte) error { - objName := s.entriesPath(bundleIndex, logSize) +func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, p uint8, bundleRaw []byte) error { + objName := s.entriesPath(bundleIndex, p) // Note that setObject does an idempotent interpretation of IfNoneMatch - it only // returns an error if the named object exists _and_ contains different data to what's // passed in here. 
@@ -433,11 +432,11 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie } numAdded := uint64(0) - bundleIndex, entriesInBundle := fromSeq/entryBundleSize, fromSeq%entryBundleSize + bundleIndex, entriesInBundle := fromSeq/layout.EntryBundleWidth, fromSeq%layout.EntryBundleWidth bundleWriter := &bytes.Buffer{} if entriesInBundle > 0 { // If the latest bundle is partial, we need to read the data it contains in for our newer, larger, bundle. - part, err := s.getEntryBundle(ctx, uint64(bundleIndex), uint64(entriesInBundle)) + part, err := s.getEntryBundle(ctx, uint64(bundleIndex), uint8(entriesInBundle)) if err != nil { return err } @@ -451,9 +450,9 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie // goSetEntryBundle is a function which uses seqErr to spin off a go-routine to write out an entry bundle. // It's used in the for loop below. - goSetEntryBundle := func(ctx context.Context, bundleIndex uint64, fromSeq uint64, bundleRaw []byte) { + goSetEntryBundle := func(ctx context.Context, bundleIndex uint64, p uint8, bundleRaw []byte) { seqErr.Go(func() error { - if err := s.setEntryBundle(ctx, bundleIndex, fromSeq, bundleRaw); err != nil { + if err := s.setEntryBundle(ctx, bundleIndex, p, bundleRaw); err != nil { return err } return nil @@ -468,10 +467,10 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie entriesInBundle++ fromSeq++ numAdded++ - if entriesInBundle == entryBundleSize { + if entriesInBundle == layout.EntryBundleWidth { // This bundle is full, so we need to write it out... klog.V(1).Infof("In-memory bundle idx %d is full, attempting write to S3", bundleIndex) - goSetEntryBundle(ctx, bundleIndex, fromSeq, bundleWriter.Bytes()) + goSetEntryBundle(ctx, bundleIndex, 0, bundleWriter.Bytes()) // ... and prepare the next entry bundle for any remaining entries in the batch bundleIndex++ entriesInBundle = 0 @@ -484,7 +483,7 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie // this needs writing out too. 
if entriesInBundle > 0 { klog.V(1).Infof("Attempting to write in-memory partial bundle idx %d.%d to S3", bundleIndex, entriesInBundle) - goSetEntryBundle(ctx, bundleIndex, fromSeq, bundleWriter.Bytes()) + goSetEntryBundle(ctx, bundleIndex, uint8(entriesInBundle), bundleWriter.Bytes()) } return seqErr.Wait() } diff --git a/storage/aws/aws_test.go b/storage/aws/aws_test.go index 5ce734be..7c866657 100644 --- a/storage/aws/aws_test.go +++ b/storage/aws/aws_test.go @@ -294,7 +294,7 @@ func TestTileRoundtrip(t *testing.T) { t.Fatalf("setTile: %v", err) } - expPath := layout.TilePath(test.level, test.index, test.logSize) + expPath := layout.TilePath(test.level, test.index, layout.PartialTileSize(test.level, test.index, test.logSize)) _, ok := m.mem[expPath] if !ok { t.Fatalf("want tile at %v but found none", expPath) @@ -334,29 +334,29 @@ func TestBundleRoundtrip(t *testing.T) { for _, test := range []struct { name string index uint64 - logSize uint64 + p uint8 bundleSize uint64 }{ { name: "ok", index: 3 * 256, - logSize: 3*256 + 20, + p: 20, bundleSize: 20, }, } { t.Run(test.name, func(t *testing.T) { wantBundle := makeBundle(t, test.bundleSize) - if err := s.setEntryBundle(ctx, test.index, test.logSize, wantBundle); err != nil { + if err := s.setEntryBundle(ctx, test.index, test.p, wantBundle); err != nil { t.Fatalf("setEntryBundle: %v", err) } - expPath := layout.EntriesPath(test.index, test.logSize) + expPath := layout.EntriesPath(test.index, test.p) _, ok := m.mem[expPath] if !ok { t.Fatalf("want bundle at %v but found none", expPath) } - got, err := s.getEntryBundle(ctx, test.index, test.logSize) + got, err := s.getEntryBundle(ctx, test.index, test.p) if err != nil { t.Fatalf("getEntryBundle: %v", err) } diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index aa4a374a..6947f2fa 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -59,9 +59,8 @@ import ( ) const ( - entryBundleSize = 256 - logContType = "application/octet-stream" - ckptContType = "text/plain; charset=utf-8" + logContType = "application/octet-stream" + ckptContType = "text/plain; charset=utf-8" DefaultPushbackMaxOutstanding = 4096 DefaultIntegrationSizeLimit = 5 * 4096 @@ -203,12 +202,12 @@ func (s *Storage) ReadCheckpoint(ctx context.Context) ([]byte, error) { return s.get(ctx, layout.CheckpointPath) } -func (s *Storage) ReadTile(ctx context.Context, l, i, sz uint64) ([]byte, error) { - return s.get(ctx, layout.TilePath(l, i, sz)) +func (s *Storage) ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error) { + return s.get(ctx, layout.TilePath(l, i, p)) } -func (s *Storage) ReadEntryBundle(ctx context.Context, i, sz uint64) ([]byte, error) { - return s.get(ctx, s.entriesPath(i, sz)) +func (s *Storage) ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error) { + return s.get(ctx, s.entriesPath(i, p)) } // get returns the requested object. 
@@ -277,7 +276,7 @@ func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, til if err != nil { return err } - tPath := layout.TilePath(level, index, logSize) + tPath := layout.TilePath(level, index, layout.PartialTileSize(level, index, logSize)) klog.V(2).Infof("StoreTile: %s (%d entries)", tPath, len(tile.Nodes)) return s.objStore.setObject(ctx, tPath, data, &gcs.Conditions{DoesNotExist: true}, logContType) @@ -293,7 +292,7 @@ func (s *Storage) getTiles(ctx context.Context, tileIDs []storage.TileID, logSiz i := i id := id errG.Go(func() error { - objName := layout.TilePath(id.Level, id.Index, logSize) + objName := layout.TilePath(id.Level, id.Index, layout.PartialTileSize(id.Level, id.Index, logSize)) data, _, err := s.objStore.getObject(ctx, objName) if err != nil { if errors.Is(err, gcs.ErrObjectNotExist) { @@ -318,11 +317,12 @@ func (s *Storage) getTiles(ctx context.Context, tileIDs []storage.TileID, logSiz } -// getEntryBundle returns the serialised entry bundle at the location implied by the given index and treeSize. +// getEntryBundle returns the serialised entry bundle at the location described by the given index and partial size. +// A partial size of zero implies a full tile. // // Returns a wrapped os.ErrNotExist if the bundle does not exist. -func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64) ([]byte, error) { - objName := s.entriesPath(bundleIndex, logSize) +func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, p uint8) ([]byte, error) { + objName := s.entriesPath(bundleIndex, p) data, _, err := s.objStore.getObject(ctx, objName) if err != nil { if errors.Is(err, gcs.ErrObjectNotExist) { @@ -337,8 +337,8 @@ func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, logSiz } // setEntryBundle idempotently stores the serialised entry bundle at the location implied by the bundleIndex and treeSize. -func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64, bundleRaw []byte) error { - objName := s.entriesPath(bundleIndex, logSize) +func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, p uint8, bundleRaw []byte) error { + objName := s.entriesPath(bundleIndex, p) // Note that setObject does an idempotent interpretation of DoesNotExist - it only // returns an error if the named object exists _and_ contains different data to what's // passed in here. @@ -400,11 +400,11 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie } numAdded := uint64(0) - bundleIndex, entriesInBundle := fromSeq/entryBundleSize, fromSeq%entryBundleSize + bundleIndex, entriesInBundle := fromSeq/layout.EntryBundleWidth, fromSeq%layout.EntryBundleWidth bundleWriter := &bytes.Buffer{} if entriesInBundle > 0 { // If the latest bundle is partial, we need to read the data it contains in for our newer, larger, bundle. - part, err := s.getEntryBundle(ctx, uint64(bundleIndex), uint64(entriesInBundle)) + part, err := s.getEntryBundle(ctx, uint64(bundleIndex), uint8(entriesInBundle)) if err != nil { return err } @@ -418,9 +418,9 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie // goSetEntryBundle is a function which uses seqErr to spin off a go-routine to write out an entry bundle. // It's used in the for loop below. 
- goSetEntryBundle := func(ctx context.Context, bundleIndex uint64, fromSeq uint64, bundleRaw []byte) { + goSetEntryBundle := func(ctx context.Context, bundleIndex uint64, p uint8, bundleRaw []byte) { seqErr.Go(func() error { - if err := s.setEntryBundle(ctx, bundleIndex, fromSeq, bundleRaw); err != nil { + if err := s.setEntryBundle(ctx, bundleIndex, p, bundleRaw); err != nil { return err } return nil @@ -435,10 +435,10 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie entriesInBundle++ fromSeq++ numAdded++ - if entriesInBundle == entryBundleSize { + if entriesInBundle == layout.EntryBundleWidth { // This bundle is full, so we need to write it out... klog.V(1).Infof("In-memory bundle idx %d is full, attempting write to GCS", bundleIndex) - goSetEntryBundle(ctx, bundleIndex, fromSeq, bundleWriter.Bytes()) + goSetEntryBundle(ctx, bundleIndex, 0, bundleWriter.Bytes()) // ... and prepare the next entry bundle for any remaining entries in the batch bundleIndex++ entriesInBundle = 0 @@ -451,7 +451,7 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie // this needs writing out too. if entriesInBundle > 0 { klog.V(1).Infof("Attempting to write in-memory partial bundle idx %d.%d to GCS", bundleIndex, entriesInBundle) - goSetEntryBundle(ctx, bundleIndex, fromSeq, bundleWriter.Bytes()) + goSetEntryBundle(ctx, bundleIndex, uint8(entriesInBundle), bundleWriter.Bytes()) } return seqErr.Wait() } diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index 2210301b..66feddc8 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -232,7 +232,7 @@ func TestTileRoundtrip(t *testing.T) { t.Fatalf("setTile: %v", err) } - expPath := layout.TilePath(test.level, test.index, test.logSize) + expPath := layout.TilePath(test.level, test.index, layout.PartialTileSize(test.level, test.index, test.logSize)) _, ok := m.mem[expPath] if !ok { t.Fatalf("want tile at %v but found none", expPath) @@ -284,17 +284,17 @@ func TestBundleRoundtrip(t *testing.T) { } { t.Run(test.name, func(t *testing.T) { wantBundle := makeBundle(t, test.bundleSize) - if err := s.setEntryBundle(ctx, test.index, test.logSize, wantBundle); err != nil { + if err := s.setEntryBundle(ctx, test.index, uint8(test.bundleSize), wantBundle); err != nil { t.Fatalf("setEntryBundle: %v", err) } - expPath := layout.EntriesPath(test.index, test.logSize) + expPath := layout.EntriesPath(test.index, uint8(test.logSize%layout.EntryBundleWidth)) _, ok := m.mem[expPath] if !ok { t.Fatalf("want bundle at %v but found none", expPath) } - got, err := s.getEntryBundle(ctx, test.index, test.logSize) + got, err := s.getEntryBundle(ctx, test.index, uint8(test.logSize%layout.EntryBundleWidth)) if err != nil { t.Fatalf("getEntryBundle: %v", err) } diff --git a/storage/internal/integrate.go b/storage/internal/integrate.go index d04ac27b..275016c8 100644 --- a/storage/internal/integrate.go +++ b/storage/internal/integrate.go @@ -177,7 +177,7 @@ func newTileReadCache(getTiles func(ctx context.Context, tileIDs []TileID, treeS // Get returns a previously set tile and true, or, if no such tile is in the cache, attempt to fetch it. 
func (r *tileReadCache) Get(ctx context.Context, tileID TileID, treeSize uint64) (*populatedTile, error) { - k := layout.TilePath(uint64(tileID.Level), tileID.Index, treeSize) + k := layout.TilePath(uint64(tileID.Level), tileID.Index, layout.PartialTileSize(tileID.Level, tileID.Index, treeSize)) e, ok := r.entries[k] if !ok { klog.V(1).Infof("Readcache miss: %q", k) @@ -207,7 +207,7 @@ func (r *tileReadCache) Prewarm(ctx context.Context, tileIDs []TileID, treeSize if err != nil { return fmt.Errorf("failed to create fulltile: %v", err) } - k := layout.TilePath(uint64(tileIDs[i].Level), tileIDs[i].Index, treeSize) + k := layout.TilePath(uint64(tileIDs[i].Level), tileIDs[i].Index, layout.PartialTileSize(tileIDs[i].Level, tileIDs[i].Index, treeSize)) r.entries[k] = e } return nil diff --git a/storage/internal/integrate_test.go b/storage/internal/integrate_test.go index d29ae01b..07a6c737 100644 --- a/storage/internal/integrate_test.go +++ b/storage/internal/integrate_test.go @@ -221,7 +221,7 @@ func (m *memTileStore[T]) getTile(_ context.Context, id TileID, treeSize uint64) m.RLock() defer m.RUnlock() - k := layout.TilePath(id.Level, id.Index, treeSize) + k := layout.TilePath(id.Level, id.Index, layout.PartialTileSize(id.Level, id.Index, treeSize)) d := m.mem[k] return d, nil } @@ -232,7 +232,7 @@ func (m *memTileStore[T]) getTiles(_ context.Context, ids []TileID, treeSize uin r := make([]*T, len(ids)) for i, id := range ids { - k := layout.TilePath(id.Level, id.Index, treeSize) + k := layout.TilePath(id.Level, id.Index, layout.PartialTileSize(id.Level, id.Index, treeSize)) klog.V(1).Infof("mem.getTile(%q, %d)", k, treeSize) d, ok := m.mem[k] if !ok { @@ -247,7 +247,7 @@ func (m *memTileStore[T]) setTile(_ context.Context, id TileID, treeSize uint64, m.Lock() defer m.Unlock() - k := layout.TilePath(id.Level, id.Index, treeSize) + k := layout.TilePath(id.Level, id.Index, layout.PartialTileSize(id.Level, id.Index, treeSize)) klog.V(1).Infof("mem.setTile(%q, %d)", k, treeSize) _, ok := m.mem[k] if ok { diff --git a/storage/mysql/mysql.go b/storage/mysql/mysql.go index 8ac67862..7c17af13 100644 --- a/storage/mysql/mysql.go +++ b/storage/mysql/mysql.go @@ -243,7 +243,7 @@ func (s *Storage) writeTreeState(ctx context.Context, tx *sql.Tx, size uint64, r // Note that if a partial tile is requested, but a larger tile is available, this // will return the largest tile available. This could be trimmed to return only the // number of entries specifically requested if this behaviour becomes problematic. 
-func (s *Storage) ReadTile(ctx context.Context, level, index, minTreeSize uint64) ([]byte, error) { +func (s *Storage) ReadTile(ctx context.Context, level, index uint64, p uint8) ([]byte, error) { row := s.db.QueryRowContext(ctx, selectSubtreeByLevelAndIndexSQL, level, index) if err := row.Err(); err != nil { return nil, err @@ -258,10 +258,12 @@ func (s *Storage) ReadTile(ctx context.Context, level, index, minTreeSize uint64 return nil, fmt.Errorf("scan tile: %v", err) } - requestedWidth := partialTileSize(level, index, minTreeSize) numEntries := uint64(len(tile) / sha256.Size) - - if requestedWidth > numEntries { + requestedEntries := uint64(p) + if requestedEntries == 0 { + requestedEntries = 256 + } + if requestedEntries > numEntries { // If the user has requested a size larger than we have, they can't have it return nil, os.ErrNotExist } @@ -269,17 +271,6 @@ func (s *Storage) ReadTile(ctx context.Context, level, index, minTreeSize uint64 return tile, nil } -// partialTileSize returns the expected number of leaves in a tile at the given location within -// a tree of the specified logSize, or 0 if the tile is expected to be fully populated. -func partialTileSize(level, index, logSize uint64) uint64 { - sizeAtLevel := logSize >> (level * 8) - fullTiles := sizeAtLevel / 256 - if index < fullTiles { - return 256 - } - return sizeAtLevel % 256 -} - // writeTile replaces the tile nodes at the given level and index. func (s *Storage) writeTile(ctx context.Context, tx *sql.Tx, level, index uint64, nodes []byte) error { if _, err := tx.ExecContext(ctx, replaceSubtreeSQL, level, index, nodes); err != nil { @@ -299,7 +290,7 @@ func (s *Storage) writeTile(ctx context.Context, tx *sql.Tx, level, index uint64 // 3. Partial tile request with full/larger partial tile output: Return trimmed partial tile with correct tile width. // 4. Partial tile request with partial tile (same width) output: Return partial tile. // 5. Partial tile request with smaller partial tile output: Return error. 
-func (s *Storage) ReadEntryBundle(ctx context.Context, index, treeSize uint64) ([]byte, error) { +func (s *Storage) ReadEntryBundle(ctx context.Context, index uint64, p uint8) ([]byte, error) { row := s.db.QueryRowContext(ctx, selectTiledLeavesSQL, index) if err := row.Err(); err != nil { return nil, err diff --git a/storage/mysql/mysql_test.go b/storage/mysql/mysql_test.go index a9da561c..ada8f85d 100644 --- a/storage/mysql/mysql_test.go +++ b/storage/mysql/mysql_test.go @@ -191,54 +191,55 @@ func TestGetTile(t *testing.T) { } for _, test := range []struct { - name string - level, index, treeSize uint64 - wantEntries int - wantNotFound bool + name string + level, index uint64 + p uint8 + wantEntries int + wantNotFound bool }{ { name: "requested partial tile for a complete tile", - level: 0, index: 0, treeSize: 10, + level: 0, index: 0, p: 10, wantEntries: 256, wantNotFound: false, }, { name: "too small but that's ok", - level: 0, index: 1, treeSize: uint64(treeSize) - 1, + level: 0, index: 1, p: layout.PartialTileSize(0, 1, uint64(treeSize-1)), wantEntries: 2, wantNotFound: false, }, { name: "just right", - level: 0, index: 1, treeSize: uint64(treeSize), + level: 0, index: 1, p: layout.PartialTileSize(0, 1, uint64(treeSize)), wantEntries: 2, wantNotFound: false, }, { name: "too big", - level: 0, index: 1, treeSize: uint64(treeSize + 1), + level: 0, index: 1, p: layout.PartialTileSize(0, 1, uint64(treeSize+1)), wantNotFound: true, }, { name: "level 1 too small", - level: 1, index: 0, treeSize: uint64(treeSize - 1), + level: 1, index: 0, p: layout.PartialTileSize(1, 0, uint64(treeSize-1)), wantEntries: 1, wantNotFound: false, }, { name: "level 1 just right", - level: 1, index: 0, treeSize: uint64(treeSize), + level: 1, index: 0, p: layout.PartialTileSize(1, 0, uint64(treeSize)), wantEntries: 1, wantNotFound: false, }, { name: "level 1 too big", - level: 1, index: 0, treeSize: 550, + level: 1, index: 0, p: layout.PartialTileSize(1, 0, 550), wantNotFound: true, }, } { t.Run(test.name, func(t *testing.T) { - tile, err := s.ReadTile(ctx, test.level, test.index, test.treeSize) + tile, err := s.ReadTile(ctx, test.level, test.index, test.p) if err != nil { if notFound, wantNotFound := errors.Is(err, fs.ErrNotExist), test.wantNotFound; notFound != wantNotFound { t.Errorf("wantNotFound %v but notFound %v", wantNotFound, notFound) @@ -261,20 +262,21 @@ func TestReadMissingTile(t *testing.T) { s := newTestMySQLStorage(t, ctx) for _, test := range []struct { - name string - level, index, width uint64 + name string + level, index uint64 + p uint8 }{ { name: "0/0/0", - level: 0, index: 0, width: 0, + level: 0, index: 0, p: 0, }, { name: "123/456/789", - level: 123, index: 456, width: 789, + level: 123, index: 456, p: 789 % layout.TileWidth, }, } { t.Run(test.name, func(t *testing.T) { - tile, err := s.ReadTile(ctx, test.level, test.index, test.width) + tile, err := s.ReadTile(ctx, test.level, test.index, test.p) if err != nil { if errors.Is(err, fs.ErrNotExist) { // this is success for this test @@ -307,7 +309,7 @@ func TestReadMissingEntryBundle(t *testing.T) { }, } { t.Run(test.name, func(t *testing.T) { - entryBundle, err := s.ReadEntryBundle(ctx, test.index, test.index) + entryBundle, err := s.ReadEntryBundle(ctx, test.index, uint8(test.index%layout.TileWidth)) if err != nil { if errors.Is(err, fs.ErrNotExist) { // this is success for this test @@ -387,7 +389,7 @@ func TestTileRoundTrip(t *testing.T) { } tileLevel, tileIndex, _, nodeIndex := layout.NodeCoordsToTileAddress(0, entryIndex) - tileRaw, 
err := s.ReadTile(ctx, tileLevel, tileIndex, nodeIndex+1) + tileRaw, err := s.ReadTile(ctx, tileLevel, tileIndex, uint8(nodeIndex+1)) if err != nil { t.Errorf("ReadTile got err: %v", err) } @@ -436,7 +438,7 @@ func TestEntryBundleRoundTrip(t *testing.T) { if err != nil { t.Errorf("Add got err: %v", err) } - entryBundleRaw, err := s.ReadEntryBundle(ctx, entryIndex/256, entryIndex) + entryBundleRaw, err := s.ReadEntryBundle(ctx, entryIndex/256, uint8(entryIndex%layout.TileWidth)) if err != nil { t.Errorf("ReadEntryBundle got err: %v", err) } diff --git a/storage/posix/files.go b/storage/posix/files.go index 5e78658f..4fca1a4c 100644 --- a/storage/posix/files.go +++ b/storage/posix/files.go @@ -148,12 +148,12 @@ func (s *Storage) ReadCheckpoint(_ context.Context) ([]byte, error) { } // ReadEntryBundle retrieves the Nth entries bundle for a log of the given size. -func (s *Storage) ReadEntryBundle(_ context.Context, index, logSize uint64) ([]byte, error) { - return os.ReadFile(filepath.Join(s.path, s.entriesPath(index, logSize))) +func (s *Storage) ReadEntryBundle(_ context.Context, index uint64, p uint8) ([]byte, error) { + return os.ReadFile(filepath.Join(s.path, s.entriesPath(index, p))) } -func (s *Storage) ReadTile(_ context.Context, level, index, logSize uint64) ([]byte, error) { - return os.ReadFile(filepath.Join(s.path, layout.TilePath(level, index, logSize))) +func (s *Storage) ReadTile(_ context.Context, level, index uint64, p uint8) ([]byte, error) { + return os.ReadFile(filepath.Join(s.path, layout.TilePath(level, index, p))) } // sequenceBatch writes the entries from the provided batch into the entry bundle files of the log. @@ -194,7 +194,7 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e bundleIndex, entriesInBundle := seq/uint64(256), seq%uint64(256) if entriesInBundle > 0 { // If the latest bundle is partial, we need to read the data it contains in for our newer, larger, bundle. - part, err := s.ReadEntryBundle(ctx, bundleIndex, s.curSize) + part, err := s.ReadEntryBundle(ctx, bundleIndex, uint8(s.curSize%layout.EntryBundleWidth)) if err != nil { return err } @@ -203,7 +203,7 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e } } writeBundle := func(bundleIndex uint64) error { - bf := filepath.Join(s.path, s.entriesPath(bundleIndex, newSize)) + bf := filepath.Join(s.path, s.entriesPath(bundleIndex, uint8(newSize%layout.EntryBundleWidth))) if err := os.MkdirAll(filepath.Dir(bf), dirPerm); err != nil { return fmt.Errorf("failed to make entries directory structure: %w", err) } @@ -287,7 +287,7 @@ func (s *Storage) doIntegrate(ctx context.Context, fromSeq uint64, entries []sto func (s *Storage) readTiles(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) { r := make([]*api.HashTile, 0, len(tileIDs)) for _, id := range tileIDs { - t, err := s.readTile(ctx, id.Level, id.Index, treeSize) + t, err := s.readTile(ctx, id.Level, id.Index, layout.PartialTileSize(id.Level, id.Index, treeSize)) if err != nil { return nil, err } @@ -299,8 +299,8 @@ func (s *Storage) readTiles(ctx context.Context, tileIDs []storage.TileID, treeS // readTile returns the parsed tile at the given tile-level and tile-index. // If no complete tile exists at that location, it will attempt to find a // partial tile for the given tree size at that location. 
-func (s *Storage) readTile(ctx context.Context, level, index, logSize uint64) (*api.HashTile, error) { - t, err := s.ReadTile(ctx, level, index, logSize) +func (s *Storage) readTile(ctx context.Context, level, index uint64, p uint8) (*api.HashTile, error) { + t, err := s.ReadTile(ctx, level, index, p) if err != nil { if errors.Is(err, os.ErrNotExist) { // We'll signal to higher levels that it wasn't found by retuning a nil for this tile. @@ -331,7 +331,7 @@ func (s *Storage) storeTile(_ context.Context, level, index, logSize uint64, til return fmt.Errorf("failed to marshal tile: %w", err) } - tPath := filepath.Join(s.path, layout.TilePath(level, index, logSize)) + tPath := filepath.Join(s.path, layout.TilePath(level, index, layout.PartialTileSize(level, index, logSize))) tDir := filepath.Dir(tPath) if err := os.MkdirAll(tDir, dirPerm); err != nil { return fmt.Errorf("failed to create directory %q: %w", tDir, err) From 6d75e8320d67569dbb866741da845275a7dc159e Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Mon, 9 Dec 2024 12:40:59 +0000 Subject: [PATCH 12/17] Storage implementations enforce minimum checkpoint intervals (#394) --- await.go | 4 +++- cmd/examples/posix-oneshot/main.go | 2 +- storage/aws/aws.go | 9 +++++++-- storage/gcp/gcp.go | 13 +++++++++++-- storage/mysql/mysql.go | 6 ++++++ storage/mysql/mysql_test.go | 4 ++-- storage/posix/files.go | 5 +++++ 7 files changed, 35 insertions(+), 8 deletions(-) diff --git a/await.go b/await.go index 8f0b6c55..59f08c18 100644 --- a/await.go +++ b/await.go @@ -16,7 +16,9 @@ package tessera import ( "context" + "errors" "fmt" + "os" "sync" "time" @@ -111,7 +113,7 @@ func (a *IntegrationAwaiter) pollLoop(ctx context.Context, readCheckpoint func(c // Note that for now, this releases all clients in the event of a single failure. // If this causes problems, this could be changed to attempt retries. rawCp, err := readCheckpoint(ctx) - if err != nil { + if err != nil && !errors.Is(err, os.ErrNotExist) { a.releaseClientsErr(fmt.Errorf("readCheckpoint: %v", err)) continue } diff --git a/cmd/examples/posix-oneshot/main.go b/cmd/examples/posix-oneshot/main.go index c794fe0c..ad30279d 100644 --- a/cmd/examples/posix-oneshot/main.go +++ b/cmd/examples/posix-oneshot/main.go @@ -46,7 +46,7 @@ const ( // Since this is a short-lived command-line tool, we set this to a relatively low value so that // the tool can publish the new checkpoint and exit relatively quickly after integrating the entries // into the tree. 
-	checkpointInterval = 500 * time.Millisecond
+	checkpointInterval = time.Second
 )
 
 // entryInfo binds the actual bytes to be added as a leaf with a
diff --git a/storage/aws/aws.go b/storage/aws/aws.go
index 6950db1b..19e3b954 100644
--- a/storage/aws/aws.go
+++ b/storage/aws/aws.go
@@ -59,8 +59,10 @@ import (
 )
 
 const (
-	logContType  = "application/octet-stream"
-	ckptContType = "text/plain; charset=utf-8"
+	entryBundleSize       = 256
+	logContType           = "application/octet-stream"
+	ckptContType          = "text/plain; charset=utf-8"
+	minCheckpointInterval = time.Second
 
 	DefaultPushbackMaxOutstanding = 4096
 	DefaultIntegrationSizeLimit   = 5 * 4096
@@ -128,6 +130,9 @@ func New(ctx context.Context, cfg Config, opts ...func(*options.StorageOptions))
 	if opt.PushbackMaxOutstanding == 0 {
 		opt.PushbackMaxOutstanding = DefaultPushbackMaxOutstanding
 	}
+	if opt.CheckpointInterval < minCheckpointInterval {
+		return nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opt.CheckpointInterval, minCheckpointInterval)
+	}
 
 	sdkConfig, err := config.LoadDefaultConfig(ctx)
 	if err != nil {
diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go
index 6947f2fa..3194f402 100644
--- a/storage/gcp/gcp.go
+++ b/storage/gcp/gcp.go
@@ -59,8 +59,14 @@ import (
 )
 
 const (
-	logContType  = "application/octet-stream"
-	ckptContType = "text/plain; charset=utf-8"
+	entryBundleSize = 256
+	logContType     = "application/octet-stream"
+	ckptContType    = "text/plain; charset=utf-8"
+	// minCheckpointInterval is the shortest permitted interval between updating published checkpoints.
+	// GCS has a rate limit of 1 update per second for individual objects, but we've observed that attempting
+	// to update at exactly that rate still results in the occasional refusal, so bake in a little wiggle
+	// room.
+	minCheckpointInterval = 1200 * time.Millisecond
 
 	DefaultPushbackMaxOutstanding = 4096
 	DefaultIntegrationSizeLimit   = 5 * 4096
@@ -120,6 +126,9 @@ func New(ctx context.Context, cfg Config, opts ...func(*options.StorageOptions))
 	if opt.PushbackMaxOutstanding == 0 {
 		opt.PushbackMaxOutstanding = DefaultPushbackMaxOutstanding
 	}
+	if opt.CheckpointInterval < minCheckpointInterval {
+		return nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opt.CheckpointInterval, minCheckpointInterval)
+	}
 
 	c, err := gcs.NewClient(ctx, gcs.WithJSONReads())
 	if err != nil {
diff --git a/storage/mysql/mysql.go b/storage/mysql/mysql.go
index 7c17af13..c6e0d04a 100644
--- a/storage/mysql/mysql.go
+++ b/storage/mysql/mysql.go
@@ -51,6 +51,8 @@ const (
 	checkpointID    = 0
 	treeStateID     = 0
 	entryBundleSize = 256
+
+	minCheckpointInterval = time.Second
 )
 
 // Storage is a MySQL-based storage implementation for Tessera.
@@ -67,6 +69,10 @@ type Storage struct {
 // Note that `tessera.WithCheckpointSigner()` is mandatory in the `opts` argument.
 func New(ctx context.Context, db *sql.DB, opts ...func(*options.StorageOptions)) (*Storage, error) {
 	opt := storage.ResolveStorageOptions(opts...)
+ if opt.CheckpointInterval < minCheckpointInterval { + return nil, fmt.Errorf("requested CheckpointInterval too low - %v < %v", opt.CheckpointInterval, minCheckpointInterval) + } + s := &Storage{ db: db, newCheckpoint: opt.NewCP, diff --git a/storage/mysql/mysql_test.go b/storage/mysql/mysql_test.go index ada8f85d..60f88c12 100644 --- a/storage/mysql/mysql_test.go +++ b/storage/mysql/mysql_test.go @@ -465,10 +465,10 @@ func newTestMySQLStorage(t *testing.T, ctx context.Context) *mysql.Storage { s, err := mysql.New(ctx, testDB, tessera.WithCheckpointSigner(noteSigner), - tessera.WithCheckpointInterval(200*time.Millisecond), + tessera.WithCheckpointInterval(time.Second), tessera.WithBatching(128, 100*time.Millisecond)) if err != nil { - t.Errorf("Failed to create mysql.Storage: %v", err) + t.Fatalf("Failed to create mysql.Storage: %v", err) } return s diff --git a/storage/posix/files.go b/storage/posix/files.go index 4fca1a4c..e2981ebb 100644 --- a/storage/posix/files.go +++ b/storage/posix/files.go @@ -39,6 +39,8 @@ const ( dirPerm = 0o755 filePerm = 0o644 stateDir = ".state" + + minCheckpointInterval = time.Second ) // Storage implements storage functions for a POSIX filesystem. @@ -64,6 +66,9 @@ type NewTreeFunc func(size uint64, root []byte) error // - create must only be set when first creating the log, and will create the directory structure and an empty checkpoint func New(ctx context.Context, path string, create bool, opts ...func(*options.StorageOptions)) (*Storage, error) { opt := storage.ResolveStorageOptions(opts...) + if opt.CheckpointInterval < minCheckpointInterval { + return nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opt.CheckpointInterval, minCheckpointInterval) + } r := &Storage{ path: path, From 0d5a7706fd1e7bee4d56d18d9e331eac417f682e Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Mon, 9 Dec 2024 13:21:56 +0000 Subject: [PATCH 13/17] Just use layout.PartialTileSize (#401) --- client/client.go | 2 +- storage/gcp/gcp_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/client.go b/client/client.go index b3347a4f..5bb5d92c 100644 --- a/client/client.go +++ b/client/client.go @@ -286,7 +286,7 @@ func (n *nodeCache) GetNode(ctx context.Context, id compact.NodeID) ([]byte, err // GetEntryBundle fetches the entry bundle at the given _tile index_. 
func GetEntryBundle(ctx context.Context, f EntryBundleFetcherFunc, i, logSize uint64) (api.EntryBundle, error) { bundle := api.EntryBundle{} - sRaw, err := f(ctx, i, uint8(logSize%layout.EntryBundleWidth)) + sRaw, err := f(ctx, i, layout.PartialTileSize(0, i, logSize)) if err != nil { if errors.Is(err, os.ErrNotExist) { return bundle, fmt.Errorf("leaf bundle at index %d not found: %v", i, err) diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index 66feddc8..682cf1e6 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -288,13 +288,13 @@ func TestBundleRoundtrip(t *testing.T) { t.Fatalf("setEntryBundle: %v", err) } - expPath := layout.EntriesPath(test.index, uint8(test.logSize%layout.EntryBundleWidth)) + expPath := layout.EntriesPath(test.index, layout.PartialTileSize(0, test.index, test.logSize)) _, ok := m.mem[expPath] if !ok { t.Fatalf("want bundle at %v but found none", expPath) } - got, err := s.getEntryBundle(ctx, test.index, uint8(test.logSize%layout.EntryBundleWidth)) + got, err := s.getEntryBundle(ctx, test.index, layout.PartialTileSize(0, test.index, test.logSize)) if err != nil { t.Fatalf("getEntryBundle: %v", err) } From 4eb23e992796100b69403ec7cd7d6f5c921ecc81 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Mon, 9 Dec 2024 14:09:55 +0000 Subject: [PATCH 14/17] Add CacheControl headers to GCP (#400) --- storage/gcp/gcp.go | 19 +++++++++++-------- storage/gcp/gcp_test.go | 4 ++-- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 3194f402..bf4e95a7 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -59,15 +59,17 @@ import ( ) const ( - entryBundleSize = 256 - logContType = "application/octet-stream" - ckptContType = "text/plain; charset=utf-8" // minCheckpointInterval is the shortest permitted interval between updating published checkpoints. // GCS has a rate limit 1 update per second for individual objects, but we've observed that attempting // to update at exactly that rate still results in the occasional refusal, so bake in a little wiggle // room. minCheckpointInterval = 1200 * time.Millisecond + logContType = "application/octet-stream" + ckptContType = "text/plain; charset=utf-8" + logCacheControl = "max-age=604800,immutable" + ckptCacheControl = "no-cache" + DefaultPushbackMaxOutstanding = 4096 DefaultIntegrationSizeLimit = 5 * 4096 ) @@ -88,7 +90,7 @@ type Storage struct { // objStore describes a type which can store and retrieve objects. 
type objStore interface { getObject(ctx context.Context, obj string) ([]byte, int64, error) - setObject(ctx context.Context, obj string, data []byte, cond *gcs.Conditions, contType string) error + setObject(ctx context.Context, obj string, data []byte, cond *gcs.Conditions, contType string, cacheCtl string) error lastModified(ctx context.Context, obj string) (time.Time, error) } @@ -270,7 +272,7 @@ func (s *Storage) publishCheckpoint(ctx context.Context, minStaleness time.Durat return fmt.Errorf("newCP: %v", err) } - if err := s.objStore.setObject(ctx, layout.CheckpointPath, cpRaw, nil, ckptContType); err != nil { + if err := s.objStore.setObject(ctx, layout.CheckpointPath, cpRaw, nil, ckptContType, ckptCacheControl); err != nil { return fmt.Errorf("writeCheckpoint: %v", err) } return nil @@ -288,7 +290,7 @@ func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, til tPath := layout.TilePath(level, index, layout.PartialTileSize(level, index, logSize)) klog.V(2).Infof("StoreTile: %s (%d entries)", tPath, len(tile.Nodes)) - return s.objStore.setObject(ctx, tPath, data, &gcs.Conditions{DoesNotExist: true}, logContType) + return s.objStore.setObject(ctx, tPath, data, &gcs.Conditions{DoesNotExist: true}, logContType, logCacheControl) } // getTiles returns the tiles with the given tile-coords for the specified log size. @@ -351,7 +353,7 @@ func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, p uint // Note that setObject does an idempotent interpretation of DoesNotExist - it only // returns an error if the named object exists _and_ contains different data to what's // passed in here. - if err := s.objStore.setObject(ctx, objName, bundleRaw, &gcs.Conditions{DoesNotExist: true}, logContType); err != nil { + if err := s.objStore.setObject(ctx, objName, bundleRaw, &gcs.Conditions{DoesNotExist: true}, logContType, logCacheControl); err != nil { return fmt.Errorf("setObject(%q): %v", objName, err) } @@ -748,7 +750,7 @@ func (s *gcsStorage) getObject(ctx context.Context, obj string) ([]byte, int64, // Note that when preconditions are specified and are not met, an error will be returned *unless* // the currently stored data is bit-for-bit identical to the data to-be-written. // This is intended to provide idempotentency for writes. 
-func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte, cond *gcs.Conditions, contType string) error {
+func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte, cond *gcs.Conditions, contType string, cacheCtl string) error {
 	bkt := s.gcsClient.Bucket(s.bucket)
 	obj := bkt.Object(objName)
 
@@ -760,6 +762,7 @@ func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte,
 		w = obj.If(*cond).NewWriter(ctx)
 	}
 	w.ObjectAttrs.ContentType = contType
+	w.ObjectAttrs.CacheControl = cacheCtl
 	if _, err := w.Write(data); err != nil {
 		return fmt.Errorf("failed to write object %q to bucket %q: %w", objName, s.bucket, err)
 	}
diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go
index 682cf1e6..ed44df86 100644
--- a/storage/gcp/gcp_test.go
+++ b/storage/gcp/gcp_test.go
@@ -347,7 +347,7 @@ func TestPublishCheckpoint(t *testing.T) {
 				t.Fatalf("storage.init: %v", err)
 			}
 			cpOld := []byte("bananas")
-			if err := m.setObject(ctx, layout.CheckpointPath, cpOld, nil, ""); err != nil {
+			if err := m.setObject(ctx, layout.CheckpointPath, cpOld, nil, "", ""); err != nil {
 				t.Fatalf("setObject(bananas): %v", err)
 			}
 			m.lMod = test.cpModifiedAt
@@ -394,7 +394,7 @@ func (m *memObjStore) getObject(_ context.Context, obj string) ([]byte, int64, e
 }
 
 // TODO(phboneff): add content type tests
-func (m *memObjStore) setObject(_ context.Context, obj string, data []byte, cond *gcs.Conditions, _ string) error {
+func (m *memObjStore) setObject(_ context.Context, obj string, data []byte, cond *gcs.Conditions, _, _ string) error {
 	m.Lock()
 	defer m.Unlock()
 

From 98766aaf6903460b58ab0bfa1cf7296fefaf2a13 Mon Sep 17 00:00:00 2001
From: Al Cutter
Date: Mon, 9 Dec 2024 16:20:39 +0000
Subject: [PATCH 15/17] Add test for GetEntryBundle (#403)

---
 client/client_test.go | 41 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/client/client_test.go b/client/client_test.go
index eb67b2a8..10e34b1b 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -343,3 +343,44 @@ func TestHandleZeroRoot(t *testing.T) {
 		t.Fatalf("NewProofBuilder: %v", err)
 	}
 }
+
+func TestGetEntryBundleAddressing(t *testing.T) {
+	for _, test := range []struct {
+		name                string
+		idx, logSize        uint64
+		wantPartialTileSize uint8
+	}{
+		{
+			name:                "works - partial tile",
+			idx:                 0,
+			logSize:             34,
+			wantPartialTileSize: 34,
+		},
+		{
+			name:                "works - full tile",
+			idx:                 1,
+			logSize:             256*2 + 45,
+			wantPartialTileSize: 0,
+		},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			gotIdx := uint64(0)
+			gotTileSize := uint8(0)
+			f := func(_ context.Context, i uint64, sz uint8) ([]byte, error) {
+				gotIdx = i
+				gotTileSize = sz
+				return []byte{}, nil
+			}
+			_, err := GetEntryBundle(context.Background(), f, test.idx, test.logSize)
+			if err != nil {
+				t.Fatalf("GetEntryBundle: %v", err)
+			}
+			if gotIdx != test.idx {
+				t.Errorf("f got idx %d, want %d", gotIdx, test.idx)
+			}
+			if gotTileSize != test.wantPartialTileSize {
+				t.Errorf("f got tileSize %d, want %d", gotTileSize, test.wantPartialTileSize)
+			}
+		})
+	}
+}

From c30d02cb4c4d82b0dc2f6c2423815714b12bb559 Mon Sep 17 00:00:00 2001
From: Martin Hutchinson
Date: Mon, 9 Dec 2024 16:52:03 +0000
Subject: [PATCH 16/17] [MySQL] complete entry bundle implementation (#398)

The MySQL API now checks that the number of entries returned in a bundle
is not smaller than the number requested. Now that this is fixed, the
conformance personality is updated to set cache headers on the entry
bundles to allow aggressive caching.
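
As a worked example (sizes here are hypothetical): in a log of 300
entries, bundle 0 is full (256 entries) and bundle 1 is the partial
bundle tile/entries/001.p/44. Previously, a request for the full bundle
tile/entries/001 could be answered with just those 44 entries; with
aggressive caching enabled, that short read could then have been cached
and served indefinitely, even after the bundle filled up. The size check
described above rules this out before any cache headers are set.
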
Also fixed the checkpoint implementation to never cache, and handle not found properly. This fixes #364. While making a change to the SQL schema, took the opportunity to set a couple of other integers to the smallest value needed. --- cmd/conformance/mysql/main.go | 17 ++++++++------- storage/mysql/mysql.go | 40 +++++++++++++++++++++++------------ storage/mysql/mysql_test.go | 4 ++-- storage/mysql/schema.sql | 8 ++++--- 4 files changed, 42 insertions(+), 27 deletions(-) diff --git a/cmd/conformance/mysql/main.go b/cmd/conformance/mysql/main.go index 00e24b29..16eb7959 100644 --- a/cmd/conformance/mysql/main.go +++ b/cmd/conformance/mysql/main.go @@ -143,15 +143,19 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { mux.HandleFunc("GET /checkpoint", func(w http.ResponseWriter, r *http.Request) { checkpoint, err := storage.ReadCheckpoint(r.Context()) if err != nil { + if errors.Is(err, fs.ErrNotExist) { + w.WriteHeader(http.StatusNotFound) + return + } klog.Errorf("/checkpoint: %v", err) w.WriteHeader(http.StatusInternalServerError) return } - if checkpoint == nil { - w.WriteHeader(http.StatusNotFound) - return - } + // Don't cache checkpoints as the endpoint refreshes regularly. + // A personality that wanted to _could_ set a small cache time here which was no higher + // than the checkpoint publish interval. + w.Header().Set("Cache-Control", "no-cache") if _, err := w.Write(checkpoint); err != nil { klog.Errorf("/checkpoint: %v", err) return @@ -207,10 +211,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) { return } - // TODO: Add immutable Cache-Control header. - // Only do this once we're sure we're returning the right number of entries - // Currently a user can request a full tile and we can return a partial tile. - // If cache headers were set then this could cause caches to be poisoned. + w.Header().Set("Cache-Control", "public, max-age=31536000, immutable") if _, err := w.Write(entryBundle); err != nil { klog.Errorf("/tile/entries/{index...}: %v", err) diff --git a/storage/mysql/mysql.go b/storage/mysql/mysql.go index c6e0d04a..537db2db 100644 --- a/storage/mysql/mysql.go +++ b/storage/mysql/mysql.go @@ -31,6 +31,7 @@ import ( "github.com/transparency-dev/merkle/rfc6962" tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/api" + "github.com/transparency-dev/trillian-tessera/api/layout" options "github.com/transparency-dev/trillian-tessera/internal/options" storage "github.com/transparency-dev/trillian-tessera/storage/internal" "k8s.io/klog/v2" @@ -45,8 +46,8 @@ const ( replaceTreeStateSQL = "REPLACE INTO `TreeState` (`id`, `size`, `root`) VALUES (?, ?, ?)" selectSubtreeByLevelAndIndexSQL = "SELECT `nodes` FROM `Subtree` WHERE `level` = ? AND `index` = ?" replaceSubtreeSQL = "REPLACE INTO `Subtree` (`level`, `index`, `nodes`) VALUES (?, ?, ?)" - selectTiledLeavesSQL = "SELECT `data` FROM `TiledLeaves` WHERE `tile_index` = ?" - replaceTiledLeavesSQL = "REPLACE INTO `TiledLeaves` (`tile_index`, `data`) VALUES (?, ?)" + selectTiledLeavesSQL = "SELECT `size`, `data` FROM `TiledLeaves` WHERE `tile_index` = ?" + replaceTiledLeavesSQL = "REPLACE INTO `TiledLeaves` (`tile_index`, `size`, `data`) VALUES (?, ?, ?)" checkpointID = 0 treeStateID = 0 @@ -290,31 +291,38 @@ func (s *Storage) writeTile(ctx context.Context, tx *sql.Tx, level, index uint64 // ReadEntryBundle returns the log entries at the given index. // If the entry bundle is not found, it returns os.ErrNotExist. 
// -// TODO: Handle the following scenarios: -// 1. Full tile request with full tile output: Return full tile. -// 2. Full tile request with partial tile output: Return error. -// 3. Partial tile request with full/larger partial tile output: Return trimmed partial tile with correct tile width. -// 4. Partial tile request with partial tile (same width) output: Return partial tile. -// 5. Partial tile request with smaller partial tile output: Return error. +// Note that if a partial tile is requested, but a larger tile is available, this +// will return the largest tile available. This could be trimmed to return only the +// number of entries specifically requested if this behaviour becomes problematic. func (s *Storage) ReadEntryBundle(ctx context.Context, index uint64, p uint8) ([]byte, error) { row := s.db.QueryRowContext(ctx, selectTiledLeavesSQL, index) if err := row.Err(); err != nil { return nil, err } + var size uint32 var entryBundle []byte - if err := row.Scan(&entryBundle); err != nil { + if err := row.Scan(&size, &entryBundle); err != nil { if err == sql.ErrNoRows { return nil, os.ErrNotExist } return nil, fmt.Errorf("scan entry bundle: %v", err) } + requestedSize := uint32(p) + if requestedSize == 0 { + requestedSize = layout.EntryBundleWidth + } + + if requestedSize > size { + return nil, fmt.Errorf("bundle with %d entries requested, but only %d available: %w", requestedSize, size, os.ErrNotExist) + } + return entryBundle, nil } -func (s *Storage) writeEntryBundle(ctx context.Context, tx *sql.Tx, index uint64, entryBundle []byte) error { - if _, err := tx.ExecContext(ctx, replaceTiledLeavesSQL, index, entryBundle); err != nil { +func (s *Storage) writeEntryBundle(ctx context.Context, tx *sql.Tx, index uint64, size uint32, entryBundle []byte) error { + if _, err := tx.ExecContext(ctx, replaceTiledLeavesSQL, index, size, entryBundle); err != nil { klog.Errorf("Failed to execute replaceTiledLeavesSQL: %v", err) return err } @@ -452,10 +460,14 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent return fmt.Errorf("query tiled leaves: %v", err) } + var size uint32 var partialEntryBundle []byte - if err := row.Scan(&partialEntryBundle); err != nil { + if err := row.Scan(&size, &partialEntryBundle); err != nil { return fmt.Errorf("scan partial entry bundle: %w", err) } + if size != uint32(entriesInBundle) { + return fmt.Errorf("expected %d entries in storage but found %d", entriesInBundle, size) + } if _, err := bundleWriter.Write(partialEntryBundle); err != nil { return fmt.Errorf("write partial entry bundle: %w", err) @@ -471,7 +483,7 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent // This bundle is full, so we need to write it out. if entriesInBundle == entryBundleSize { - if err := s.writeEntryBundle(ctx, tx, bundleIndex, bundleWriter.Bytes()); err != nil { + if err := s.writeEntryBundle(ctx, tx, bundleIndex, uint32(entriesInBundle), bundleWriter.Bytes()); err != nil { return fmt.Errorf("writeEntryBundle: %w", err) } @@ -485,7 +497,7 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent // If we have a partial bundle remaining once we've added all the entries from the batch, // this needs writing out too. 
 	if entriesInBundle > 0 {
-		if err := s.writeEntryBundle(ctx, tx, bundleIndex, bundleWriter.Bytes()); err != nil {
+		if err := s.writeEntryBundle(ctx, tx, bundleIndex, uint32(entriesInBundle), bundleWriter.Bytes()); err != nil {
 			return fmt.Errorf("writeEntryBundle: %w", err)
 		}
 	}
diff --git a/storage/mysql/mysql_test.go b/storage/mysql/mysql_test.go
index 60f88c12..60c0aae4 100644
--- a/storage/mysql/mysql_test.go
+++ b/storage/mysql/mysql_test.go
@@ -438,9 +438,9 @@ func TestEntryBundleRoundTrip(t *testing.T) {
 			if err != nil {
 				t.Errorf("Add got err: %v", err)
 			}
-			entryBundleRaw, err := s.ReadEntryBundle(ctx, entryIndex/256, uint8(entryIndex%layout.TileWidth))
+			entryBundleRaw, err := s.ReadEntryBundle(ctx, entryIndex/256, layout.PartialTileSize(0, entryIndex, entryIndex+1))
 			if err != nil {
-				t.Errorf("ReadEntryBundle got err: %v", err)
+				t.Fatalf("ReadEntryBundle got err: %v", err)
 			}
 
 			bundle := api.EntryBundle{}
diff --git a/storage/mysql/schema.sql b/storage/mysql/schema.sql
index 955c5422..82c281c9 100644
--- a/storage/mysql/schema.sql
+++ b/storage/mysql/schema.sql
@@ -19,7 +19,7 @@
 -- on an independent timeframe to the internal updating of state.
 CREATE TABLE IF NOT EXISTS `Checkpoint` (
   -- id is expected to be always 0 to maintain a maximum of a single row.
-  `id` INT UNSIGNED NOT NULL,
+  `id` TINYINT UNSIGNED NOT NULL,
   -- note is the text signed by one or more keys in the checkpoint format. See https://c2sp.org/tlog-checkpoint and https://c2sp.org/signed-note.
   `note` MEDIUMBLOB NOT NULL,
   -- published_at is the millisecond UNIX timestamp of when this row was written.
@@ -31,7 +31,7 @@ CREATE TABLE IF NOT EXISTS `Checkpoint` (
 -- This is not the same thing as a Checkpoint, which is a signed commitment to such a state.
 CREATE TABLE IF NOT EXISTS `TreeState` (
   -- id is expected to be always 0 to maintain a maximum of a single row.
-  `id` INT UNSIGNED NOT NULL,
+  `id` TINYINT UNSIGNED NOT NULL,
   -- size is the extent of the currently integrated tree.
   `size` BIGINT UNSIGNED NOT NULL,
   -- root is the root hash of the tree at the size stored in `size`.
@@ -42,7 +42,7 @@ CREATE TABLE IF NOT EXISTS `TreeState` (
 -- "Subtree" table is an internal tile consisting of hashes. There is one row for each internal tile, and this is updated until it is completed, at which point it is immutable.
 CREATE TABLE IF NOT EXISTS `Subtree` (
   -- level is the level of the tile.
-  `level` INT UNSIGNED NOT NULL,
+  `level` TINYINT UNSIGNED NOT NULL,
   -- index is the index of the tile.
   `index` BIGINT UNSIGNED NOT NULL,
   -- nodes stores the hashes of the leaves.
@@ -53,6 +53,8 @@ CREATE TABLE IF NOT EXISTS `Subtree` (
 -- "TiledLeaves" table stores the data committed to by the leaves of the tree. Follows the same evolution as Subtree.
 CREATE TABLE IF NOT EXISTS `TiledLeaves` (
   `tile_index` BIGINT UNSIGNED NOT NULL,
+  -- size is the number of entries serialized into this leaf bundle.
+  `size` SMALLINT UNSIGNED NOT NULL,
   `data` LONGBLOB NOT NULL,
   PRIMARY KEY(`tile_index`)
 );

From cfc5fe29bd7faaa2f04a3cd7de2acb466ad104ec Mon Sep 17 00:00:00 2001
From: Martin Hutchinson
Date: Mon, 9 Dec 2024 16:55:02 +0000
Subject: [PATCH 17/17] Set up benchmark action (#405)

Towards #338.
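The action picks up every standard Go benchmark in the module via `go test
-bench`. As an illustration only (this function, its name, and its package are
hypothetical, not part of this change), the kind of benchmark the workflow
below discovers and tracks looks like:

    package storage_test

    import (
        "crypto/sha256"
        "testing"
    )

    // BenchmarkHashLeaves is a hypothetical example of a benchmark picked up by
    // `go test ./... -benchmem -run=^$ -bench .`: -run=^$ skips the unit tests,
    // and -bench . selects every Benchmark* function.
    func BenchmarkHashLeaves(b *testing.B) {
        leaf := make([]byte, 1024)
        for i := 0; i < b.N; i++ {
            _ = sha256.Sum256(leaf) // stand-in for the real work under measurement
        }
    }

The `tee output.txt` in the run step captures the benchmark output for the
github-action-benchmark step, which publishes results to the gh-pages branch
and alerts when a benchmark regresses past the 150% alert threshold.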
--- .github/workflows/benchmark-go.yml | 36 ++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 .github/workflows/benchmark-go.yml diff --git a/.github/workflows/benchmark-go.yml b/.github/workflows/benchmark-go.yml new file mode 100644 index 00000000..b07517c8 --- /dev/null +++ b/.github/workflows/benchmark-go.yml @@ -0,0 +1,36 @@ +name: Benchmark Go + +on: + push: + branches: + - main + +permissions: + # deployments permission to deploy GitHub pages website + deployments: write + # contents permission to update benchmark contents in gh-pages branch + contents: write + # allow posting comments to pull request + pull-requests: write + +jobs: + benchmark: + name: Performance regression check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v4 + with: + go-version-file: go.mod + - name: Run benchmark + run: set -o pipefail; go test ./... -benchmem -run=^$ -bench . | tee output.txt + - name: Store benchmark result + uses: benchmark-action/github-action-benchmark@v1.20.4 + with: + tool: 'go' + output-file-path: output.txt + github-token: ${{ secrets.GITHUB_TOKEN }} + auto-push: true + alert-threshold: "150%" + comment-on-alert: true + fail-on-alert: true
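
As a closing note on the MySQL patch above, here is a minimal usage sketch of
the new ReadEntryBundle contract (illustrative only: serveEntryBundle and its
parameters are assumptions, not code from this series; the method signature,
error contract, and Cache-Control header follow the patch):

    package personality

    import (
        "context"
        "errors"
        "net/http"
        "os"

        "github.com/transparency-dev/trillian-tessera/storage/mysql"
    )

    // serveEntryBundle is a hypothetical handler showing how a personality
    // consumes ReadEntryBundle: p == 0 requests a full bundle, and requesting
    // more entries than are stored surfaces as os.ErrNotExist, i.e. a 404.
    func serveEntryBundle(ctx context.Context, s *mysql.Storage, w http.ResponseWriter, index uint64, p uint8) {
        bundle, err := s.ReadEntryBundle(ctx, index, p)
        if err != nil {
            if errors.Is(err, os.ErrNotExist) {
                w.WriteHeader(http.StatusNotFound)
                return
            }
            w.WriteHeader(http.StatusInternalServerError)
            return
        }
        // Safe to cache hard: a request for more entries than are stored now
        // returns 404 above rather than a smaller bundle, so caches can no
        // longer be poisoned with partial data.
        w.Header().Set("Cache-Control", "public, max-age=31536000, immutable")
        _, _ = w.Write(bundle)
    }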