Skip to content

Commit

Permalink
add publish worker
Browse files Browse the repository at this point in the history
  • Loading branch information
richardhuaaa committed Aug 21, 2024
1 parent 647b702 commit b994784
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 17 deletions.
89 changes: 89 additions & 0 deletions pkg/api/publishWorker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package api

Check failure on line 1 in pkg/api/publishWorker.go

View workflow job for this annotation

GitHub Actions / Lint

: # github.com/xmtp/xmtpd/pkg/api [github.com/xmtp/xmtpd/pkg/api.test]

import (
"context"
"database/sql"
"time"

"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/registrant"
"google.golang.org/protobuf/proto"
)

type PublishWorker struct {
listener <-chan []queries.StagedOriginatorEnvelope
registrant *registrant.Registrant
store *sql.DB
subscription db.DBSubscription[queries.StagedOriginatorEnvelope]
}

func StartPublishWorker(
ctx context.Context,
reg *registrant.Registrant,
store *sql.DB,
notifier <-chan bool,
) (*PublishWorker, error) {
query := func(lastSeenID int64, numRows int32) ([]queries.StagedOriginatorEnvelope, int64, error) {
results, err := queries.New(store).SelectStagedOriginatorEnvelopes(
ctx,
queries.SelectStagedOriginatorEnvelopesParams{
LastSeenID: lastSeenID,
NumRows: numRows,
},
)
if err != nil {
return nil, 0, err
}
if len(results) > 0 {
lastSeenID = results[len(results)-1].ID
}
return results, lastSeenID, nil
}
subscription := db.NewDBSubscription(
query,
0, // lastSeenID
db.PollingOptions{Interval: 5 * time.Second, Notifier: notifier, NumRows: 100},
)
listener, err := subscription.Start()
if err != nil {
return nil, err
}

worker := &PublishWorker{
subscription: *subscription,
listener: listener,
registrant: reg,
store: store,
}
go worker.start()

return worker, nil
}

