[GCP] Dedup storage experiment #363

Merged · 8 commits · Dec 5, 2024
14 changes: 13 additions & 1 deletion cmd/conformance/gcp/main.go
@@ -37,6 +37,7 @@ var (
 	listen            = flag.String("listen", ":2024", "Address:port to listen on")
 	spanner           = flag.String("spanner", "", "Spanner resource URI ('projects/.../...')")
 	signer            = flag.String("signer", "", "Note signer to use to sign checkpoints")
+	persistentDedup   = flag.Bool("gcp_dedup", false, "EXPERIMENTAL: Set to true to enable persistent dedupe storage")
 	additionalSigners = []string{}
 )

@@ -65,7 +66,18 @@ func main() {
 	if err != nil {
 		klog.Exitf("Failed to create new GCP storage: %v", err)
 	}
-	dedupeAdd := tessera.InMemoryDedupe(storage.Add, 256)
+
+	// Handle dedup configuration
+	addDelegate := storage.Add
+
+	// PersistentDedup is currently experimental, so there's no terraform or documentation yet!
+	if *persistentDedup {
+		addDelegate, err = gcp.NewDedupe(ctx, fmt.Sprintf("%s_dedup", *spanner), addDelegate)
+		if err != nil {
+			klog.Exitf("Failed to create new GCP dedupe: %v", err)
+		}
+	}
+	dedupeAdd := tessera.InMemoryDedupe(addDelegate, 256)
 
 	// Expose a HTTP handler for the conformance test writes.
 	// This should accept arbitrary bytes POSTed to /add, and return an ascii
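To see the call shape end to end: the composed dedupeAdd is a function returning a tessera.IndexFuture, i.e. a func() (uint64, error), so the conformance handler invokes the returned future to learn the assigned index. A minimal sketch of such a handler follows; only the dedupeAdd call shape is taken from this diff, the handler body itself is illustrative and assumes the usual context/fmt/io/net/http imports:

// Illustrative only, not part of this PR: a write handler over the
// composed dedupeAdd from main.go above.
func addHandler(dedupeAdd func(context.Context, *tessera.Entry) tessera.IndexFuture) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		b, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		// Invoking the returned IndexFuture blocks until an index is assigned.
		idx, err := dedupeAdd(r.Context(), tessera.NewEntry(b))()
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		// Respond with the assigned sequence number as ASCII decimal.
		fmt.Fprintf(w, "%d", idx)
	}
}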
31 changes: 20 additions & 11 deletions dedupe.go
@@ -16,9 +16,10 @@ package tessera

 import (
 	"context"
+	"fmt"
 	"sync"
 
-	"github.com/hashicorp/golang-lru/v2/expirable"
+	lru "github.com/hashicorp/golang-lru/v2"
 )
 
 // InMemoryDedupe wraps an Add function to prevent duplicate entries being written to the underlying
@@ -35,30 +36,38 @@
 // InMemoryDedupe. This allows recent duplicates to be deduplicated in memory, reducing the need to
 // make calls to a persistent storage.
 func InMemoryDedupe(delegate func(ctx context.Context, e *Entry) IndexFuture, size uint) func(context.Context, *Entry) IndexFuture {
+	c, err := lru.New[string, func() IndexFuture](int(size))
+	if err != nil {
+		panic(fmt.Errorf("lru.New(%d): %v", size, err))
+	}
 	dedupe := &inMemoryDedupe{
 		delegate: delegate,
-		cache:    expirable.NewLRU[string, IndexFuture](int(size), nil, 0),
+		cache:    c,
 	}
 	return dedupe.add
 }
 
 type inMemoryDedupe struct {
 	delegate func(ctx context.Context, e *Entry) IndexFuture
-	mu       sync.Mutex // cache is thread safe, but this mutex allows us to do conditional writes
-	cache    *expirable.LRU[string, IndexFuture]
+	cache    *lru.Cache[string, func() IndexFuture]
 }
 
 // Add adds the entry to the underlying delegate only if e hasn't been recently seen. In either case,
 // an IndexFuture will be returned that the client can use to get the sequence number of this entry.
 func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture {
 	id := string(e.Identity())
-	d.mu.Lock()
-	defer d.mu.Unlock()
 
-	f, ok := d.cache.Get(id)
-	if !ok {
-		f = d.delegate(ctx, e)
-		d.cache.Add(id, f)
+	// However many calls with the same entry come in and are deduped, we should only call delegate
+	// once for each unique entry:
+	f := sync.OnceValue(func() IndexFuture {
+		return d.delegate(ctx, e)
+	})
+
+	// if we've seen this entry before, discard our f and replace
+	// with the one we created last time, otherwise store f against id.
+	if prev, ok, _ := d.cache.PeekOrAdd(id, f); ok {
+		f = prev
 	}
-	return f
+
+	return f()
 }
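The combination of sync.OnceValue and PeekOrAdd above is what lets the mutex go: two racing callers may each build their own once-wrapped delegate call, but the cache atomically keeps only the first wrapper, every caller ends up invoking that shared wrapper, and so the delegate still runs at most once per cached entry. A standalone sketch of the same pattern (simplified types; not this package's API):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	cache, err := lru.New[string, func() int](8)
	if err != nil {
		panic(err)
	}
	var delegateCalls atomic.Int32

	add := func(id string) int {
		// Wrap the expensive delegate call so it runs at most once,
		// no matter how many callers end up sharing this wrapper.
		f := sync.OnceValue(func() int {
			delegateCalls.Add(1)
			return 42 // stand-in for the assigned index
		})
		// Atomically keep the first wrapper stored for this id;
		// losers of the race adopt the stored one.
		if prev, ok, _ := cache.PeekOrAdd(id, f); ok {
			f = prev
		}
		return f()
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			add("same-entry")
		}()
	}
	wg.Wait()
	fmt.Println("delegate calls:", delegateCalls.Load()) // prints 1
}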
10 changes: 6 additions & 4 deletions dedupe_test.go
@@ -60,13 +60,15 @@ func TestDedupe(t *testing.T) {
 		dedupeAdd := tessera.InMemoryDedupe(delegate, 256)
 
 		// Add foo, bar, baz to prime the cache to make things interesting
-		dedupeAdd(ctx, tessera.NewEntry([]byte("foo")))
-		dedupeAdd(ctx, tessera.NewEntry([]byte("bar")))
-		dedupeAdd(ctx, tessera.NewEntry([]byte("baz")))
+		for _, s := range []string{"foo", "bar", "baz"} {
+			if _, err := dedupeAdd(ctx, tessera.NewEntry([]byte(s)))(); err != nil {
+				t.Fatalf("dedupeAdd(%q): %v", s, err)
+			}
+		}
 
 		idx, err := dedupeAdd(ctx, tessera.NewEntry([]byte(tC.newValue)))()
 		if err != nil {
-			t.Fatal(err)
+			t.Fatalf("dedupeAdd(%q): %v", tC.newValue, err)
 		}
 		if idx != tC.wantIdx {
 			t.Errorf("got != want (%d != %d)", idx, tC.wantIdx)
169 changes: 169 additions & 0 deletions storage/gcp/gcp.go
@@ -37,11 +37,13 @@ import (
"io"
"net/http"
"os"
"sync/atomic"
"time"

"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/apiv1/spannerpb"
gcs "cloud.google.com/go/storage"
"github.com/globocom/go-buffer"
"github.com/google/go-cmp/cmp"
"github.com/transparency-dev/merkle/rfc6962"
tessera "github.com/transparency-dev/trillian-tessera"
@@ -783,3 +785,170 @@ func (s *gcsStorage) lastModified(ctx context.Context, obj string) (time.Time, error) {
}
return r.Attrs.LastModified, r.Close()
}

// NewDedupe returns wrapped Add func which will use Spanner to maintain a mapping of
// previously seen entries and their assigned indices. Future calls with the same entry
// will return the previously assigned index, as yet unseen entries will be passed to the provided
// delegate function to have an index assigned.
//
// For performance reasons, the ID -> index associations returned by the delegate are buffered before
// being flushed to Spanner. This can result in duplicates occurring in some circumstances, but in
// general this should not be a problem.
//
// Note that the storage for this mapping is entirely separate and unconnected to the storage used for
// maintaining the Merkle tree.
//
// This functionality is experimental!
func NewDedupe(ctx context.Context, spannerDB string, delegate func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture) (func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture, error) {
/*
Schema for reference:

CREATE TABLE IDSeq (
id INT64 NOT NULL,
h BYTES(MAX) NOT NULL,
idx INT64 NOT NULL,
) PRIMARY KEY (id, h);
*/
dedupDB, err := spanner.NewClient(ctx, spannerDB)
if err != nil {
return nil, fmt.Errorf("failed to connect to Spanner: %v", err)
}

r := &dedupStorage{
ctx: ctx,
dbPool: dedupDB,
delegate: delegate,
}

// TODO(al): Make these configurable
r.buf = buffer.New(
buffer.WithSize(64),
buffer.WithFlushInterval(200*time.Millisecond),
buffer.WithFlusher(buffer.FlusherFunc(r.flush)),
buffer.WithPushTimeout(15*time.Second),
)
go func(ctx context.Context) {
t := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
return
case <-t.C:
klog.V(1).Infof("DEDUP: # Writes %d, # Lookups %d, # DB hits %v, # buffer Push discards %d", r.numWrites.Load(), r.numLookups.Load(), r.numDBDedups.Load(), r.numPushErrs.Load())
}
}
}(ctx)
return r.add, nil
}

type dedupStorage struct {
ctx context.Context
dbPool *spanner.Client
delegate func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture

numLookups atomic.Uint64
numWrites atomic.Uint64
numDBDedups atomic.Uint64
numPushErrs atomic.Uint64

buf *buffer.Buffer
}

// index returns the index (if any) previously associated with the provided hash
func (d *dedupStorage) index(ctx context.Context, h []byte) (*uint64, error) {
d.numLookups.Add(1)
var idx int64
if row, err := d.dbPool.Single().ReadRow(ctx, "IDSeq", spanner.Key{0, h}, []string{"idx"}); err != nil {
if c := spanner.ErrCode(err); c == codes.NotFound {
return nil, nil
}
return nil, err
} else {
if err := row.Column(0, &idx); err != nil {
return nil, fmt.Errorf("failed to read dedup index: %v", err)
}
idx := uint64(idx)
d.numDBDedups.Add(1)
return &idx, nil
}
}

// storeMappings stores the associations between the keys and IDs in a non-atomic fashion
// (i.e. it does not store all or none in a transactional sense).
//
// Returns an error if one or more mappings cannot be stored.
func (d *dedupStorage) storeMappings(ctx context.Context, entries []dedupeMapping) error {
m := make([]*spanner.MutationGroup, 0, len(entries))
for _, e := range entries {
m = append(m, &spanner.MutationGroup{
Mutations: []*spanner.Mutation{spanner.Insert("IDSeq", []string{"id", "h", "idx"}, []interface{}{0, e.ID, int64(e.Idx)})},
})
}

i := d.dbPool.BatchWrite(ctx, m)
return i.Do(func(r *spannerpb.BatchWriteResponse) error {
s := r.GetStatus()
if c := codes.Code(s.Code); c != codes.OK && c != codes.AlreadyExists {
return fmt.Errorf("failed to write dedup record: %v (%v)", s.GetMessage(), c)
}
return nil
})
}

// dedupeMapping represents an ID -> index mapping.
type dedupeMapping struct {
ID []byte
Idx uint64
}

// add adds the entry to the underlying delegate only if e isn't already known. In either case,
// an IndexFuture will be returned that the client can use to get the sequence number of this entry.
func (d *dedupStorage) add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture {
idx, err := d.index(ctx, e.Identity())
if err != nil {
return func() (uint64, error) { return 0, err }
}
if idx != nil {
return func() (uint64, error) { return *idx, nil }
}

i, err := d.delegate(ctx, e)()
if err != nil {
return func() (uint64, error) { return 0, err }
}

err = d.enqueueMapping(ctx, e.Identity(), i)
return func() (uint64, error) {
return i, err
}
}

// enqueueMapping buffers the provided ID -> index mapping ready to be flushed to storage.
func (d *dedupStorage) enqueueMapping(_ context.Context, h []byte, idx uint64) error {
err := d.buf.Push(dedupeMapping{ID: h, Idx: idx})
if err != nil {
d.numPushErrs.Add(1)
// This means there's pressure flushing dedup writes out, so discard this write.
if err != buffer.ErrTimeout {
return err
}
}
return nil
}

// flush writes enqueued mappings to storage.
func (d *dedupStorage) flush(items []interface{}) {
entries := make([]dedupeMapping, len(items))
for i := range items {
entries[i] = items[i].(dedupeMapping)
}

ctx, c := context.WithTimeout(d.ctx, 15*time.Second)
defer c()

if err := d.storeMappings(ctx, entries); err != nil {
klog.Infof("Failed to flush dedup entries: %v", err)
return
}
d.numWrites.Add(uint64(len(entries)))
}
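A design note on the write path above: because Push is configured with WithPushTimeout, enqueueMapping deliberately drops a mapping when flushing can't keep up rather than blocking adds; the dedup index is best-effort, and a dropped mapping only means a later duplicate gets a fresh index from the delegate. A standalone sketch of that buffering pattern (sizes and intervals are illustrative; only the go-buffer calls already used in this diff are assumed):

package main

import (
	"fmt"
	"time"

	"github.com/globocom/go-buffer"
)

func main() {
	buf := buffer.New(
		buffer.WithSize(4),                             // flush once 4 items are queued...
		buffer.WithFlushInterval(200*time.Millisecond), // ...or after 200ms, whichever is first
		buffer.WithFlusher(buffer.FlusherFunc(func(items []interface{}) {
			fmt.Println("flushing", len(items), "items")
		})),
		buffer.WithPushTimeout(15*time.Second),
	)
	defer buf.Close()

	for i := 0; i < 10; i++ {
		if err := buf.Push(i); err == buffer.ErrTimeout {
			// Under pressure: drop rather than block, as enqueueMapping does.
			continue
		}
	}
	time.Sleep(time.Second) // allow the interval flush to fire
}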
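Finally, wiring the experimental layer outside the conformance binary is plain function composition: hand NewDedupe the underlying Add and stack InMemoryDedupe on top, so Spanner is consulted only on in-memory misses. A hedged usage sketch (the database URI and the ctx/storage variables are illustrative, and the IDSeq table from the schema comment above must already be provisioned, since the PR notes there's no terraform yet):

// Illustrative wiring, mirroring cmd/conformance/gcp/main.go above.
persistentAdd, err := gcp.NewDedupe(ctx, "projects/p/instances/i/databases/seq_dedup", storage.Add)
if err != nil {
	klog.Exitf("Failed to create GCP dedupe: %v", err)
}
add := tessera.InMemoryDedupe(persistentAdd, 256)

// Duplicate submissions resolve to the same assigned index.
idx, err := add(ctx, tessera.NewEntry([]byte("hello")))()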