[POSIX, GCP] Separate integration from publishing checkpoints #323

Merged
merged 18 commits on Nov 21, 2024

Changes from all commits
1 change: 1 addition & 0 deletions cmd/conformance/gcp/main.go
@@ -58,6 +58,7 @@ func main() {
gcpCfg := storageConfigFromFlags()
storage, err := gcp.New(ctx, gcpCfg,
tessera.WithCheckpointSigner(s, a...),
tessera.WithCheckpointInterval(10*time.Second),
tessera.WithBatching(1024, time.Second),
tessera.WithPushback(10*4096),
)
16 changes: 15 additions & 1 deletion cmd/examples/posix-oneshot/main.go
@@ -41,6 +41,14 @@ var (
privKeyFile = flag.String("private_key", "", "Location of private key file. If unset, uses the contents of the LOG_PRIVATE_KEY environment variable.")
)

const (
// checkpointInterval is the interval passed to the WithCheckpointInterval option below.
// Since this is a short-lived command-line tool, we set this to a low value so that
// the tool can publish the new checkpoint and exit soon after integrating the entries
// into the tree.
checkpointInterval = 500 * time.Millisecond
)

// entryInfo binds the actual bytes to be added as a leaf with a
// user-recognisable name for the source of those bytes.
// The name is only used below in order to inform the user of the
@@ -77,7 +85,13 @@ func main() {
// The options provide the checkpoint signer & verifier, and batching options.
// In this case, we want a single batch containing all of the leaves being added, so that
// they are all added without creating any intermediate checkpoints.
st, err := posix.New(ctx, *storageDir, *initialise, tessera.WithCheckpointSigner(s), tessera.WithBatching(uint(len(filesToAdd)), time.Second))
st, err := posix.New(
ctx,
*storageDir,
*initialise,
tessera.WithCheckpointSigner(s),
tessera.WithCheckpointInterval(checkpointInterval),
tessera.WithBatching(uint(len(filesToAdd)), time.Second))
if err != nil {
klog.Exitf("Failed to construct storage: %v", err)
}
2 changes: 1 addition & 1 deletion deployment/modules/gcp/gcs/main.tf
@@ -58,7 +58,7 @@ resource "google_spanner_database" "log_db" {
ddl = [
"CREATE TABLE SeqCoord (id INT64 NOT NULL, next INT64 NOT NULL,) PRIMARY KEY (id)",
"CREATE TABLE Seq (id INT64 NOT NULL, seq INT64 NOT NULL, v BYTES(MAX),) PRIMARY KEY (id, seq)",
"CREATE TABLE IntCoord (id INT64 NOT NULL, seq INT64 NOT NULL,) PRIMARY KEY (id)",
"CREATE TABLE IntCoord (id INT64 NOT NULL, seq INT64 NOT NULL, rootHash BYTES(32)) PRIMARY KEY (id)",
]

deletion_protection = !var.ephemeral
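The schema change above adds a nullable rootHash column to IntCoord so the sequencer can record the root of the currently integrated tree (the storage code further down initialises new databases with this column directly). For an existing deployment, the column would need to be added to the live table; a minimal sketch of doing that with the Spanner database admin client is shown below. The database path is a placeholder and the exact client/import versions are assumptions; this migration step is not part of this PR.

```go
package main

import (
	"context"

	database "cloud.google.com/go/spanner/admin/database/apiv1"
	databasepb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
)

func main() {
	ctx := context.Background()
	adminClient, err := database.NewDatabaseAdminClient(ctx)
	if err != nil {
		panic(err)
	}
	defer adminClient.Close()

	// Add the new (nullable) column; this mirrors the BYTES(32) definition in the
	// Terraform DDL above. The database path below is a placeholder.
	op, err := adminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{
		Database:   "projects/my-project/instances/my-instance/databases/my-log-db",
		Statements: []string{"ALTER TABLE IntCoord ADD COLUMN rootHash BYTES(32)"},
	})
	if err != nil {
		panic(err)
	}
	if err := op.Wait(ctx); err != nil {
		panic(err)
	}
}
```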
23 changes: 15 additions & 8 deletions integration/integration_test.go
@@ -139,14 +139,21 @@ func TestLiveLogIntegration(t *testing.T) {
t.Errorf("entryWriter.add: %v", err)
}
entryIndexMap.Store(i, index)
checkpoint, _, _, err := client.FetchCheckpoint(ctx, logReadCP, noteVerifier, noteVerifier.Name())
if err != nil {
t.Errorf("client.FetchCheckpoint: %v", err)
}
if checkpoint == nil {
// This was a t.Fatalf but that terminates the goroutine, stopping the error being returned on the next line
t.Errorf("checkpoint not found at index: %d, test entry size: %d", index, i)
return fmt.Errorf("failed to get checkpoint after writing entry %d (assigned sequence %d)", i, index)

// Wait for the entry to be integrated, or the test to time out.
for size := uint64(0); size < index; {
time.Sleep(500 * time.Millisecond)

checkpoint, _, _, err := client.FetchCheckpoint(ctx, logReadCP, noteVerifier, noteVerifier.Name())
if err != nil {
t.Errorf("client.FetchCheckpoint: %v", err)
}
if checkpoint == nil {
// This was a t.Fatalf but that terminates the goroutine, preventing the error below from being returned
t.Errorf("checkpoint not found at index: %d, test entry size: %d", index, i)
return fmt.Errorf("failed to get checkpoint after writing entry %d (assigned sequence %d)", i, index)
}
size = checkpoint.Size
}
checkpoints[i+1] = *checkpoint
return err
2 changes: 2 additions & 0 deletions internal/options/options.go
@@ -39,4 +39,6 @@ type StorageOptions struct {
PushbackMaxOutstanding uint

EntriesPath EntriesPathFunc

CheckpointInterval time.Duration
}
18 changes: 18 additions & 0 deletions log.go
@@ -111,3 +111,21 @@ func WithPushback(maxOutstanding uint) func(*options.StorageOptions) {
o.PushbackMaxOutstanding = maxOutstanding
}
}

// WithCheckpointInterval configures the frequency at which Tessera will attempt to create & publish
// a new checkpoint.
//
// Well-behaved clients of the log will only "see" newly sequenced entries once a new checkpoint is published,
// so it's important to choose this value carefully.
//
// Regularly publishing new checkpoints:
// - helps show that the log is "live", even if no entries are being added.
// - enables clients of the log to reason about how frequently they need to have their
// view of the log refreshed, which in turn helps reduce work/load across the ecosystem.
//
// Note that this option probably only makes sense for long-lived applications (e.g. HTTP servers).
func WithCheckpointInterval(interval time.Duration) func(*options.StorageOptions) {
return func(o *options.StorageOptions) {
o.CheckpointInterval = interval
}
}
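To make the option concrete, here is a minimal sketch of configuring a long-lived, GCP-backed log with it. This is a fragment assuming the same imports and setup as cmd/conformance/gcp/main.go above; `cfg` and `signer` are placeholders.

```go
// Publish a fresh checkpoint at most every 10s, regardless of how often
// entries are integrated into the tree.
st, err := gcp.New(ctx, cfg,
	tessera.WithCheckpointSigner(signer),
	tessera.WithCheckpointInterval(10*time.Second),
	tessera.WithBatching(1024, time.Second),
)
if err != nil {
	klog.Exitf("Failed to construct storage: %v", err)
}
```

A short-lived tool would instead pick a much smaller interval, as posix-oneshot does above, so it can publish and exit promptly.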
113 changes: 93 additions & 20 deletions storage/gcp/gcp.go
@@ -43,6 +43,7 @@ import (
"cloud.google.com/go/spanner/apiv1/spannerpb"
gcs "cloud.google.com/go/storage"
"github.com/google/go-cmp/cmp"
"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
@@ -73,12 +74,15 @@ type Storage struct {
objStore objStore

queue *storage.Queue

cpUpdated chan struct{}
}

// objStore describes a type which can store and retrieve objects.
type objStore interface {
getObject(ctx context.Context, obj string) ([]byte, int64, error)
setObject(ctx context.Context, obj string, data []byte, cond *gcs.Conditions, contType string) error
lastModified(ctx context.Context, obj string) (time.Time, error)
}

// sequencer describes a type which knows how to sequence entries.
@@ -92,10 +96,14 @@ type sequencer interface {
// If forceUpdate is true, then the consumeFunc should be called, with an empty slice of entries if
// necessary. This allows the log to self-initialise in a transactionally safe manner.
consumeEntries(ctx context.Context, limit uint64, f consumeFunc, forceUpdate bool) (bool, error)
// currentTree returns the sequencer's view of the current tree state.
currentTree(ctx context.Context) (uint64, []byte, error)
}

// consumeFunc is the signature of a function which can consume entries from the sequencer.
type consumeFunc func(ctx context.Context, from uint64, entries []storage.SequencedEntry) error
// consumeFunc is the signature of a function which can consume entries from the sequencer and integrate
// them into the log.
// Returns the new rootHash once all passed entries have been integrated.
type consumeFunc func(ctx context.Context, from uint64, entries []storage.SequencedEntry) ([]byte, error)

// Config holds GCP project and resource configuration for a storage instance.
type Config struct {
@@ -130,6 +138,7 @@ func New(ctx context.Context, cfg Config, opts ...func(*options.StorageOptions))
sequencer: seq,
newCP: opt.NewCP,
entriesPath: opt.EntriesPath,
cpUpdated: make(chan struct{}),
}
r.queue = storage.NewQueue(ctx, opt.BatchMaxAge, opt.BatchMaxSize, r.sequencer.assignEntries)

@@ -154,11 +163,31 @@ func New(ctx context.Context, cfg Config, opts ...func(*options.StorageOptions))

if _, err := r.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, r.integrate, false); err != nil {
klog.Errorf("integrate: %v", err)
select {
case r.cpUpdated <- struct{}{}:
default:
}
}
}()
}
}()

go func(ctx context.Context, i time.Duration) {
t := time.NewTicker(i)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-r.cpUpdated:
case <-t.C:
}
if err := r.publishCheckpoint(ctx, i); err != nil {
klog.Warningf("publishCheckpoint: %v", err)
}
}
}(ctx, opt.CheckpointInterval)

return r, nil
}

@@ -193,13 +222,17 @@ func (s *Storage) init(ctx context.Context) error {
if err != nil {
if errors.Is(err, gcs.ErrObjectNotExist) {
// No checkpoint exists, do a forced (possibly empty) integration to create one in a safe
// way (calling updateCP directly here would not be safe as it's outside the transactional
// way (setting the checkpoint directly here would not be safe as it's outside the transactional
// framework which prevents the tree from rolling backwards or otherwise forking).
cctx, c := context.WithTimeout(ctx, 10*time.Second)
defer c()
if _, err := s.sequencer.consumeEntries(cctx, DefaultIntegrationSizeLimit, s.integrate, true); err != nil {
return fmt.Errorf("forced integrate: %v", err)
}
select {
case s.cpUpdated <- struct{}{}:
default:
}
return nil
}
return fmt.Errorf("failed to read checkpoint: %v", err)
@@ -208,11 +241,24 @@
return nil
}

func (s *Storage) updateCP(ctx context.Context, newSize uint64, newRoot []byte) error {
cpRaw, err := s.newCP(newSize, newRoot)
func (s *Storage) publishCheckpoint(ctx context.Context, minStaleness time.Duration) error {
m, err := s.objStore.lastModified(ctx, layout.CheckpointPath)
if err != nil && !errors.Is(err, gcs.ErrObjectNotExist) {
return fmt.Errorf("lastModified(%q): %v", layout.CheckpointPath, err)
}
if time.Since(m) < minStaleness {
return nil
}

size, root, err := s.sequencer.currentTree(ctx)
if err != nil {
return fmt.Errorf("currentTree: %v", err)
}
cpRaw, err := s.newCP(size, root)
if err != nil {
return fmt.Errorf("newCP: %v", err)
}

if err := s.objStore.setObject(ctx, layout.CheckpointPath, cpRaw, nil, ckptContType); err != nil {
return fmt.Errorf("writeCheckpoint: %v", err)
}
@@ -301,7 +347,9 @@ func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSiz
}

// integrate incorporates the provided entries into the log starting at fromSeq.
func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) error {
func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) ([]byte, error) {
var newRoot []byte

errG := errgroup.Group{}

errG.Go(func() error {
@@ -319,30 +367,29 @@ func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []stora
}
return n, nil
}
newSize, newRoot, tiles, err := storage.Integrate(ctx, getTiles, fromSeq, entries)

newSize, root, tiles, err := storage.Integrate(ctx, getTiles, fromSeq, entries)
if err != nil {
return fmt.Errorf("Integrate: %v", err)
}
if newSize > 0 {
newRoot = root
} else {
newRoot = rfc6962.DefaultHasher.EmptyRoot()
}
for k, v := range tiles {
func(ctx context.Context, k storage.TileID, v *api.HashTile) {
errG.Go(func() error {
return s.setTile(ctx, uint64(k.Level), k.Index, newSize, v)
})
}(ctx, k, v)
}
errG.Go(func() error {
klog.Infof("New CP: %d, %x", newSize, newRoot)
if s.newCP != nil {
return s.updateCP(ctx, newSize, newRoot)
}
return nil
})
klog.Infof("New tree: %d, %x", newSize, newRoot)

return nil
})

return errG.Wait()
return newRoot, errG.Wait()
}

// updateEntryBundles adds the entries being integrated into the entry bundles.
@@ -466,6 +513,7 @@ func (s *spannerSequencer) initDB(ctx context.Context) error {
CREATE TABLE IntCoord (
id INT64 NOT NULL,
seq INT64 NOT NULL,
rootHash BYTES(32) NOT NULL,
) PRIMARY KEY (id);
*/

@@ -476,7 +524,7 @@ func (s *spannerSequencer) initDB(ctx context.Context) error {
if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("SeqCoord", []string{"id", "next"}, []interface{}{0, 0})}); err != nil && spanner.ErrCode(err) != codes.AlreadyExists {
return err
}
if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("IntCoord", []string{"id", "seq"}, []interface{}{0, 0})}); err != nil && spanner.ErrCode(err) != codes.AlreadyExists {
if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("IntCoord", []string{"id", "seq", "rootHash"}, []interface{}{0, 0, rfc6962.DefaultHasher.EmptyRoot()})}); err != nil && spanner.ErrCode(err) != codes.AlreadyExists {
return err
}
return nil
@@ -569,12 +617,13 @@ func (s *spannerSequencer) consumeEntries(ctx context.Context, limit uint64, f c
didWork := false
_, err := s.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
// Figure out which is the starting index of sequenced entries to start consuming from.
row, err := txn.ReadRowWithOptions(ctx, "IntCoord", spanner.Key{0}, []string{"seq"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE})
row, err := txn.ReadRowWithOptions(ctx, "IntCoord", spanner.Key{0}, []string{"seq", "rootHash"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE})
if err != nil {
return err
}
var fromSeq int64 // Spanner doesn't support uint64
if err := row.Column(0, &fromSeq); err != nil {
var rootHash []byte
if err := row.Columns(&fromSeq, &rootHash); err != nil {
return fmt.Errorf("failed to read integration coordination info: %v", err)
}
klog.V(1).Infof("Consuming from %d", fromSeq)
@@ -620,14 +669,15 @@ func (s *spannerSequencer) consumeEntries(ctx context.Context, limit uint64, f c
}

// Call consumeFunc with the entries we've found
if err := f(ctx, uint64(fromSeq), entries); err != nil {
newRoot, err := f(ctx, uint64(fromSeq), entries)
if err != nil {
return err
}

// consumeFunc was successful, so we can update our coordination row, and delete the row(s) for
// the then consumed entries.
m := make([]*spanner.Mutation, 0)
m = append(m, spanner.Update("IntCoord", []string{"id", "seq"}, []interface{}{0, int64(orderCheck)}))
m = append(m, spanner.Update("IntCoord", []string{"id", "seq", "rootHash"}, []interface{}{0, int64(orderCheck), newRoot}))
for _, c := range seqsConsumed {
m = append(m, spanner.Delete("Seq", spanner.Key{0, c}))
}
@@ -647,6 +697,21 @@ func (s *spannerSequencer) consumeEntries(ctx context.Context, limit uint64, f c
return didWork, nil
}

// currentTree returns the size and root hash of the currently integrated tree.
func (s *spannerSequencer) currentTree(ctx context.Context) (uint64, []byte, error) {
row, err := s.dbPool.Single().ReadRow(ctx, "IntCoord", spanner.Key{0}, []string{"seq", "rootHash"})
if err != nil {
return 0, nil, fmt.Errorf("failed to read IntCoord: %v", err)
}
var fromSeq int64 // Spanner doesn't support uint64
var rootHash []byte
if err := row.Columns(&fromSeq, &rootHash); err != nil {
return 0, nil, fmt.Errorf("failed to read integration coordination info: %v", err)
}

return uint64(fromSeq), rootHash, nil
}

// gcsStorage knows how to store and retrieve objects from GCS.
type gcsStorage struct {
bucket string
@@ -713,3 +778,11 @@ func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte,
}
return nil
}

func (s *gcsStorage) lastModified(ctx context.Context, obj string) (time.Time, error) {
r, err := s.gcsClient.Bucket(s.bucket).Object(obj).NewReader(ctx)
if err != nil {
return time.Time{}, fmt.Errorf("failed to create reader for object %q in bucket %q: %w", obj, s.bucket, err)
}
return r.Attrs.LastModified, r.Close()
}
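Taken together, the changes in this file separate "integration" (advancing the tree and recording its root in IntCoord) from "publication" (signing and writing a checkpoint for whatever tree state the sequencer currently reports). The sketch below is a simplified illustration of that contract, not the actual implementation: types are reduced to essentials and publishLoop is a made-up name standing in for the ticker goroutine and publishCheckpoint above.

```go
// consumeFunc integrates a batch of sequenced entries starting at `from` and
// returns the new tree root; it no longer publishes anything itself.
type consumeFunc func(ctx context.Context, from uint64, entries [][]byte) (newRoot []byte, err error)

// sequencer also exposes the current integrated tree state so a separate
// publisher can sign it whenever it chooses.
type sequencer interface {
	consumeEntries(ctx context.Context, limit uint64, f consumeFunc, forceUpdate bool) (bool, error)
	currentTree(ctx context.Context) (size uint64, root []byte, err error)
}

// publishLoop periodically signs and publishes the current tree state,
// decoupled from how often entries are integrated.
func publishLoop(ctx context.Context, s sequencer, interval time.Duration, publish func(size uint64, root []byte) error) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
		}
		size, root, err := s.currentTree(ctx)
		if err != nil {
			continue // transient error; retry on the next tick
		}
		if err := publish(size, root); err != nil {
			continue // transient error; retry on the next tick
		}
	}
}
```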