func (p *PublishWorker) start() {
for new_batch := range p.listener {
for _, stagedEnv := range new_batch {
originatedEnv, sid, err := p.registrant.SignStagedEnvelope(stagedEnv)

Check failure on line 67 in pkg/api/publishWorker.go

View workflow job for this annotation

GitHub Actions / Lint

sid declared and not used

Check failure on line 67 in pkg/api/publishWorker.go

View workflow job for this annotation

GitHub Actions / Lint

sid declared and not used

Check failure on line 67 in pkg/api/publishWorker.go

View workflow job for this annotation

GitHub Actions / Test (Node)

sid declared and not used
if err != nil {
panic("TODO(rich)")
}
originatedBytes, err := proto.Marshal(originatedEnv)

Check failure on line 71 in pkg/api/publishWorker.go

View workflow job for this annotation

GitHub Actions / Lint

originatedBytes declared and not used

Check failure on line 71 in pkg/api/publishWorker.go

View workflow job for this annotation

GitHub Actions / Lint

originatedBytes declared and not used

Check failure on line 71 in pkg/api/publishWorker.go

View workflow job for this annotation

GitHub Actions / Test (Node)

originatedBytes declared and not used
if err != nil {
panic("TODO(rich)")
}
q := queries.New(p.store)

Check failure on line 75 in pkg/api/publishWorker.go

View workflow job for this annotation

GitHub Actions / Lint

q declared and not used) (typecheck)

Check failure on line 75 in pkg/api/publishWorker.go

View workflow job for this annotation

GitHub Actions / Lint

q declared and not used (typecheck)

Check failure on line 75 in pkg/api/publishWorker.go

View workflow job for this annotation

GitHub Actions / Test (Node)

q declared and not used
// TODO(rich) Verify context
// q.InsertGatewayEnvelope(context.Background(), queries.InsertGatewayEnvelopeParams{
// OriginatorSid: sid,
// Topic: originatedBytes,
// OriginatorEnvelope: proto.Marshal(originatedEnv),
// })

// Start transaction
// Sign envelope
// Insert into all envelopes
// Delete envelope from staged_originator_envelopes
}
}
}
30 changes: 22 additions & 8 deletions pkg/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,33 @@ import (
type Service struct {
message_api.UnimplementedReplicationApiServer

ctx context.Context
log *zap.Logger
registrant *registrant.Registrant
queries *queries.Queries
ctx context.Context
log *zap.Logger
notifyStagedPublish chan<- bool
registrant *registrant.Registrant
store *sql.DB
worker *PublishWorker
}

func NewReplicationApiService(
ctx context.Context,
log *zap.Logger,
registrant *registrant.Registrant,
writerDB *sql.DB,
store *sql.DB,
) (*Service, error) {
return &Service{ctx: ctx, log: log, registrant: registrant, queries: queries.New(writerDB)}, nil
notifier := make(chan bool, 1)
// worker, err := StartPublishWorker(ctx, store, notifier)
// if err != nil {
// return nil, err
// }
return &Service{
ctx: ctx,
log: log,
notifyStagedPublish: notifier,
registrant: registrant,
store: store,
// worker: worker,
}, nil
}

func (s *Service) Close() {
Expand Down Expand Up @@ -71,12 +85,12 @@ func (s *Service) PublishEnvelope(
return nil, status.Errorf(codes.Internal, "could not marshal envelope: %v", err)
}

stagedEnv, err := s.queries.InsertStagedOriginatorEnvelope(ctx, payerBytes)
stagedEnv, err := queries.New(s.store).InsertStagedOriginatorEnvelope(ctx, payerBytes)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not insert staged envelope: %v", err)
}

originatorEnv, err := s.registrant.SignStagedEnvelope(stagedEnv)
originatorEnv, _, err := s.registrant.SignStagedEnvelope(stagedEnv)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not sign envelope: %v", err)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/registrant/registrant.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ func (r *Registrant) signKeccak256(data []byte) ([]byte, error) {

func (r *Registrant) SignStagedEnvelope(
stagedEnv queries.StagedOriginatorEnvelope,
) (*message_api.OriginatorEnvelope, error) {
) (*message_api.OriginatorEnvelope, uint64, error) {
payerEnv := &message_api.PayerEnvelope{}
if err := proto.Unmarshal(stagedEnv.PayerEnvelope, payerEnv); err != nil {
return nil, fmt.Errorf("Could not unmarshal payer envelope: %v", err)
return nil, 0, fmt.Errorf("Could not unmarshal payer envelope: %v", err)
}
sid, err := r.sid(stagedEnv.ID)
if err != nil {
return nil, err
return nil, 0, err
}
unsignedEnv := message_api.UnsignedOriginatorEnvelope{
OriginatorSid: sid,
Expand All @@ -78,12 +78,12 @@ func (r *Registrant) SignStagedEnvelope(
}
unsignedBytes, err := proto.Marshal(&unsignedEnv)
if err != nil {
return nil, err
return nil, 0, err
}

sig, err := r.signKeccak256(unsignedBytes)
if err != nil {
return nil, err
return nil, 0, err
}

signedEnv := message_api.OriginatorEnvelope{
Expand All @@ -95,7 +95,7 @@ func (r *Registrant) SignStagedEnvelope(
},
}

return &signedEnv, nil
return &signedEnv, sid, nil
}

func getRegistryRecord(
Expand Down
6 changes: 3 additions & 3 deletions pkg/registrant/registrant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestSignStagedEnvelopeInvalidEnvelope(t *testing.T) {
_, r, cleanup := setupWithRegistrant(t)
defer cleanup()

_, err := r.SignStagedEnvelope(
_, _, err := r.SignStagedEnvelope(
queries.StagedOriginatorEnvelope{
ID: 1,
OriginatorTime: time.Now(),
Expand All @@ -205,7 +205,7 @@ func TestSignStagedEnvelopeSIDExhaustion(t *testing.T) {
payerBytes, err := proto.Marshal(&message_api.PayerEnvelope{})
require.NoError(t, err)

_, err = r.SignStagedEnvelope(
_, _, err = r.SignStagedEnvelope(
queries.StagedOriginatorEnvelope{
ID: 0b0000000000000001000000000000000000000000000000000000000000000000,
OriginatorTime: time.Now(),
Expand All @@ -224,7 +224,7 @@ func TestSignStagedEnvelopeSuccess(t *testing.T) {
)
require.NoError(t, err)

env, err := r.SignStagedEnvelope(
env, _, err := r.SignStagedEnvelope(
queries.StagedOriginatorEnvelope{
ID: 50,
OriginatorTime: time.Now(),
Expand Down

0 comments on commit b994784

Please sign in to comment.