Skip to content

Commit

Permalink
Support direct signaling for advancing advertiser cleanroom state (#44)
Browse files Browse the repository at this point in the history
Co-authored-by: amanjpro <[email protected]>
  • Loading branch information
amanjpro and amanjpro authored Nov 28, 2024
1 parent f992485 commit da18aa6
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 218 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/google/uuid v1.6.0
github.com/gtank/ristretto255 v0.1.2
github.com/optable/match v1.4.0
github.com/optable/match-api/v2 v2.6.0
github.com/optable/match-api/v2 v2.7.0
github.com/rs/zerolog v1.33.0
gocloud.dev v0.39.0
golang.org/x/oauth2 v0.22.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/optable/match v1.4.0 h1:kyj1ty6qFIRVFsB6zTJab0RF3Duq9xqPIdld7+4IDa4=
github.com/optable/match v1.4.0/go.mod h1:l8DT0v6TfmIT53vBbEAp+W0EFAxJ22NIEeJDz0z3WDM=
github.com/optable/match-api/v2 v2.6.0 h1:MHZD5JWjwu7evHong9m3deyHi2hDzk77odMNtG33xbk=
github.com/optable/match-api/v2 v2.6.0/go.mod h1:b4eo6B06BE4goiWwhJ3bNl1BTuMF6hIZdGEhbRgdEkI=
github.com/optable/match-api/v2 v2.7.0 h1:fn4Qhrg9CoapikvrfpXhphoe03HipPnwju47c/89UpM=
github.com/optable/match-api/v2 v2.7.0/go.mod h1:b4eo6B06BE4goiWwhJ3bNl1BTuMF6hIZdGEhbRgdEkI=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
10 changes: 10 additions & 0 deletions pkg/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ func (b *BucketCompleter) Complete(ctx context.Context) error {
return b.client.Close()
}

// Checks if the .Completed file exists in the destination bucket.
func (b *BucketCompleter) HasCompleted(ctx context.Context) (bool, error) {
dstBucket := b.client.Bucket(b.dstPrefixedBucket.bucket)
_, err := dstBucket.Object(fmt.Sprintf("%s/%s", b.dstPrefixedBucket.prefix, CompletedFile)).Attrs(ctx)
if errors.Is(err, storage.ErrObjectNotExist) {
return false, nil
}
return err == nil, err
}

// NewBucketReadWriter creates a new Bucket object and opens readers and writers for the specified source and destination URLs.
// Caller needs to call Close() on the returned Bucket object to release resources.
func NewBucketReadWriter(ctx context.Context, downscopedToken string, dstURL string, opts ...BucketOption) (*BucketReadWriter, error) {
Expand Down
9 changes: 3 additions & 6 deletions pkg/cmd/cli/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@ type CliContext struct {

type (
CleanroomCmd struct {
Get GetCmd `cmd:"" help:"Get the current status and configuration associated with the specified Optable PAIR clean room."`
Participate ParticipateCmd `cmd:"" hidden:"" help:"Participate in the PAIR operation by contributing advertiser hashed and encrypted data."`
ReEncrypt ReEncryptCmd `cmd:"" hidden:"" help:"Re-encrypt publisher's PAIR IDs with the advertiser key."`
Match MatchCmd `cmd:"" hidden:"" help:"Match publisher's PAIR IDs with advertiser's PAIR IDs."`
Run RunCmd `cmd:"" help:"As the advertiser clean room, run the PAIR match protocol with the publisher that has invited you to the specified Optable PAIR clean room."`
Decrypt DecryptCmd `cmd:"" help:"Decrypt a list of previously matched triple encrypted PAIR IDs using the advertiser clean room's private key."`
Get GetCmd `cmd:"" help:"Get the current status and configuration associated with the specified Optable PAIR clean room."`
Run RunCmd `cmd:"" help:"As the advertiser clean room, run the PAIR match protocol with the publisher that has invited you to the specified Optable PAIR clean room."`
Decrypt DecryptCmd `cmd:"" help:"Decrypt a list of previously matched triple encrypted PAIR IDs using the advertiser clean room's private key."`
}

KeyCmd struct {
Expand Down
79 changes: 0 additions & 79 deletions pkg/cmd/cli/match.go

This file was deleted.

27 changes: 27 additions & 0 deletions pkg/cmd/cli/pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ func (c *pairConfig) hashEncryt(ctx context.Context, input string) (err error) {
if err != nil {
return fmt.Errorf("bucket.NewBucketCompleter: %w", err)
}
hasCompleted, err := bucketCompleter.HasCompleted(ctx)
if err != nil {
return fmt.Errorf("bucketCompleter.HasCompleted: %w", err)
}
if hasCompleted {
// nothing to do if the advertiser data has pushed the data
return nil
}

defer func() {
// don't complete the bucket if there was an error to prevent writing
// unwanted files.
Expand All @@ -95,6 +104,7 @@ func (c *pairConfig) hashEncryt(ctx context.Context, input string) (err error) {
logger.Error().Err(err).Msg("failed to write .Completed file to bucket")
return
}

}()

b, err := bucket.NewBucketReadWriter(ctx, c.downscopedToken, c.advTwicePath, bucket.WithReader(in))
Expand Down Expand Up @@ -141,6 +151,15 @@ func (c *pairConfig) reEncrypt(ctx context.Context, publisherPAIRIDsPath string)
if err != nil {
return fmt.Errorf("bucket.NewBucketCompleter: %w", err)
}

hasCompleted, err := bucketCompleter.HasCompleted(ctx)
if err != nil {
return fmt.Errorf("bucketCompleter.HasCompleted: %w", err)
}
if hasCompleted {
// nothing to do if the advertiser data has pushed the data
return nil
}
defer func() {
// don't complete the bucket if there was an error to prevent writing
// unwanted files.
Expand Down Expand Up @@ -251,3 +270,11 @@ func (c *pairConfig) match(ctx context.Context, outputPath string, publisherPAIR

return nil
}

func readersFromReadClosers(rs []io.ReadCloser) []io.Reader {
readers := make([]io.Reader, len(rs))
for i, r := range rs {
readers[i] = r
}
return readers
}
57 changes: 0 additions & 57 deletions pkg/cmd/cli/participate.go

This file was deleted.

73 changes: 0 additions & 73 deletions pkg/cmd/cli/re-encrypt.go

This file was deleted.

12 changes: 12 additions & 0 deletions pkg/cmd/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,19 @@ func startFromStepOne(ctx context.Context, pairCfg *pairConfig, input, output st
return fmt.Errorf("hashEncryt: %w", err)
}

if _, err := pairCfg.cleanroomClient.AdvanceAdvertiserState(ctx); err != nil {
return fmt.Errorf("failed to advance advertiser state: %w", err)
}

// Step 2. Re-encrypt the publisher's hashed and encrypted PAIR IDs and output to pubTriplePath.
if err := pairCfg.reEncrypt(ctx, publisherData); err != nil {
return fmt.Errorf("reEncrypt: %w", err)
}

if _, err := pairCfg.cleanroomClient.AdvanceAdvertiserState(ctx); err != nil {
return fmt.Errorf("failed to advance advertiser state: %w", err)
}

if output == "" {
return nil
}
Expand All @@ -182,6 +190,10 @@ func startFromStepTwo(ctx context.Context, pairCfg *pairConfig, output string, p
return fmt.Errorf("reEncrypt: %w", err)
}

if _, err := pairCfg.cleanroomClient.AdvanceAdvertiserState(ctx); err != nil {
return fmt.Errorf("failed to advance advertiser state: %w", err)
}

if output == "" {
return nil
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ func (c *CleanroomClient) RefreshToken(ctx context.Context) (*v1.Cleanroom, erro
return c.do(ctx, req)
}

func (c *CleanroomClient) AdvanceAdvertiserState(ctx context.Context) (*v1.Cleanroom, error) {
req := &v1.AdvanceCleanroomAdvertiserStateRequest{
Name: c.cleanroomName,
}

return c.do(ctx, req)
}

func (c *CleanroomClient) GetDownScopedToken(ctx context.Context) (string, error) {
cleanroom, err := c.GetCleanroom(ctx, true)
if err != nil {
Expand Down Expand Up @@ -159,6 +167,8 @@ func (c *CleanroomClient) do(ctx context.Context, req proto.Message) (*v1.Cleanr
path = "/admin/api/external/v1/cleanroom/get"
case *v1.RefreshTokenRequest:
path = "/admin/api/external/v1/cleanroom/refresh-token"
case *v1.AdvanceCleanroomAdvertiserStateRequest:
path = "/admin/api/external/v1/cleanroom/advance-advertiser-state"
default:
return nil, fmt.Errorf("unknown request type")
}
Expand Down

0 comments on commit da18aa6

Please sign in to comment.