Skip to content

Commit

Permalink
Add PanicPublisher for triggering restart on Publish fail.
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreasBergmeier6176 committed Jan 17, 2024
1 parent eb5c499 commit d8be33d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/google/go-cmp v0.5.9
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/sdk v1.2.0
go.opentelemetry.io/otel/trace v1.14.0
google.golang.org/api v0.114.0
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1
google.golang.org/grpc v1.56.3
Expand Down Expand Up @@ -81,7 +82,6 @@ require (
github.com/stretchr/testify v1.8.2 // indirect
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
Expand Down
42 changes: 42 additions & 0 deletions pkg/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package publisher

import (
"context"
"fmt"

"cloud.google.com/go/errorreporting"
cepubsub "github.com/cloudevents/sdk-go/protocol/pubsub/v2"
"github.com/otto-de/sherlock-microservice/pkg/gcp/errorreports"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

type Option struct {
Expand Down Expand Up @@ -39,3 +43,41 @@ func ApplyCloudEventsPubSubOrderingKey(ctx context.Context, opts ...Option) cont
}
return ctx
}

// PanicPublisher wraps a Publisher.
// Panics if publishing fails.
type PanicPublisher[EV any] struct {
er *errorreporting.Client
base Publisher[EV]
}

func NewPanicPublisher[EV any](er *errorreporting.Client, base Publisher[EV]) *PanicPublisher[EV] {
return &PanicPublisher[EV]{
er: er,
base: base,
}
}

func (p *PanicPublisher[EV]) PublishWithNACKPanic(ctx context.Context, event *EV, opts ...Option) errorreports.Error {
publishErr := p.base.Publish(ctx, event, opts...)
if publishErr != nil {
return nil
}
span := trace.SpanFromContext(ctx)
span.SetStatus(codes.Error, "Send failed")
span.RecordError(publishErr)
if publishErr.IsACK() {
p.er.Report(errorreporting.Entry{
Error: publishErr,
})
} else {
p.er.ReportSync(ctx, errorreporting.Entry{
Error: publishErr,
})
// For now we do not recover here but just panic
// TODO: Optimally we should introduce circuit breakers instead
panicErr := fmt.Errorf("choosing to panic (may trigger restart) due to possible unrecoverable publishing error: %w", publishErr)
panic(panicErr)
}
return publishErr
}

0 comments on commit d8be33d

Please sign in to comment.