Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MySQL] complete entry bundle implementation #398

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions cmd/conformance/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,19 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) {
mux.HandleFunc("GET /checkpoint", func(w http.ResponseWriter, r *http.Request) {
checkpoint, err := storage.ReadCheckpoint(r.Context())
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
w.WriteHeader(http.StatusNotFound)
return
}
klog.Errorf("/checkpoint: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if checkpoint == nil {
w.WriteHeader(http.StatusNotFound)
return
}

// Don't cache checkpoints as the endpoint refreshes regularly.
// A personality that wanted to _could_ set a small cache time here which was no higher
// than the checkpoint publish interval.
w.Header().Set("Cache-Control", "no-cache")
if _, err := w.Write(checkpoint); err != nil {
klog.Errorf("/checkpoint: %v", err)
return
Expand Down Expand Up @@ -207,10 +211,7 @@ func configureTilesReadAPI(mux *http.ServeMux, storage *mysql.Storage) {
return
}

// TODO: Add immutable Cache-Control header.
// Only do this once we're sure we're returning the right number of entries
// Currently a user can request a full tile and we can return a partial tile.
// If cache headers were set then this could cause caches to be poisoned.
w.Header().Set("Cache-Control", "public, max-age=31536000, immutable")

if _, err := w.Write(entryBundle); err != nil {
klog.Errorf("/tile/entries/{index...}: %v", err)
Expand Down
40 changes: 26 additions & 14 deletions storage/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
options "github.com/transparency-dev/trillian-tessera/internal/options"
storage "github.com/transparency-dev/trillian-tessera/storage/internal"
"k8s.io/klog/v2"
Expand All @@ -45,8 +46,8 @@ const (
replaceTreeStateSQL = "REPLACE INTO `TreeState` (`id`, `size`, `root`) VALUES (?, ?, ?)"
selectSubtreeByLevelAndIndexSQL = "SELECT `nodes` FROM `Subtree` WHERE `level` = ? AND `index` = ?"
replaceSubtreeSQL = "REPLACE INTO `Subtree` (`level`, `index`, `nodes`) VALUES (?, ?, ?)"
selectTiledLeavesSQL = "SELECT `data` FROM `TiledLeaves` WHERE `tile_index` = ?"
replaceTiledLeavesSQL = "REPLACE INTO `TiledLeaves` (`tile_index`, `data`) VALUES (?, ?)"
selectTiledLeavesSQL = "SELECT `size`, `data` FROM `TiledLeaves` WHERE `tile_index` = ?"
replaceTiledLeavesSQL = "REPLACE INTO `TiledLeaves` (`tile_index`, `size`, `data`) VALUES (?, ?, ?)"

checkpointID = 0
treeStateID = 0
Expand Down Expand Up @@ -290,31 +291,38 @@ func (s *Storage) writeTile(ctx context.Context, tx *sql.Tx, level, index uint64
// ReadEntryBundle returns the log entries at the given index.
// If the entry bundle is not found, it returns os.ErrNotExist.
//
// TODO: Handle the following scenarios:
// 1. Full tile request with full tile output: Return full tile.
// 2. Full tile request with partial tile output: Return error.
// 3. Partial tile request with full/larger partial tile output: Return trimmed partial tile with correct tile width.
// 4. Partial tile request with partial tile (same width) output: Return partial tile.
// 5. Partial tile request with smaller partial tile output: Return error.
// Note that if a partial tile is requested, but a larger tile is available, this
// will return the largest tile available. This could be trimmed to return only the
// number of entries specifically requested if this behaviour becomes problematic.
func (s *Storage) ReadEntryBundle(ctx context.Context, index uint64, p uint8) ([]byte, error) {
row := s.db.QueryRowContext(ctx, selectTiledLeavesSQL, index)
if err := row.Err(); err != nil {
return nil, err
}

var size uint32
var entryBundle []byte
if err := row.Scan(&entryBundle); err != nil {
if err := row.Scan(&size, &entryBundle); err != nil {
if err == sql.ErrNoRows {
return nil, os.ErrNotExist
}
return nil, fmt.Errorf("scan entry bundle: %v", err)
}

requestedSize := uint32(p)
if requestedSize == 0 {
requestedSize = layout.EntryBundleWidth
}

if requestedSize > size {
return nil, fmt.Errorf("bundle with %d entries requested, but only %d available: %w", requestedSize, size, os.ErrNotExist)
}

return entryBundle, nil
}

func (s *Storage) writeEntryBundle(ctx context.Context, tx *sql.Tx, index uint64, entryBundle []byte) error {
if _, err := tx.ExecContext(ctx, replaceTiledLeavesSQL, index, entryBundle); err != nil {
func (s *Storage) writeEntryBundle(ctx context.Context, tx *sql.Tx, index uint64, size uint32, entryBundle []byte) error {
if _, err := tx.ExecContext(ctx, replaceTiledLeavesSQL, index, size, entryBundle); err != nil {
klog.Errorf("Failed to execute replaceTiledLeavesSQL: %v", err)
return err
}
Expand Down Expand Up @@ -452,10 +460,14 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent
return fmt.Errorf("query tiled leaves: %v", err)
}

var size uint32
var partialEntryBundle []byte
if err := row.Scan(&partialEntryBundle); err != nil {
if err := row.Scan(&size, &partialEntryBundle); err != nil {
return fmt.Errorf("scan partial entry bundle: %w", err)
}
if size != uint32(entriesInBundle) {
return fmt.Errorf("expected %d entries in storage but found %d", entriesInBundle, size)
}

if _, err := bundleWriter.Write(partialEntryBundle); err != nil {
return fmt.Errorf("write partial entry bundle: %w", err)
Expand All @@ -471,7 +483,7 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent

// This bundle is full, so we need to write it out.
if entriesInBundle == entryBundleSize {
if err := s.writeEntryBundle(ctx, tx, bundleIndex, bundleWriter.Bytes()); err != nil {
if err := s.writeEntryBundle(ctx, tx, bundleIndex, uint32(entriesInBundle), bundleWriter.Bytes()); err != nil {
return fmt.Errorf("writeEntryBundle: %w", err)
}

Expand All @@ -485,7 +497,7 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent
// If we have a partial bundle remaining once we've added all the entries from the batch,
// this needs writing out too.
if entriesInBundle > 0 {
if err := s.writeEntryBundle(ctx, tx, bundleIndex, bundleWriter.Bytes()); err != nil {
if err := s.writeEntryBundle(ctx, tx, bundleIndex, uint32(entriesInBundle), bundleWriter.Bytes()); err != nil {
return fmt.Errorf("writeEntryBundle: %w", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions storage/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,9 @@ func TestEntryBundleRoundTrip(t *testing.T) {
if err != nil {
t.Errorf("Add got err: %v", err)
}
entryBundleRaw, err := s.ReadEntryBundle(ctx, entryIndex/256, uint8(entryIndex%layout.TileWidth))
entryBundleRaw, err := s.ReadEntryBundle(ctx, entryIndex/256, layout.PartialTileSize(0, entryIndex, entryIndex+1))
if err != nil {
t.Errorf("ReadEntryBundle got err: %v", err)
t.Fatalf("ReadEntryBundle got err: %v", err)
}

bundle := api.EntryBundle{}
Expand Down
8 changes: 5 additions & 3 deletions storage/mysql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
-- on an indepentent timeframe to the internal updating of state.
CREATE TABLE IF NOT EXISTS `Checkpoint` (
-- id is expected to be always 0 to maintain a maximum of a single row.
`id` INT UNSIGNED NOT NULL,
`id` TINYINT UNSIGNED NOT NULL,
-- note is the text signed by one or more keys in the checkpoint format. See https://c2sp.org/tlog-checkpoint and https://c2sp.org/signed-note.
`note` MEDIUMBLOB NOT NULL,
-- published_at is the millisecond UNIX timestamp of when this row was written.
Expand All @@ -31,7 +31,7 @@ CREATE TABLE IF NOT EXISTS `Checkpoint` (
-- This is not the same thing as a Checkpoint, which is a signed commitment to such a state.
CREATE TABLE IF NOT EXISTS `TreeState` (
-- id is expected to be always 0 to maintain a maximum of a single row.
`id` INT UNSIGNED NOT NULL,
`id` TINYINT UNSIGNED NOT NULL,
-- size is the extent of the currently integrated tree.
`size` BIGINT UNSIGNED NOT NULL,
-- root is the root hash of the tree at the size stored in `size`.
Expand All @@ -42,7 +42,7 @@ CREATE TABLE IF NOT EXISTS `TreeState` (
-- "Subtree" table is an internal tile consisting of hashes. There is one row for each internal tile, and this is updated until it is completed, at which point it is immutable.
CREATE TABLE IF NOT EXISTS `Subtree` (
-- level is the level of the tile.
`level` INT UNSIGNED NOT NULL,
`level` TINYINT UNSIGNED NOT NULL,
-- index is the index of the tile.
`index` BIGINT UNSIGNED NOT NULL,
-- nodes stores the hashes of the leaves.
Expand All @@ -53,6 +53,8 @@ CREATE TABLE IF NOT EXISTS `Subtree` (
-- "TiledLeaves" table stores the data committed to by the leaves of the tree. Follows the same evolution as Subtree.
CREATE TABLE IF NOT EXISTS `TiledLeaves` (
`tile_index` BIGINT UNSIGNED NOT NULL,
-- size is the number of entries serialized into this leaf bundle.
`size` SMALLINT UNSIGNED NOT NULL,
`data` LONGBLOB NOT NULL,
PRIMARY KEY(`tile_index`)
);
Loading