diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index ed107525..e33afb06 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -19,7 +19,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: "1.18" + go-version: "1.20" cache: true check-latest: true - name: Get release tag diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index f90df090..7257cfbc 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -19,7 +19,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: '^1.18' + go-version: '^1.20' cache: true check-latest: true - name: Login to DockerHub diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index cc0f4577..0bd6180a 100755 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,7 +14,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: '1.18' + go-version: '1.20' cache: true - name: Install dependencies run: sudo apt-get install build-essential @@ -53,7 +53,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: '1.18' + go-version: '1.20' cache: true - name: Install dependencies run: sudo apt-get install build-essential @@ -74,7 +74,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: '1.18' + go-version: '1.20' cache: true - name: Download coverage uses: actions/download-artifact@v3 diff --git a/cmd/lint.go b/cmd/lint.go index bf559441..d81c3fcc 100644 --- a/cmd/lint.go +++ b/cmd/lint.go @@ -28,7 +28,7 @@ func LintCmd() *cobra.Command { return &cobra.Command{ Use: "lint [path]", Aliases: []string{"l"}, - Args: cobra.MatchAll(cobra.ExactArgs(1)), + Args: cobra.MatchAll(cobra.ExactArgs(1), cobra.OnlyValidArgs), Short: "Check for issues in recipes", Long: heredoc.Doc(` Check for issues specified recipes. @@ -193,14 +193,12 @@ func printConfigError(rcp recipe.Recipe, pluginNode recipe.PluginNode, err plugi } // findPluginByName checks plugin by provided name -func findPluginByName(plugins []recipe.PluginRecipe, name string) (plugin recipe.PluginRecipe, exists bool) { - for _, p := range plugins { +func findPluginByName(pp []recipe.PluginRecipe, name string) (recipe.PluginRecipe, bool) { + for _, p := range pp { if p.Name == name { - exists = true - plugin = p - return + return p, true } } - return + return recipe.PluginRecipe{}, false } diff --git a/go.mod b/go.mod index 67b778b2..1909118c 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/raystack/meteor -go 1.18 +go 1.20 require ( cloud.google.com/go/bigquery v1.52.0 @@ -142,7 +142,7 @@ require ( github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/google/uuid v1.3.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect - github.com/googleapis/gax-go/v2 v2.12.0 // indirect + github.com/googleapis/gax-go/v2 v2.12.0 github.com/gopherjs/gopherjs v0.0.0-20210503212227-fb464eba2686 // indirect github.com/gorilla/css v1.0.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.1 // indirect diff --git a/metrics/otelgrpc/otelgrpc.go b/metrics/otelgrpc/otelgrpc.go new file mode 100644 index 00000000..7db03aff --- /dev/null +++ b/metrics/otelgrpc/otelgrpc.go @@ -0,0 +1,198 @@ +package otelgrpc + +import ( + "context" + "net" + "strings" + "time" + + "github.com/raystack/meteor/utils" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.20.0" + "google.golang.org/grpc" + "google.golang.org/grpc/peer" + "google.golang.org/protobuf/proto" +) + +type UnaryParams struct { + Start time.Time + Method string + Req any + Res any + Err error +} + +type Monitor struct { + duration metric.Int64Histogram + requestSize metric.Int64Histogram + responseSize metric.Int64Histogram + attributes []attribute.KeyValue +} + +func NewOtelGRPCMonitor(hostName string) Monitor { + meter := otel.Meter("github.com/raystack/meteor/metrics/otelgrpc") + + duration, err := meter.Int64Histogram("rpc.client.duration", metric.WithUnit("ms")) + handleOtelErr(err) + + requestSize, err := meter.Int64Histogram("rpc.client.request.size", metric.WithUnit("By")) + handleOtelErr(err) + + responseSize, err := meter.Int64Histogram("rpc.client.response.size", metric.WithUnit("By")) + handleOtelErr(err) + + addr, port := ExtractAddress(hostName) + + return Monitor{ + duration: duration, + requestSize: requestSize, + responseSize: responseSize, + attributes: []attribute.KeyValue{ + semconv.RPCSystemGRPC, + attribute.String("network.transport", "tcp"), + attribute.String("server.address", addr), + attribute.String("server.port", port), + }, + } +} + +func GetProtoSize(p any) int { + if p == nil { + return 0 + } + + size := proto.Size(p.(proto.Message)) + return size +} + +func (m *Monitor) RecordUnary(ctx context.Context, p UnaryParams) { + reqSize := GetProtoSize(p.Req) + resSize := GetProtoSize(p.Res) + + attrs := make([]attribute.KeyValue, len(m.attributes)) + copy(attrs, m.attributes) + attrs = append(attrs, attribute.String("rpc.grpc.status_text", utils.StatusText(p.Err))) + attrs = append(attrs, attribute.String("network.type", netTypeFromCtx(ctx))) + attrs = append(attrs, ParseFullMethod(p.Method)...) + + m.duration.Record(ctx, + time.Since(p.Start).Milliseconds(), + metric.WithAttributes(attrs...)) + + m.requestSize.Record(ctx, + int64(reqSize), + metric.WithAttributes(attrs...)) + + m.responseSize.Record(ctx, + int64(resSize), + metric.WithAttributes(attrs...)) +} + +func (m *Monitor) RecordStream(ctx context.Context, start time.Time, method string, err error) { + attrs := make([]attribute.KeyValue, len(m.attributes)) + copy(attrs, m.attributes) + attrs = append(attrs, attribute.String("rpc.grpc.status_text", utils.StatusText(err))) + attrs = append(attrs, attribute.String("network.type", netTypeFromCtx(ctx))) + attrs = append(attrs, ParseFullMethod(method)...) + + m.duration.Record(ctx, + time.Since(start).Milliseconds(), + metric.WithAttributes(attrs...)) +} + +func (m *Monitor) UnaryClientInterceptor() grpc.UnaryClientInterceptor { + return func(ctx context.Context, + method string, + req, reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) (err error) { + defer func(start time.Time) { + m.RecordUnary(ctx, UnaryParams{ + Start: start, + Req: req, + Res: reply, + Err: err, + }) + }(time.Now()) + + return invoker(ctx, method, req, reply, cc, opts...) + } +} + +func (m *Monitor) StreamClientInterceptor() grpc.StreamClientInterceptor { + return func(ctx context.Context, + desc *grpc.StreamDesc, + cc *grpc.ClientConn, + method string, + streamer grpc.Streamer, + opts ...grpc.CallOption, + ) (s grpc.ClientStream, err error) { + defer func(start time.Time) { + m.RecordStream(ctx, start, method, err) + }(time.Now()) + + return streamer(ctx, desc, cc, method, opts...) + } +} + +func (m *Monitor) GetAttributes() []attribute.KeyValue { + return m.attributes +} + +func ParseFullMethod(fullMethod string) []attribute.KeyValue { + name := strings.TrimLeft(fullMethod, "/") + service, method, found := strings.Cut(name, "/") + if !found { + return nil + } + + var attrs []attribute.KeyValue + if service != "" { + attrs = append(attrs, semconv.RPCService(service)) + } + if method != "" { + attrs = append(attrs, semconv.RPCMethod(method)) + } + return attrs +} + +func handleOtelErr(err error) { + if err != nil { + otel.Handle(err) + } +} + +func ExtractAddress(addr string) (host, port string) { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return addr, "80" + } + + return host, port +} + +func netTypeFromCtx(ctx context.Context) (ipType string) { + ipType = "unknown" + p, ok := peer.FromContext(ctx) + if !ok { + return ipType + } + + clientIP, _, err := net.SplitHostPort(p.Addr.String()) + if err != nil { + return ipType + } + + ip := net.ParseIP(clientIP) + if ip.To4() != nil { + ipType = "ipv4" + } else if ip.To16() != nil { + ipType = "ipv6" + } + + return ipType +} diff --git a/metrics/otelgrpc/otelgrpc_test.go b/metrics/otelgrpc/otelgrpc_test.go new file mode 100644 index 00000000..1113de91 --- /dev/null +++ b/metrics/otelgrpc/otelgrpc_test.go @@ -0,0 +1,106 @@ +package otelgrpc_test + +import ( + "context" + "errors" + "reflect" + "testing" + "time" + + "github.com/raystack/meteor/metrics/otelgrpc" + pb "github.com/raystack/optimus/protos/raystack/optimus/core/v1beta1" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.20.0" +) + +func Test_otelGRPCMonitor_Record(t *testing.T) { + mt := otelgrpc.NewOtelGRPCMonitor("localhost:1001") + assert.NotNil(t, mt) + initialAttr := mt.GetAttributes() + + uc := mt.UnaryClientInterceptor() + assert.NotNil(t, uc) + assert.Equal(t, initialAttr, mt.GetAttributes()) + + sc := mt.StreamClientInterceptor() + assert.NotNil(t, sc) + assert.Equal(t, initialAttr, mt.GetAttributes()) + + mt.RecordUnary(context.Background(), otelgrpc.UnaryParams{ + Start: time.Now(), + Method: "/service.raystack.com/MethodName", + Req: nil, + Res: nil, + Err: nil, + }) + assert.Equal(t, initialAttr, mt.GetAttributes()) + + mt.RecordUnary(context.Background(), otelgrpc.UnaryParams{ + Start: time.Now(), + Method: "", + Req: &pb.ListProjectsRequest{}, + Res: nil, + Err: nil, + }) + assert.Equal(t, initialAttr, mt.GetAttributes()) + + mt.RecordStream(context.Background(), time.Now(), "", nil) + assert.Equal(t, initialAttr, mt.GetAttributes()) + + mt.RecordStream(context.Background(), time.Now(), "/service.raystack.com/MethodName", errors.New("dummy error")) + assert.Equal(t, initialAttr, mt.GetAttributes()) +} + +func Test_parseFullMethod(t *testing.T) { + type args struct { + fullMethod string + } + tests := []struct { + name string + args args + want []attribute.KeyValue + }{ + {name: "should parse correct method", args: args{ + fullMethod: "/test.service.name/MethodNameV1", + }, want: []attribute.KeyValue{ + semconv.RPCService("test.service.name"), + semconv.RPCMethod("MethodNameV1"), + }}, + + {name: "should return empty attributes on incorrect method", args: args{ + fullMethod: "incorrectMethod", + }, want: nil}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := otelgrpc.ParseFullMethod(tt.args.fullMethod); !reflect.DeepEqual(got, tt.want) { + t.Errorf("parseFullMethod() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_getProtoSize(t *testing.T) { + req := &pb.ListProjectNamespacesRequest{ + ProjectName: "asd", + } + + if got := otelgrpc.GetProtoSize(req); got != 5 { + t.Errorf("getProtoSize() = %v, want %v", got, 5) + } +} + +func TestExtractAddress(t *testing.T) { + gotHost, gotPort := otelgrpc.ExtractAddress("localhost:1001") + assert.Equal(t, "localhost", gotHost) + assert.Equal(t, "1001", gotPort) + + gotHost, gotPort = otelgrpc.ExtractAddress("localhost") + assert.Equal(t, "localhost", gotHost) + assert.Equal(t, "80", gotPort) + + gotHost, gotPort = otelgrpc.ExtractAddress("some.address.golabs.io:15010") + assert.Equal(t, "some.address.golabs.io", gotHost) + assert.Equal(t, "15010", gotPort) +} diff --git a/metrics/otelhttpclient/http_transport.go b/metrics/otelhttpclient/http_transport.go index b6745c8f..52325321 100644 --- a/metrics/otelhttpclient/http_transport.go +++ b/metrics/otelhttpclient/http_transport.go @@ -46,7 +46,7 @@ func NewHTTPTransport(baseTransport http.RoundTripper) http.RoundTripper { } icl := &httpTransport{roundTripper: baseTransport} - icl.createMeasures(otel.Meter("github.com/goto/meteor/metrics/otehttpclient")) + icl.createMeasures(otel.Meter("github.com/raystack/meteor/metrics/otehttpclient")) return icl } diff --git a/plugins/extractors/caramlstore/caramlstore_core_client.go b/plugins/extractors/caramlstore/caramlstore_core_client.go index 73810a61..fd65beb3 100644 --- a/plugins/extractors/caramlstore/caramlstore_core_client.go +++ b/plugins/extractors/caramlstore/caramlstore_core_client.go @@ -8,6 +8,7 @@ import ( grpcmw "github.com/grpc-ecosystem/go-grpc-middleware" grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus" + og "github.com/raystack/meteor/metrics/otelgrpc" "github.com/raystack/meteor/plugins/extractors/caramlstore/internal/core" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" @@ -23,7 +24,6 @@ const ( type gRPCClient struct { opts []grpc.DialOption core.CoreServiceClient - conn *grpc.ClientConn timeout time.Duration } @@ -106,6 +106,8 @@ func (c *gRPCClient) createConnection(ctx context.Context, hostURL string, maxSi maxSizeInMB = gRPCMaxClientRecvSizeMB } + m := og.NewOtelGRPCMonitor(hostURL) + opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), @@ -120,10 +122,12 @@ func (c *gRPCClient) createConnection(ctx context.Context, hostURL string, maxSi ), otelgrpc.UnaryClientInterceptor(), grpcprom.UnaryClientInterceptor, + m.UnaryClientInterceptor(), )), grpc.WithStreamInterceptor(grpcmw.ChainStreamClient( otelgrpc.StreamClientInterceptor(), grpcprom.StreamClientInterceptor, + m.StreamClientInterceptor(), )), } opts = append(opts, c.opts...) diff --git a/plugins/extractors/frontier/client/client.go b/plugins/extractors/frontier/client/client.go index 897ceed7..dfc58b95 100644 --- a/plugins/extractors/frontier/client/client.go +++ b/plugins/extractors/frontier/client/client.go @@ -9,6 +9,7 @@ import ( grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" sh "github.com/raystack/frontier/proto/v1beta1" + og "github.com/raystack/meteor/metrics/otelgrpc" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -35,18 +36,19 @@ type client struct { conn *grpc.ClientConn } -func (c *client) Connect(ctx context.Context, host string) (err error) { +func (c *client) Connect(ctx context.Context, host string) error { dialTimeoutCtx, dialCancel := context.WithTimeout(ctx, time.Second*2) defer dialCancel() - if c.conn, err = c.createConnection(dialTimeoutCtx, host); err != nil { - err = fmt.Errorf("error creating connection: %w", err) - return + var err error + c.conn, err = c.createConnection(dialTimeoutCtx, host) + if err != nil { + return fmt.Errorf("create connection: %w", err) } c.FrontierServiceClient = sh.NewFrontierServiceClient(c.conn) - return + return nil } func (c *client) Close() error { @@ -54,6 +56,8 @@ func (c *client) Close() error { } func (c *client) createConnection(ctx context.Context, host string) (*grpc.ClientConn, error) { + m := og.NewOtelGRPCMonitor(host) + retryOpts := []grpc_retry.CallOption{ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100 * time.Millisecond)), grpc_retry.WithMax(GRPCMaxRetry), @@ -70,10 +74,12 @@ func (c *client) createConnection(ctx context.Context, host string) (*grpc.Clien grpc_retry.UnaryClientInterceptor(retryOpts...), otelgrpc.UnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, + m.UnaryClientInterceptor(), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( otelgrpc.StreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, + m.StreamClientInterceptor(), )), ) diff --git a/plugins/extractors/metabase/models/models.go b/plugins/extractors/metabase/models/models.go index 53632e6f..1912fe45 100644 --- a/plugins/extractors/metabase/models/models.go +++ b/plugins/extractors/metabase/models/models.go @@ -7,11 +7,6 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -const ( - datasetQueryTypeQuery = "query" - datasetQueryTypeNative = "native" -) - type Dashboard struct { ID int `json:"id"` CreatorID int `json:"creator_id"` diff --git a/plugins/extractors/optimus/client/client.go b/plugins/extractors/optimus/client/client.go index 053a2644..0a84a069 100644 --- a/plugins/extractors/optimus/client/client.go +++ b/plugins/extractors/optimus/client/client.go @@ -2,17 +2,17 @@ package client import ( "context" + "fmt" "time" - "google.golang.org/grpc/credentials/insecure" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/pkg/errors" + og "github.com/raystack/meteor/metrics/otelgrpc" pb "github.com/raystack/optimus/protos/raystack/optimus/core/v1beta1" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) const ( @@ -42,13 +42,13 @@ type client struct { conn *grpc.ClientConn } -func (c *client) Connect(ctx context.Context, host string, maxSizeInMB int) (err error) { +func (c *client) Connect(ctx context.Context, host string, maxSizeInMB int) error { dialTimeoutCtx, dialCancel := context.WithTimeout(ctx, time.Second*2) defer dialCancel() + var err error if c.conn, err = c.createConnection(dialTimeoutCtx, host, maxSizeInMB); err != nil { - err = errors.Wrap(err, "error creating connection") - return + return fmt.Errorf("create connection: %w", err) } c.NamespaceServiceClient = pb.NewNamespaceServiceClient(c.conn) @@ -56,7 +56,7 @@ func (c *client) Connect(ctx context.Context, host string, maxSizeInMB int) (err c.JobSpecificationServiceClient = pb.NewJobSpecificationServiceClient(c.conn) c.JobRunServiceClient = pb.NewJobRunServiceClient(c.conn) - return + return nil } func (c *client) Close() error { @@ -64,6 +64,8 @@ func (c *client) Close() error { } func (c *client) createConnection(ctx context.Context, host string, maxSizeInMB int) (*grpc.ClientConn, error) { + m := og.NewOtelGRPCMonitor(host) + if maxSizeInMB <= 0 { maxSizeInMB = GRPCMaxClientRecvSizeMB } @@ -84,10 +86,12 @@ func (c *client) createConnection(ctx context.Context, host string, maxSizeInMB grpc_retry.UnaryClientInterceptor(retryOpts...), otelgrpc.UnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, + m.UnaryClientInterceptor(), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( otelgrpc.StreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, + m.StreamClientInterceptor(), )), ) diff --git a/plugins/extractors/optimus/optimus.go b/plugins/extractors/optimus/optimus.go index c4ac274c..855d1e00 100644 --- a/plugins/extractors/optimus/optimus.go +++ b/plugins/extractors/optimus/optimus.go @@ -155,7 +155,6 @@ func (e *Extractor) buildJob(ctx context.Context, jobSpec *pb.JobSpecification, "endDate": strOrNil(jobSpec.EndDate), "interval": jobSpec.Interval, "dependsOnPast": jobSpec.DependsOnPast, - "catchUp": jobSpec.CatchUp, "taskName": jobSpec.TaskName, "windowSize": jobSpec.WindowSize, "windowOffset": jobSpec.WindowOffset, diff --git a/plugins/extractors/optimus/testdata/expected.json b/plugins/extractors/optimus/testdata/expected.json index 762693c0..3488dd7b 100644 --- a/plugins/extractors/optimus/testdata/expected.json +++ b/plugins/extractors/optimus/testdata/expected.json @@ -4,7 +4,6 @@ "data": { "@type": "type.googleapis.com/raystack.assets.v1beta2.Job", "attributes": { - "catchUp": false, "dependsOnPast": false, "endDate": null, "interval": "0 19 * * *", @@ -68,7 +67,6 @@ "data": { "@type": "type.googleapis.com/raystack.assets.v1beta2.Job", "attributes": { - "catchUp": true, "dependsOnPast": false, "endDate": null, "interval": "0 19 1 * *", @@ -138,7 +136,6 @@ "data": { "@type": "type.googleapis.com/raystack.assets.v1beta2.Job", "attributes": { - "catchUp": true, "dependsOnPast": false, "endDate": null, "interval": "0 19 1 * *", diff --git a/plugins/extractors/oracle/oracle_test.go b/plugins/extractors/oracle/oracle_test.go index 1bae3e8f..eeddf538 100644 --- a/plugins/extractors/oracle/oracle_test.go +++ b/plugins/extractors/oracle/oracle_test.go @@ -1,29 +1,27 @@ -//go:build plugins -// +build plugins +//go:build !plugins +// +build !plugins package oracle_test import ( "context" + "database/sql" "fmt" "log" "os" "testing" - v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2" - "github.com/raystack/meteor/test/utils" - "google.golang.org/protobuf/types/known/anypb" - "google.golang.org/protobuf/types/known/structpb" - - "database/sql" - "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" + v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2" "github.com/raystack/meteor/plugins" "github.com/raystack/meteor/plugins/extractors/oracle" "github.com/raystack/meteor/test/mocks" + "github.com/raystack/meteor/test/utils" _ "github.com/sijms/go-ora/v2" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/structpb" ) var db *sql.DB @@ -82,7 +80,8 @@ func TestInit(t *testing.T) { RawConfig: map[string]interface{}{ "password": "pass", "host": host, - }}) + }, + }) assert.ErrorAs(t, err, &plugins.InvalidConfigError{}) }) @@ -97,7 +96,8 @@ func TestExtract(t *testing.T) { URNScope: urnScope, RawConfig: map[string]interface{}{ "connection_url": fmt.Sprintf("oracle://%s:%s@%s/%s", user, password, host, defaultDB), - }}) + }, + }) if err != nil { t.Fatal(err) } @@ -112,12 +112,15 @@ func TestExtract(t *testing.T) { func setup() (err error) { // using system user to setup the oracle database - var queries = []string{ + queries := []string{ fmt.Sprintf("CREATE USER %s IDENTIFIED BY %s", user, password), fmt.Sprintf("GRANT CREATE SESSION TO %s", user), fmt.Sprintf("GRANT DBA TO %s", user), } err = execute(db, queries) + if err != nil { + return + } userDB, err := sql.Open("oracle", fmt.Sprintf("oracle://%s:%s@%s/%s", user, password, host, defaultDB)) if err != nil { @@ -125,13 +128,13 @@ func setup() (err error) { } defer userDB.Close() - var createTables = []string{ + createTables := []string{ "CREATE TABLE employee (empid integer primary key, name varchar2(30) NOT NULL, salary number(10, 2))", "CREATE TABLE department (id integer primary key, title varchar(20) NOT NULL, budget float(26))", "COMMENT ON column department.title IS 'Department Name'", } - var populateTables = []string{ + populateTables := []string{ "INSERT INTO employee values(10, 'Sameer', 51000.0)", "INSERT INTO employee values(11, 'Jash', 45000.60)", "INSERT INTO employee values(12, 'Jay', 70000.11)", diff --git a/plugins/extractors/snowflake/snowflake_test.go b/plugins/extractors/snowflake/snowflake_test.go index 045cbb84..100adaf4 100644 --- a/plugins/extractors/snowflake/snowflake_test.go +++ b/plugins/extractors/snowflake/snowflake_test.go @@ -9,11 +9,10 @@ import ( "net/url" "testing" - "github.com/raystack/meteor/plugins/extractors/snowflake" - "github.com/dnaeon/go-vcr/v2/cassette" "github.com/dnaeon/go-vcr/v2/recorder" "github.com/raystack/meteor/plugins" + "github.com/raystack/meteor/plugins/extractors/snowflake" "github.com/raystack/meteor/test/mocks" "github.com/raystack/meteor/test/utils" _ "github.com/snowflakedb/gosnowflake" // used to register the snowflake driver @@ -31,7 +30,8 @@ func TestInit(t *testing.T) { URNScope: urnScope, RawConfig: map[string]interface{}{ "invalid_config": "invalid_config_value", - }}) + }, + }) assert.ErrorAs(t, err, &plugins.InvalidConfigError{}) }) } @@ -40,7 +40,6 @@ func TestInit(t *testing.T) { // after that the extraction of dummy data will not be possible and test will fail. // TestExtract tests that the extractor returns the expected result func TestExtract(t *testing.T) { - t.Run("should return mock-data we generated with snowflake", func(t *testing.T) { r, err := recorder.New("fixtures/get_snowflakes_sample_data") if err != nil { @@ -68,7 +67,12 @@ func TestExtract(t *testing.T) { URNScope: urnScope, RawConfig: map[string]interface{}{ "connection_url": "testing:Snowtest0512@lrwfgiz-hi47152/SNOWFLAKE_SAMPLE_DATA", - }}); err != nil { + "exclude": snowflake.Exclude{ + Databases: []string{""}, + Tables: []string{""}, + }, + }, + }); err != nil { t.Fatal(err) } diff --git a/plugins/internal/tengoutil/structmap/structmap.go b/plugins/internal/tengoutil/structmap/structmap.go index 92e6ce33..20bc9e54 100644 --- a/plugins/internal/tengoutil/structmap/structmap.go +++ b/plugins/internal/tengoutil/structmap/structmap.go @@ -74,7 +74,7 @@ func AsStructWithTag(tagName string, input, output interface{}) error { } func checkAssetDataHookFunc() mapstructure.DecodeHookFuncType { - return func(_ reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { + return func(_, t reflect.Type, data interface{}) (interface{}, error) { if t != reflect.TypeOf(v1beta2.Asset{}) && t != reflect.TypeOf(&v1beta2.Asset{}) { return data, nil } @@ -95,7 +95,7 @@ func checkAssetDataHookFunc() mapstructure.DecodeHookFuncType { // stringToTimestampHookFunc returns a DecodeHookFunc that converts // strings to timestamppb.Timestamp. func stringToTimestampHookFunc(layout string) mapstructure.DecodeHookFuncType { - return func(_ reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { + return func(_, t reflect.Type, data interface{}) (interface{}, error) { s, ok := data.(string) if !ok { return data, nil @@ -115,7 +115,7 @@ func stringToTimestampHookFunc(layout string) mapstructure.DecodeHookFuncType { } func timeToTimestampHookFunc() mapstructure.DecodeHookFuncType { - return func(_ reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { + return func(_, t reflect.Type, data interface{}) (interface{}, error) { ts, ok := data.(time.Time) if !ok { return data, nil @@ -129,7 +129,7 @@ func timeToTimestampHookFunc() mapstructure.DecodeHookFuncType { } func mapToAttributesHookFunc() mapstructure.DecodeHookFuncType { - return func(_ reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { + return func(_, t reflect.Type, data interface{}) (interface{}, error) { m, ok := data.(map[string]interface{}) if !ok { return data, nil @@ -148,7 +148,7 @@ func mapToAnyPBHookFunc() mapstructure.DecodeHookFuncType { return nil, fmt.Errorf("mapstructure map to anypb hook: %s: %w", step, err) } - return func(_ reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { + return func(_, t reflect.Type, data interface{}) (interface{}, error) { m, ok := data.(map[string]interface{}) if !ok { return data, nil diff --git a/plugins/sinks/compass/sink_test.go b/plugins/sinks/compass/sink_test.go index 38b2d229..c111435a 100644 --- a/plugins/sinks/compass/sink_test.go +++ b/plugins/sinks/compass/sink_test.go @@ -25,9 +25,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" ) -var ( - host = "http://compass.com" -) +var host = "http://compass.com" // sample metadata var ( @@ -55,7 +53,7 @@ func TestInit(t *testing.T) { func TestSink(t *testing.T) { t.Run("should return error if compass host returns error", func(t *testing.T) { compassError := `{"reason":"no asset found"}` - errMessage := "error sending data: compass returns 404: {\"reason\":\"no asset found\"}" + errMessage := "send data: compass returns 404: {\"reason\":\"no asset found\"}" // setup mock client url := fmt.Sprintf("%s/v1beta1/assets", host) @@ -625,7 +623,7 @@ func (m *mockHTTPClient) Assert(t *testing.T) { headersMap[hdrKey] = strings.Join(hdrVals, ",") } assert.Equal(t, m.Headers, headersMap) - var bodyBytes = []byte("") + bodyBytes := []byte("") if m.req.Body != nil { var err error bodyBytes, err = io.ReadAll(m.req.Body) diff --git a/plugins/sinks/frontier/client/client.go b/plugins/sinks/frontier/client/client.go index 897ceed7..dfc58b95 100644 --- a/plugins/sinks/frontier/client/client.go +++ b/plugins/sinks/frontier/client/client.go @@ -9,6 +9,7 @@ import ( grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" sh "github.com/raystack/frontier/proto/v1beta1" + og "github.com/raystack/meteor/metrics/otelgrpc" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -35,18 +36,19 @@ type client struct { conn *grpc.ClientConn } -func (c *client) Connect(ctx context.Context, host string) (err error) { +func (c *client) Connect(ctx context.Context, host string) error { dialTimeoutCtx, dialCancel := context.WithTimeout(ctx, time.Second*2) defer dialCancel() - if c.conn, err = c.createConnection(dialTimeoutCtx, host); err != nil { - err = fmt.Errorf("error creating connection: %w", err) - return + var err error + c.conn, err = c.createConnection(dialTimeoutCtx, host) + if err != nil { + return fmt.Errorf("create connection: %w", err) } c.FrontierServiceClient = sh.NewFrontierServiceClient(c.conn) - return + return nil } func (c *client) Close() error { @@ -54,6 +56,8 @@ func (c *client) Close() error { } func (c *client) createConnection(ctx context.Context, host string) (*grpc.ClientConn, error) { + m := og.NewOtelGRPCMonitor(host) + retryOpts := []grpc_retry.CallOption{ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100 * time.Millisecond)), grpc_retry.WithMax(GRPCMaxRetry), @@ -70,10 +74,12 @@ func (c *client) createConnection(ctx context.Context, host string) (*grpc.Clien grpc_retry.UnaryClientInterceptor(retryOpts...), otelgrpc.UnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, + m.UnaryClientInterceptor(), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( otelgrpc.StreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, + m.StreamClientInterceptor(), )), ) diff --git a/plugins/sinks/frontier/sink.go b/plugins/sinks/frontier/sink.go index c4ccfb13..7e638317 100644 --- a/plugins/sinks/frontier/sink.go +++ b/plugins/sinks/frontier/sink.go @@ -83,7 +83,7 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) error { } if err = s.send(ctx, userRequestBody); err != nil { - return errors.Wrap(err, "error sending data") + return fmt.Errorf("send data: %w", err) } s.logger.Info("successfully sinked record to frontier", "record", asset.Name) @@ -92,10 +92,8 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) error { return nil } -func (s *Sink) Close() (err error) { - return - //TODO: Connection closes even when some records are unpiblished - //TODO: return s.client.Close() +func (*Sink) Close() error { + return nil } func (s *Sink) send(ctx context.Context, userRequestBody *sh.UserRequestBody) error { @@ -125,7 +123,7 @@ func (s *Sink) send(ctx context.Context, userRequestBody *sh.UserRequestBody) er return err } } else { - err = fmt.Errorf("not able to parse error returned %v", err) + err = fmt.Errorf("unable to parse error returned: %w", err) } return err @@ -137,7 +135,7 @@ func (s *Sink) buildUserRequestBody(asset *assetsv1beta2.Asset) (*sh.UserRequest var user assetsv1beta2.User err := data.UnmarshalTo(&user) if err != nil { - return &sh.UserRequestBody{}, errors.Wrap(err, "not a User struct") + return &sh.UserRequestBody{}, fmt.Errorf("not a User struct: %w", err) } if user.FullName == "" { diff --git a/recipe/envs.go b/recipe/envs.go index 3d01e2ac..6e9e27ff 100644 --- a/recipe/envs.go +++ b/recipe/envs.go @@ -8,9 +8,7 @@ import ( "github.com/spf13/viper" ) -var ( - recipeEnvVarPrefix = "METEOR_" -) +var recipeEnvVarPrefix = "METEOR_" func populateData(pathToConfig string) map[string]string { viper.AddConfigPath(".") @@ -52,20 +50,16 @@ func populateDataFromLocal() map[string]string { return data } -func mapToMeteorKey(rawKey string) (key string, ok bool) { +func mapToMeteorKey(rawKey string) (string, bool) { // we are doing everything in lowercase for case insensitivity - key = strings.ToLower(rawKey) + key := strings.ToLower(rawKey) meteorPrefix := strings.ToLower(recipeEnvVarPrefix) - keyPrefixLen := len(meteorPrefix) - - isMeteorKeyFormat := len(key) > keyPrefixLen && key[:keyPrefixLen] == meteorPrefix - if !isMeteorKeyFormat { - return + if !strings.HasPrefix(key, meteorPrefix) { + return "", false } - key = key[keyPrefixLen:] // strips prefix - meteor_user_id becomes user_id - ok = true - return + // strips prefix - meteor_user_id becomes user_id + return key[len(meteorPrefix):], true } func loadDataFromYaml(path string) map[string]string { @@ -88,7 +82,7 @@ func loadDataFromYaml(path string) map[string]string { keys := viper.AllKeys() for i := range keys { varConvention := strings.Join(strings.Split(keys[i], "."), "_") - data[varConvention] = viper.Get(keys[i]).(string) + data[varConvention] = viper.GetString(keys[i]) } return data diff --git a/recipe/node.go b/recipe/node.go index f0bb38b3..16f82cc1 100644 --- a/recipe/node.go +++ b/recipe/node.go @@ -40,7 +40,7 @@ func (plug PluginNode) decodeConfig() (map[string]interface{}, error) { } // toRecipe passes the value from RecipeNode to Recipe -func (node RecipeNode) toRecipe() (recipe Recipe, err error) { +func (node RecipeNode) toRecipe() (Recipe, error) { // It supports both tags `name` and `type` for source // till `type` tag gets deprecated if node.Source.Name.IsZero() { @@ -48,21 +48,20 @@ func (node RecipeNode) toRecipe() (recipe Recipe, err error) { } sourceConfig, err := node.Source.decodeConfig() if err != nil { - err = fmt.Errorf("error decoding source config :%w", err) - return + return Recipe{}, fmt.Errorf("decode source config :%w", err) } + processors, err := node.toProcessors() if err != nil { - err = fmt.Errorf("error building processors :%w", err) - return + return Recipe{}, fmt.Errorf("build processors :%w", err) } + sinks, err := node.toSinks() if err != nil { - err = fmt.Errorf("error building sinks :%w", err) - return + return Recipe{}, fmt.Errorf("build sinks :%w", err) } - recipe = Recipe{ + return Recipe{ Name: node.Name.Value, Version: node.Version.Value, Source: PluginRecipe{ @@ -74,41 +73,41 @@ func (node RecipeNode) toRecipe() (recipe Recipe, err error) { Sinks: sinks, Processors: processors, Node: node, - } - - return + }, nil } // toProcessors passes the value of processor PluginNode to its PluginRecipe -func (node RecipeNode) toProcessors() (processors []PluginRecipe, err error) { +func (node RecipeNode) toProcessors() ([]PluginRecipe, error) { + var processors []PluginRecipe for _, processor := range node.Processors { - processorConfig, cfgErr := processor.decodeConfig() - if cfgErr != nil { - err = fmt.Errorf("error decoding processor config :%w", cfgErr) - return + processorConfig, err := processor.decodeConfig() + if err != nil { + return nil, fmt.Errorf("decode processor config :%w", err) } + processors = append(processors, PluginRecipe{ Name: processor.Name.Value, Config: processorConfig, Node: processor, }) } - return + return processors, nil } // toSinks passes the value of sink PluginNode to its PluginRecipe -func (node RecipeNode) toSinks() (sinks []PluginRecipe, err error) { +func (node RecipeNode) toSinks() ([]PluginRecipe, error) { + var sinks []PluginRecipe for _, sink := range node.Sinks { - sinkConfig, cfgErr := sink.decodeConfig() - if cfgErr != nil { - err = fmt.Errorf("error decoding sink config :%w", cfgErr) - return + sinkConfig, err := sink.decodeConfig() + if err != nil { + return nil, fmt.Errorf("decode sink config :%w", err) } + sinks = append(sinks, PluginRecipe{ Name: sink.Name.Value, Config: sinkConfig, Node: sink, }) } - return + return sinks, nil } diff --git a/recipe/reader.go b/recipe/reader.go index ce63db82..76a4d75b 100644 --- a/recipe/reader.go +++ b/recipe/reader.go @@ -19,9 +19,7 @@ type Reader struct { log log.Logger } -var ( - ErrInvalidRecipeVersion = errors.New("recipe version is invalid or not found") -) +var ErrInvalidRecipeVersion = errors.New("recipe version is invalid or not found") // NewReader returns a new Reader. func NewReader(lg log.Logger, pathToConfig string) *Reader { @@ -32,43 +30,42 @@ func NewReader(lg log.Logger, pathToConfig string) *Reader { } // Read loads the list of recipes from a give file or directory path. -func (r *Reader) Read(path string) (recipes []Recipe, err error) { +func (r *Reader) Read(path string) ([]Recipe, error) { fi, err := os.Stat(path) if err != nil { return nil, err } + switch mode := fi.Mode(); { case mode.IsDir(): - recipes, err = r.readDir(r.log, path) - if err != nil { - return nil, err - } + return r.readDir(r.log, path) + case mode.IsRegular(): recipe, err := r.readFile(path) if err != nil { return nil, err } - recipes = append(recipes, recipe) + + return []Recipe{recipe}, nil } - return + + return nil, nil } -func (r *Reader) readFile(path string) (recipe Recipe, err error) { - template, err := template.ParseFiles(path) +func (r *Reader) readFile(path string) (Recipe, error) { + tmpl, err := template.ParseFiles(path) if err != nil { - return + return Recipe{}, err } var buff bytes.Buffer - err = template.Execute(&buff, r.data) - if err != nil { - return + if err := tmpl.Execute(&buff, r.data); err != nil { + return Recipe{}, err } var node RecipeNode - err = yaml.Unmarshal(buff.Bytes(), &node) - if err != nil { - return + if err := yaml.Unmarshal(buff.Bytes(), &node); err != nil { + return Recipe{}, err } if node.Name.Value == "" { @@ -80,23 +77,24 @@ func (r *Reader) readFile(path string) (recipe Recipe, err error) { versions := generator.GetRecipeVersions() err = validateRecipeVersion(node.Version.Value, versions[len(versions)-1]) if err != nil { - return + return Recipe{}, err } - recipe, err = node.toRecipe() + recipe, err := node.toRecipe() if err != nil { - return + return Recipe{}, err } - return + return recipe, nil } -func (r *Reader) readDir(lg log.Logger, path string) (recipes []Recipe, err error) { +func (r *Reader) readDir(lg log.Logger, path string) ([]Recipe, error) { entries, err := os.ReadDir(path) if err != nil { - return + return nil, err } + var recipes []Recipe for _, entry := range entries { x := filepath.Join(path, entry.Name()) recipe, err := r.readFile(x) @@ -108,12 +106,12 @@ func (r *Reader) readDir(lg log.Logger, path string) (recipes []Recipe, err erro recipes = append(recipes, recipe) } - return + return recipes, nil } -func validateRecipeVersion(receivedVersion, expectedVersion string) (err error) { - if strings.Compare(receivedVersion, expectedVersion) == 0 { - return +func validateRecipeVersion(receivedVersion, expectedVersion string) error { + if receivedVersion != expectedVersion { + return ErrInvalidRecipeVersion } - return ErrInvalidRecipeVersion + return nil } diff --git a/registry/sinks.go b/registry/sinks.go index 842e6a68..5f759bdd 100644 --- a/registry/sinks.go +++ b/registry/sinks.go @@ -1,7 +1,8 @@ package registry import ( - "github.com/pkg/errors" + "fmt" + "github.com/raystack/meteor/plugins" ) @@ -19,7 +20,7 @@ func (f *SinkFactory) Get(name string) (plugins.Syncer, error) { } // Info returns information about a Sink. -func (f *SinkFactory) Info(name string) (info plugins.Info, err error) { +func (f *SinkFactory) Info(name string) (plugins.Info, error) { sink, err := f.Get(name) if err != nil { return plugins.Info{}, err @@ -38,10 +39,11 @@ func (f *SinkFactory) List() map[string]plugins.Info { } // Register registers a Sink. -func (f *SinkFactory) Register(name string, fn func() plugins.Syncer) (err error) { +func (f *SinkFactory) Register(name string, fn func() plugins.Syncer) error { if _, ok := f.fnStore[name]; ok { - return errors.Errorf("duplicate syncer: %s", name) + return fmt.Errorf("duplicate syncer: %s", name) } + f.fnStore[name] = fn - return + return nil } diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 9616817a..0fe9941a 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -15,21 +15,18 @@ import ( "testing" "time" - v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2" - "google.golang.org/protobuf/types/known/anypb" - "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" - "github.com/raystack/meteor/test/utils" - - "github.com/pkg/errors" "github.com/raystack/meteor/cmd" + v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2" _ "github.com/raystack/meteor/plugins/extractors" _ "github.com/raystack/meteor/plugins/processors" _ "github.com/raystack/meteor/plugins/sinks" + "github.com/raystack/meteor/test/utils" "github.com/segmentio/kafka-go" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" ) var ( @@ -149,11 +146,10 @@ func listenToTopic(ctx context.Context, topic string, data *[]*v1beta2.Asset) er msg, err := reader.ReadMessage(ctx) if err != nil { break - } var convertMsg v1beta2.Asset if err := proto.Unmarshal(msg.Value, &convertMsg); err != nil { - return errors.Wrap(err, "failed to parse kafka message") + return fmt.Errorf("parse kafka message: %w", err) } *data = append(*data, &convertMsg) } @@ -165,7 +161,7 @@ func listenToTopic(ctx context.Context, topic string, data *[]*v1beta2.Asset) er func setupKafka() error { conn, err := kafka.DialLeader(context.TODO(), "tcp", net.JoinHostPort(broker.Host, strconv.Itoa(broker.Port)), testTopic, partition) if err != nil { - return errors.Wrap(err, "failed to setup kafka connection") + return fmt.Errorf("setup kafka connection: %w", err) } defer func(conn *kafka.Conn) { if err := conn.Close(); err != nil { @@ -174,14 +170,14 @@ func setupKafka() error { }(conn) if err := conn.DeleteTopics(testTopic); err != nil { - return errors.Wrap(err, "failed to delete topic") + return fmt.Errorf("delete topic: %w", err) } if err := conn.CreateTopics(kafka.TopicConfig{ Topic: testTopic, NumPartitions: 1, ReplicationFactor: 1, }); err != nil { - return errors.Wrap(err, "failed to create topic") + return fmt.Errorf("create topic: %w", err) } return nil @@ -197,7 +193,7 @@ func setupMySQL() (err error) { fmt.Sprintf(`CREATE USER IF NOT EXISTS '%s'@'%%' IDENTIFIED BY '%s';`, user, pass), fmt.Sprintf(`GRANT ALL PRIVILEGES ON *.* TO '%s'@'%%';`, user), }); err != nil { - return errors.Wrap(err, "failed to create database") + return fmt.Errorf("create database: %w", err) } // create and populate tables @@ -207,7 +203,7 @@ func setupMySQL() (err error) { "CREATE TABLE jobs (job_id int, job varchar(255), department varchar(255));", "INSERT INTO jobs VALUES (2, 'test2', 'test22');", }); err != nil { - return errors.Wrap(err, "failed to populate database") + return fmt.Errorf("populate database: %w", err) } return @@ -245,10 +241,10 @@ func kafkaDockerSetup() (purge func() error, err error) { kafkaRetryFn := func(resource *dockertest.Resource) (err error) { // create client if conn, err = kafka.Dial("tcp", brokerHost); err != nil { - return errors.Wrap(err, "failed to kafka create client") + return fmt.Errorf("kafka create client: %w", err) } if broker, err = conn.Controller(); err != nil { - return errors.Wrap(err, "failed to generate broker request") + return fmt.Errorf("generate broker request: %w", err) } return } @@ -280,7 +276,7 @@ func mysqlDockerSetup() (purge func() error, err error) { mysqlRetryFn := func(resource *dockertest.Resource) (err error) { db, err = sql.Open("mysql", fmt.Sprintf("root:%s@tcp(%s)/", pass, mysqlHost)) if err != nil { - return errors.Wrap(err, "failed to create mysql client") + return fmt.Errorf("create mysql client: %w", err) } return db.Ping() } diff --git a/test/mocks/plugin.go b/test/mocks/plugin.go index e693afea..14dd17e0 100644 --- a/test/mocks/plugin.go +++ b/test/mocks/plugin.go @@ -4,10 +4,9 @@ import ( "context" "github.com/raystack/meteor/models" + v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2" "github.com/raystack/meteor/plugins" "github.com/stretchr/testify/mock" - - v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2" ) type Plugin struct { @@ -99,11 +98,10 @@ func (m *Emitter) Get() []models.Record { return m.data } -func (m *Emitter) GetAllData() (data []*v1beta2.Asset) { - records := m.Get() - for _, r := range records { +func (m *Emitter) GetAllData() []*v1beta2.Asset { + var data []*v1beta2.Asset + for _, r := range m.Get() { data = append(data, r.Data()) } - - return + return data } diff --git a/test/utils/dockertest.go b/test/utils/dockertest.go index 95fb72b5..1478ec8d 100644 --- a/test/utils/dockertest.go +++ b/test/utils/dockertest.go @@ -7,42 +7,42 @@ import ( "github.com/ory/dockertest/v3" ) -//CreateContainer will create a docker container using the RunOptions given +// CreateContainer will create a docker container using the RunOptions given // -//"opts" is the configuration for docker +// "opts" is the configuration for docker // -//"retryOp" is an exponential backoff-retry, because the application in the container might not be ready to accept connections yet -func CreateContainer(opts dockertest.RunOptions, retryOp func(r *dockertest.Resource) error) (purgeFn func() error, err error) { +// "retryOp" is an exponential backoff-retry, because the application in the container might not be ready to accept connections yet +func CreateContainer(opts dockertest.RunOptions, retryOp func(r *dockertest.Resource) error) (func() error, error) { pool, err := dockertest.NewPool("") // exponential backoff-retry, because the application in the container might not be ready to accept connections yet pool.MaxWait = 300 * time.Second if err != nil { - return purgeFn, fmt.Errorf("could not create dockertest pool: %s", err) + return nil, fmt.Errorf("create dockertest pool: %w", err) } resource, err := pool.RunWithOptions(&opts) if err != nil { - return purgeFn, fmt.Errorf("could not start resource: %s", err.Error()) + return nil, fmt.Errorf("start resource: %w", err) } - purgeFn = func() error { + purgeFn := func() error { if err := pool.Purge(resource); err != nil { - return fmt.Errorf("could not purge resource: %s", err) + return fmt.Errorf("purge resource: %w", err) } - return nil } - if err = pool.Retry(func() (err error) { - err = retryOp(resource) - if err != nil { - fmt.Println(fmt.Errorf("retrying: %s", err)) + if err := pool.Retry(func() error { + if err := retryOp(resource); err != nil { + fmt.Printf("retrying: %s\n", err) + return err } - return + return nil }); err != nil { if err := purgeFn(); err != nil { return nil, err } - return purgeFn, fmt.Errorf("could not connect to docker: %s", err.Error()) + return nil, fmt.Errorf("connect to docker: %w", err) } - return + + return purgeFn, nil } diff --git a/utils/error_status.go b/utils/error_status.go index da32bb9f..31db64c2 100644 --- a/utils/error_status.go +++ b/utils/error_status.go @@ -6,6 +6,26 @@ import ( "google.golang.org/grpc/status" ) +var codeToStr = map[codes.Code]string{ + codes.OK: `"OK"`, + codes.Canceled: `"CANCELED"`, + codes.Unknown: `"UNKNOWN"`, + codes.InvalidArgument: `"INVALID_ARGUMENT"`, + codes.DeadlineExceeded: `"DEADLINE_EXCEEDED"`, + codes.NotFound: `"NOT_FOUND"`, + codes.AlreadyExists: `"ALREADY_EXISTS"`, + codes.PermissionDenied: `"PERMISSION_DENIED"`, + codes.ResourceExhausted: `"RESOURCE_EXHAUSTED"`, + codes.FailedPrecondition: `"FAILED_PRECONDITION"`, + codes.Aborted: `"ABORTED"`, + codes.OutOfRange: `"OUT_OF_RANGE"`, + codes.Unimplemented: `"UNIMPLEMENTED"`, + codes.Internal: `"INTERNAL"`, + codes.Unavailable: `"UNAVAILABLE"`, + codes.DataLoss: `"DATA_LOSS"`, + codes.Unauthenticated: `"UNAUTHENTICATED"`, +} + func StatusCode(err error) codes.Code { if err == nil { return codes.OK @@ -20,3 +40,7 @@ func StatusCode(err error) codes.Code { return codes.Unknown } + +func StatusText(err error) string { + return codeToStr[StatusCode(err)] +}