From c0e8fbb8206dbd53ef9b24b48ac8e68c3c958544 Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Mon, 30 Sep 2024 08:50:30 +0700 Subject: [PATCH 01/11] test: add client test --- max2max/go.mod | 9 +- max2max/go.sum | 1 + max2max/internal/client/client.go | 39 +++------ max2max/internal/client/client_test.go | 112 +++++++++++++++++++++++++ max2max/internal/client/odps.go | 43 ++++++++++ max2max/internal/loader/factory.go | 16 ++-- 6 files changed, 184 insertions(+), 36 deletions(-) create mode 100644 max2max/internal/client/client_test.go create mode 100644 max2max/internal/client/odps.go diff --git a/max2max/go.mod b/max2max/go.mod index 05d18b0..1f1374d 100644 --- a/max2max/go.mod +++ b/max2max/go.mod @@ -2,18 +2,23 @@ module github.com/goto/maxcompute-transformation go 1.22.3 -require github.com/aliyun/aliyun-odps-go-sdk v0.3.4 +require ( + github.com/aliyun/aliyun-odps-go-sdk v0.3.4 + github.com/stretchr/testify v1.9.0 +) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v23.5.26+incompatible // indirect github.com/google/uuid v1.3.0 // indirect github.com/klauspost/compress v1.15.9 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/stretchr/testify v1.9.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/max2max/go.sum b/max2max/go.sum index 40ec016..dabc72a 100644 --- a/max2max/go.sum +++ b/max2max/go.sum @@ -258,6 +258,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/max2max/internal/client/client.go b/max2max/internal/client/client.go index c47e580..dff1783 100644 --- a/max2max/internal/client/client.go +++ b/max2max/internal/client/client.go @@ -14,19 +14,24 @@ type Loader interface { GetPartitionedQuery(tableID, query string, partitionName []string) string } -type client struct { +type OdpsClient interface { + GetPartitionNames(tableID string) ([]string, error) + ExecSQL(query string) error +} + +type Client struct { logger *slog.Logger - odpsClient *odps.Odps + OdpsClient OdpsClient } -func NewClient(logger *slog.Logger, odpsClient *odps.Odps) *client { - return &client{ +func NewClient(logger *slog.Logger, odpsClient *odps.Odps) *Client { + return &Client{ logger: logger, - odpsClient: odpsClient, + OdpsClient: NewODPSClient(odpsClient), } } -func (c *client) Execute(loader Loader, tableID, queryFilePath string) error { +func (c *Client) Execute(loader Loader, tableID, queryFilePath string) error { // read query from filepath c.logger.Info(fmt.Sprintf("executing query from %s", queryFilePath)) queryRaw, err := os.ReadFile(queryFilePath) @@ -36,7 +41,7 @@ func (c *client) Execute(loader Loader, tableID, queryFilePath string) error { // check if table is partitioned c.logger.Info(fmt.Sprintf("checking if table %s is partitioned", tableID)) - partitionNames, err := c.getPartitionNames(tableID) + partitionNames, err := c.OdpsClient.GetPartitionNames(tableID) if err != nil { return err } @@ -50,28 +55,10 @@ func (c *client) Execute(loader Loader, tableID, queryFilePath string) error { // execute query with odps client c.logger.Info(fmt.Sprintf("execute: %s", queryToExec)) - taskIns, err := c.odpsClient.ExecSQl(queryToExec) - if err != nil { + if err := c.OdpsClient.ExecSQL(queryToExec); err != nil { return err } - // wait execution success - c.logger.Info(fmt.Sprintf("taskId: %s", taskIns.Id())) - if err := taskIns.WaitForSuccess(); err != nil { - return err - } c.logger.Info("execution done") return nil } - -func (c *client) getPartitionNames(tableID string) ([]string, error) { - table := c.odpsClient.Table(tableID) - if err := table.Load(); err != nil { - return nil, err - } - var partitionNames []string - for _, partition := range table.Schema().PartitionColumns { - partitionNames = append(partitionNames, partition.Name) - } - return partitionNames, nil -} diff --git a/max2max/internal/client/client_test.go b/max2max/internal/client/client_test.go new file mode 100644 index 0000000..d2a552a --- /dev/null +++ b/max2max/internal/client/client_test.go @@ -0,0 +1,112 @@ +package client_test + +import ( + "fmt" + "log/slog" + "os" + "testing" + + "github.com/goto/maxcompute-transformation/internal/client" + "github.com/stretchr/testify/assert" +) + +func TestExecute(t *testing.T) { + t.Run("should return error when reading query file fails", func(t *testing.T) { + // arrange + client := client.NewClient(slog.Default(), nil) + client.OdpsClient = &mockOdpsClient{} + // act + err := client.Execute(nil, "", "./nonexistentfile") + // assert + assert.Error(t, err) + }) + t.Run("should return error when getting partition name fails", func(t *testing.T) { + // arrange + client := client.NewClient(slog.Default(), nil) + client.OdpsClient = &mockOdpsClient{ + partitionResult: func() ([]string, error) { + return nil, fmt.Errorf("error get partition name") + }, + } + assert.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644)) + // act + err := client.Execute(nil, "project_test.table_test", "/tmp/query.sql") + // assert + assert.Error(t, err) + assert.ErrorContains(t, err, "error get partition name") + }) + t.Run("should return error when executing query fails", func(t *testing.T) { + // arrange + client := client.NewClient(slog.Default(), nil) + client.OdpsClient = &mockOdpsClient{ + partitionResult: func() ([]string, error) { + return nil, nil + }, + execSQLResult: func() error { + return fmt.Errorf("error exec sql") + }, + } + loader := &mockLoader{ + getQueryResult: func() string { + return "INSERT INTO table SELECT * FROM table;" + }, + } + assert.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644)) + // act + err := client.Execute(loader, "project_test.table_test", "/tmp/query.sql") + // assert + assert.Error(t, err) + assert.ErrorContains(t, err, "error exec sql") + }) + t.Run("should return nil when everything is successful", func(t *testing.T) { + // arrange + client := client.NewClient(slog.Default(), nil) + client.OdpsClient = &mockOdpsClient{ + partitionResult: func() ([]string, error) { + return []string{"event_date"}, nil + }, + execSQLResult: func() error { + return nil + }, + } + loader := &mockLoader{ + getQueryResult: func() string { + return "INSERT INTO table SELECT * FROM table;" + }, + getPartitionedQueryResult: func() string { + return "INSERT INTO table PARTITION (event_date) SELECT * FROM table;" + }, + } + assert.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644)) + // act + err := client.Execute(loader, "project_test.table_test", "/tmp/query.sql") + // assert + assert.NoError(t, err) + }) +} + +type mockOdpsClient struct { + partitionResult func() ([]string, error) + execSQLResult func() error +} + +func (m *mockOdpsClient) GetPartitionNames(tableID string) ([]string, error) { + return m.partitionResult() +} + +func (m *mockOdpsClient) ExecSQL(query string) error { + return m.execSQLResult() +} + +type mockLoader struct { + getQueryResult func() string + getPartitionedQueryResult func() string +} + +func (m *mockLoader) GetQuery(tableID, query string) string { + return m.getQueryResult() +} + +func (m *mockLoader) GetPartitionedQuery(tableID, query string, partitionName []string) string { + return m.getPartitionedQueryResult() +} diff --git a/max2max/internal/client/odps.go b/max2max/internal/client/odps.go new file mode 100644 index 0000000..0c80895 --- /dev/null +++ b/max2max/internal/client/odps.go @@ -0,0 +1,43 @@ +package client + +import ( + "fmt" + "log/slog" + + "github.com/aliyun/aliyun-odps-go-sdk/odps" +) + +type odpsClient struct { + logger *slog.Logger + client *odps.Odps +} + +func NewODPSClient(client *odps.Odps) *odpsClient { + return &odpsClient{ + client: client, + } +} + +// ExecSQL executes the given query in syncronous mode (blocking) +func (c *odpsClient) ExecSQL(query string) error { + taskIns, err := c.client.ExecSQl(query) + if err != nil { + return err + } + + // wait execution success + c.logger.Info(fmt.Sprintf("taskId: %s", taskIns.Id())) + return taskIns.WaitForSuccess() +} + +func (c *odpsClient) GetPartitionNames(tableID string) ([]string, error) { + table := c.client.Table(tableID) + if err := table.Load(); err != nil { + return nil, err + } + var partitionNames []string + for _, partition := range table.Schema().PartitionColumns { + partitionNames = append(partitionNames, partition.Name) + } + return partitionNames, nil +} diff --git a/max2max/internal/loader/factory.go b/max2max/internal/loader/factory.go index e9c052f..fb7ff9e 100644 --- a/max2max/internal/loader/factory.go +++ b/max2max/internal/loader/factory.go @@ -14,14 +14,14 @@ func GetLoader(name string, logger *slog.Logger) (Loader, error) { switch name { case APPEND: return NewAppendLoader(logger), nil - case REPLACE: - return NewReplaceLoader(logger), nil - case REPLACE_ALL: - return NewReplaceAllLoader(logger), nil - case MERGE: - return NewMergeLoader(logger), nil - case MERGE_REPLACE: - return NewMergeReplaceLoader(logger), nil + // case REPLACE: + // return NewReplaceLoader(logger), nil + // case REPLACE_ALL: + // return NewReplaceAllLoader(logger), nil + // case MERGE: + // return NewMergeLoader(logger), nil + // case MERGE_REPLACE: + // return NewMergeReplaceLoader(logger), nil default: return nil, fmt.Errorf("loader %s not found", name) } From 6c35e315dac37a6c53d7314f38e529756768ec2a Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Mon, 30 Sep 2024 09:16:00 +0700 Subject: [PATCH 02/11] feat: setup opentelemetry metrics --- max2max/internal/client/client.go | 4 ++ max2max/internal/client/opentelemetry.go | 55 ++++++++++++++++++++++++ max2max/main.go | 1 + 3 files changed, 60 insertions(+) create mode 100644 max2max/internal/client/opentelemetry.go diff --git a/max2max/internal/client/client.go b/max2max/internal/client/client.go index dff1783..06685ef 100644 --- a/max2max/internal/client/client.go +++ b/max2max/internal/client/client.go @@ -62,3 +62,7 @@ func (c *Client) Execute(loader Loader, tableID, queryFilePath string) error { c.logger.Info("execution done") return nil } + +func (c *Client) Close() { + // any cleanup +} diff --git a/max2max/internal/client/opentelemetry.go b/max2max/internal/client/opentelemetry.go new file mode 100644 index 0000000..809c406 --- /dev/null +++ b/max2max/internal/client/opentelemetry.go @@ -0,0 +1,55 @@ +package client + +import ( + "context" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" +) + +func initOpenTelemetry() (func(context.Context) error, error) { + ctx := context.Background() + + // Create OTLP trace exporter + traceExporter, err := otlptracehttp.New(ctx) + if err != nil { + return nil, err + } + + // Create trace provider + traceProvider := trace.NewTracerProvider( + trace.WithBatcher(traceExporter), + trace.WithResource(resource.NewWithAttributes( + attribute.String("service.name", "example-service"), + )), + ) + otel.SetTracerProvider(traceProvider) + + // Create metric controller + metricController := basic.New( + simple.NewWithExactDistribution(), + metric.WithResource(resource.NewWithAttributes( + attribute.String("service.name", "example-service"), + )), + metric.WithReader(metric.NewPeriodicReader( + metric.NewExportPipeline( + metric.NewSimpleSelector(), + aggregation.CumulativeTemporalitySelector(), + ), + time.Minute, + )), + ) + global.SetMeterProvider(metricController.MeterProvider()) + + return func(ctx context.Context) error { + if err := traceProvider.Shutdown(ctx); err != nil { + return err + } + return metricController.Stop(ctx) + }, nil +} diff --git a/max2max/main.go b/max2max/main.go index 2053abe..3bc5034 100644 --- a/max2max/main.go +++ b/max2max/main.go @@ -30,6 +30,7 @@ func main() { } // initiate client client := client.NewClient(logger, cfg.GenOdps()) + defer client.Close() // execute query err = client.Execute(loader, cfg.DestinationTableID, cfg.QueryFilePath) From 538031721b789f8a078d2fd0fcfe6e72634aa82b Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Mon, 30 Sep 2024 09:24:13 +0700 Subject: [PATCH 03/11] chore: change package name to transformers --- max2max/go.mod | 4 ++-- max2max/go.sum | 4 ++-- max2max/internal/client/client_test.go | 2 +- max2max/main.go | 8 ++++---- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/max2max/go.mod b/max2max/go.mod index 1f1374d..eed2089 100644 --- a/max2max/go.mod +++ b/max2max/go.mod @@ -1,9 +1,9 @@ -module github.com/goto/maxcompute-transformation +module github.com/goto/transformers/max2max go 1.22.3 require ( - github.com/aliyun/aliyun-odps-go-sdk v0.3.4 + github.com/aliyun/aliyun-odps-go-sdk v0.3.5 github.com/stretchr/testify v1.9.0 ) diff --git a/max2max/go.sum b/max2max/go.sum index dabc72a..fe2e860 100644 --- a/max2max/go.sum +++ b/max2max/go.sum @@ -9,8 +9,8 @@ github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b/go.mod h1:1KcenG0jGWcpt8ov532z81sp/kMMUG485J2InIOyADM= -github.com/aliyun/aliyun-odps-go-sdk v0.3.4 h1:IwidtZJUmFjlwBRb/24LGsYn/PSeIAcV7r5Ia09dvkE= -github.com/aliyun/aliyun-odps-go-sdk v0.3.4/go.mod h1:o2yLh138hfeBZThn+rorDVNhoaFsPwFSF+CgE69yaw8= +github.com/aliyun/aliyun-odps-go-sdk v0.3.5 h1:nQp/USaiTHKMORaErwGKGGhT6igtn8qBXk/B7S9+c8k= +github.com/aliyun/aliyun-odps-go-sdk v0.3.5/go.mod h1:o2yLh138hfeBZThn+rorDVNhoaFsPwFSF+CgE69yaw8= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= diff --git a/max2max/internal/client/client_test.go b/max2max/internal/client/client_test.go index d2a552a..712447c 100644 --- a/max2max/internal/client/client_test.go +++ b/max2max/internal/client/client_test.go @@ -6,7 +6,7 @@ import ( "os" "testing" - "github.com/goto/maxcompute-transformation/internal/client" + "github.com/goto/transformers/max2max/internal/client" "github.com/stretchr/testify/assert" ) diff --git a/max2max/main.go b/max2max/main.go index 3bc5034..ad70666 100644 --- a/max2max/main.go +++ b/max2max/main.go @@ -2,10 +2,10 @@ package main import ( _ "github.com/aliyun/aliyun-odps-go-sdk/sqldriver" - "github.com/goto/maxcompute-transformation/internal/client" - "github.com/goto/maxcompute-transformation/internal/config" - "github.com/goto/maxcompute-transformation/internal/loader" - "github.com/goto/maxcompute-transformation/internal/logger" + "github.com/goto/transformers/max2max/internal/client" + "github.com/goto/transformers/max2max/internal/config" + "github.com/goto/transformers/max2max/internal/loader" + "github.com/goto/transformers/max2max/internal/logger" ) // TODO: From fd1702c300295a8668ca702a2f8c7fb867885c05 Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Mon, 30 Sep 2024 10:18:29 +0700 Subject: [PATCH 04/11] chore: add opentelemetry metrics exporter --- max2max/go.mod | 21 +++++++++- max2max/go.sum | 41 +++++++++++++++++++ max2max/internal/client/client.go | 32 +++++++++------ max2max/internal/client/client_test.go | 26 ++++++------ max2max/internal/client/odps.go | 1 + max2max/internal/client/opentelemetry.go | 50 ++++++------------------ max2max/internal/client/setup.go | 40 +++++++++++++++++++ max2max/internal/config/config.go | 11 ++++-- max2max/main.go | 9 ++++- 9 files changed, 164 insertions(+), 67 deletions(-) create mode 100644 max2max/internal/client/setup.go diff --git a/max2max/go.mod b/max2max/go.mod index eed2089..ecafa58 100644 --- a/max2max/go.mod +++ b/max2max/go.mod @@ -8,17 +8,34 @@ require ( ) require ( + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v23.5.26+incompatible // indirect - github.com/google/uuid v1.3.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect github.com/klauspost/compress v1.15.9 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/otel v1.30.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.30.0 // indirect + go.opentelemetry.io/otel/metric v1.30.0 // indirect + go.opentelemetry.io/otel/sdk v1.30.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect + go.opentelemetry.io/otel/trace v1.30.0 // indirect + go.opentelemetry.io/proto/otlp v1.3.1 // indirect golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect + golang.org/x/net v0.29.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - google.golang.org/protobuf v1.31.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240924160255-9d4c2d233b61 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240924160255-9d4c2d233b61 // indirect + google.golang.org/grpc v1.67.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/max2max/go.sum b/max2max/go.sum index fe2e860..037c7f2 100644 --- a/max2max/go.sum +++ b/max2max/go.sum @@ -14,6 +14,8 @@ github.com/aliyun/aliyun-odps-go-sdk v0.3.5/go.mod h1:o2yLh138hfeBZThn+rorDVNhoa github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -46,6 +48,11 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9 github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2CSIqUrmQPqA0gdRIlnLEY0gK5JGjh37zN5U= github.com/go-latex/latex v0.0.0-20210823091927-c0d11ff05a81/go.mod h1:SX0U8uGpxhq9o2S/CELCSUxEWWAuoCUcVCQWv7G2OCk= github.com/go-latex/latex v0.0.0-20230307184459-12ec69307ad9/go.mod h1:gWuR/CrFDDeVRFQwHPvsv9soJVB/iqymhuZQuJ3a9OM= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= @@ -78,7 +85,12 @@ github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -111,7 +123,21 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= +go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.30.0 h1:WypxHH02KX2poqqbaadmkMYalGyy/vil4HE4PM4nRJc= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.30.0/go.mod h1:U79SV99vtvGSEBeeHnpgGJfTsnsdkWLpPN/CcHAzBSI= +go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w= +go.opentelemetry.io/otel/metric v1.30.0/go.mod h1:aXTfST94tswhWEb+5QjlSqG+cZlmyXy/u8jFpor3WqQ= +go.opentelemetry.io/otel/sdk v1.30.0 h1:cHdik6irO49R5IysVhdn8oaiR9m8XluDaJAs4DfOrYE= +go.opentelemetry.io/otel/sdk v1.30.0/go.mod h1:p14X4Ok8S+sygzblytT1nqG98QG2KYKv++HE0LY/mhg= +go.opentelemetry.io/otel/sdk/metric v1.30.0 h1:QJLT8Pe11jyHBHfSAgYH7kEmT24eX792jZO1bo4BXkM= +go.opentelemetry.io/otel/sdk/metric v1.30.0/go.mod h1:waS6P3YqFNzeP01kuo/MBBYqaoBJl7efRQHOaydhy1Y= +go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc= +go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -166,6 +192,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -190,6 +218,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -203,6 +233,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -237,7 +269,12 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto/googleapis/api v0.0.0-20240924160255-9d4c2d233b61 h1:pAjq8XSSzXoP9ya73v/w+9QEAAJNluLrpmMq5qFJQNY= +google.golang.org/genproto/googleapis/api v0.0.0-20240924160255-9d4c2d233b61/go.mod h1:O6rP0uBq4k0mdi/b4ZEMAZjkhYWhS815kCvaMha4VN8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240924160255-9d4c2d233b61 h1:N9BgCIAUvn/M+p4NJccWPWb3BWh88+zyL0ll9HgbEeM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240924160255-9d4c2d233b61/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= @@ -245,6 +282,8 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= +google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw= +google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -258,6 +297,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= diff --git a/max2max/internal/client/client.go b/max2max/internal/client/client.go index 06685ef..f9131b4 100644 --- a/max2max/internal/client/client.go +++ b/max2max/internal/client/client.go @@ -1,12 +1,11 @@ package client import ( + "errors" "fmt" "log/slog" "os" "strings" - - "github.com/aliyun/aliyun-odps-go-sdk/odps" ) type Loader interface { @@ -20,15 +19,30 @@ type OdpsClient interface { } type Client struct { - logger *slog.Logger OdpsClient OdpsClient + + logger *slog.Logger + shutdownFns []func() error +} + +func NewClient(setupFns ...SetupFn) (*Client, error) { + c := &Client{ + shutdownFns: make([]func() error, 0), + } + for _, setupFn := range setupFns { + if err := setupFn(c); err != nil { + return nil, err + } + } + return c, nil } -func NewClient(logger *slog.Logger, odpsClient *odps.Odps) *Client { - return &Client{ - logger: logger, - OdpsClient: NewODPSClient(odpsClient), +func (c *Client) Close() error { + var err error + for _, fn := range c.shutdownFns { + err = errors.Join(err, fn()) } + return err } func (c *Client) Execute(loader Loader, tableID, queryFilePath string) error { @@ -62,7 +76,3 @@ func (c *Client) Execute(loader Loader, tableID, queryFilePath string) error { c.logger.Info("execution done") return nil } - -func (c *Client) Close() { - // any cleanup -} diff --git a/max2max/internal/client/client_test.go b/max2max/internal/client/client_test.go index 712447c..ab0750a 100644 --- a/max2max/internal/client/client_test.go +++ b/max2max/internal/client/client_test.go @@ -2,27 +2,29 @@ package client_test import ( "fmt" - "log/slog" "os" "testing" "github.com/goto/transformers/max2max/internal/client" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestExecute(t *testing.T) { t.Run("should return error when reading query file fails", func(t *testing.T) { // arrange - client := client.NewClient(slog.Default(), nil) + client, err := client.NewClient(client.SetupLogger("error")) + require.NoError(t, err) client.OdpsClient = &mockOdpsClient{} // act - err := client.Execute(nil, "", "./nonexistentfile") + err = client.Execute(nil, "", "./nonexistentfile") // assert assert.Error(t, err) }) t.Run("should return error when getting partition name fails", func(t *testing.T) { // arrange - client := client.NewClient(slog.Default(), nil) + client, err := client.NewClient(client.SetupLogger("error")) + require.NoError(t, err) client.OdpsClient = &mockOdpsClient{ partitionResult: func() ([]string, error) { return nil, fmt.Errorf("error get partition name") @@ -30,14 +32,15 @@ func TestExecute(t *testing.T) { } assert.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644)) // act - err := client.Execute(nil, "project_test.table_test", "/tmp/query.sql") + err = client.Execute(nil, "project_test.table_test", "/tmp/query.sql") // assert assert.Error(t, err) assert.ErrorContains(t, err, "error get partition name") }) t.Run("should return error when executing query fails", func(t *testing.T) { // arrange - client := client.NewClient(slog.Default(), nil) + client, err := client.NewClient(client.SetupLogger("error")) + require.NoError(t, err) client.OdpsClient = &mockOdpsClient{ partitionResult: func() ([]string, error) { return nil, nil @@ -51,16 +54,17 @@ func TestExecute(t *testing.T) { return "INSERT INTO table SELECT * FROM table;" }, } - assert.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644)) + require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644)) // act - err := client.Execute(loader, "project_test.table_test", "/tmp/query.sql") + err = client.Execute(loader, "project_test.table_test", "/tmp/query.sql") // assert assert.Error(t, err) assert.ErrorContains(t, err, "error exec sql") }) t.Run("should return nil when everything is successful", func(t *testing.T) { // arrange - client := client.NewClient(slog.Default(), nil) + client, err := client.NewClient(client.SetupLogger("error")) + require.NoError(t, err) client.OdpsClient = &mockOdpsClient{ partitionResult: func() ([]string, error) { return []string{"event_date"}, nil @@ -77,9 +81,9 @@ func TestExecute(t *testing.T) { return "INSERT INTO table PARTITION (event_date) SELECT * FROM table;" }, } - assert.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644)) + require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644)) // act - err := client.Execute(loader, "project_test.table_test", "/tmp/query.sql") + err = client.Execute(loader, "project_test.table_test", "/tmp/query.sql") // assert assert.NoError(t, err) }) diff --git a/max2max/internal/client/odps.go b/max2max/internal/client/odps.go index 0c80895..31042b7 100644 --- a/max2max/internal/client/odps.go +++ b/max2max/internal/client/odps.go @@ -19,6 +19,7 @@ func NewODPSClient(client *odps.Odps) *odpsClient { } // ExecSQL executes the given query in syncronous mode (blocking) +// TODO: change the execution mode to async and do graceful shutdown func (c *odpsClient) ExecSQL(query string) error { taskIns, err := c.client.ExecSQl(query) if err != nil { diff --git a/max2max/internal/client/opentelemetry.go b/max2max/internal/client/opentelemetry.go index 809c406..fc19e0f 100644 --- a/max2max/internal/client/opentelemetry.go +++ b/max2max/internal/client/opentelemetry.go @@ -2,54 +2,28 @@ package client import ( "context" - "time" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" - "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" ) -func initOpenTelemetry() (func(context.Context) error, error) { - ctx := context.Background() - - // Create OTLP trace exporter - traceExporter, err := otlptracehttp.New(ctx) +func setupOTelSDK(collectorGRPCEndpoint string) (shutdown func() error, err error) { + ctx := context.Background() // TODO: use context from main + metricExporter, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithEndpoint(collectorGRPCEndpoint)) if err != nil { return nil, err } - // Create trace provider - traceProvider := trace.NewTracerProvider( - trace.WithBatcher(traceExporter), - trace.WithResource(resource.NewWithAttributes( - attribute.String("service.name", "example-service"), - )), - ) - otel.SetTracerProvider(traceProvider) - - // Create metric controller - metricController := basic.New( - simple.NewWithExactDistribution(), - metric.WithResource(resource.NewWithAttributes( - attribute.String("service.name", "example-service"), - )), - metric.WithReader(metric.NewPeriodicReader( - metric.NewExportPipeline( - metric.NewSimpleSelector(), - aggregation.CumulativeTemporalitySelector(), - ), - time.Minute, - )), + // for now, we only need metric provider + meterProvider := metric.NewMeterProvider( + metric.WithResource(resource.Default()), // TODO: add resource specific to job name and plugin name + metric.WithReader(metric.NewPeriodicReader(metricExporter)), ) - global.SetMeterProvider(metricController.MeterProvider()) + otel.SetMeterProvider(meterProvider) - return func(ctx context.Context) error { - if err := traceProvider.Shutdown(ctx); err != nil { - return err - } - return metricController.Stop(ctx) + return func() error { + return meterProvider.Shutdown(ctx) }, nil } diff --git a/max2max/internal/client/setup.go b/max2max/internal/client/setup.go new file mode 100644 index 0000000..f893702 --- /dev/null +++ b/max2max/internal/client/setup.go @@ -0,0 +1,40 @@ +package client + +import ( + "github.com/aliyun/aliyun-odps-go-sdk/odps" + "github.com/goto/transformers/max2max/internal/logger" +) + +type SetupFn func(c *Client) error + +func SetupLogger(logLevel string) SetupFn { + return func(c *Client) error { + logger, err := logger.NewLogger(logLevel) + if err != nil { + return err + } + c.logger = logger + return nil + } +} + +func SetupODPSClient(odpsClient *odps.Odps) SetupFn { + return func(c *Client) error { + c.OdpsClient = NewODPSClient(odpsClient) + return nil + } +} + +func SetupOTelSDK(collectorGRPCEndpoint string) SetupFn { + return func(c *Client) error { + if collectorGRPCEndpoint == "" { + return nil + } + shutdownFn, err := setupOTelSDK(collectorGRPCEndpoint) + if err != nil { + return err + } + c.shutdownFns = append(c.shutdownFns, shutdownFn) + return nil + } +} diff --git a/max2max/internal/config/config.go b/max2max/internal/config/config.go index 36f275c..5c88708 100644 --- a/max2max/internal/config/config.go +++ b/max2max/internal/config/config.go @@ -8,10 +8,11 @@ import ( type Config struct { *odps.Config - LogLevel string - LoadMethod string - QueryFilePath string - DestinationTableID string + LogLevel string + LoadMethod string + QueryFilePath string + DestinationTableID string + OtelCollectorGRPCEndpoint string } type maxComputeCredentials struct { @@ -29,6 +30,8 @@ func NewConfig() (*Config, error) { LoadMethod: getEnv("LOAD_METHOD", "APPEND"), QueryFilePath: getEnv("QUERY_FILE_PATH", ""), DestinationTableID: getEnv("DESTINATION_TABLE_ID", ""), + // system related config + OtelCollectorGRPCEndpoint: getEnv("OTEL_COLLECTOR_GRPC_ENDPOINT", ""), } // ali-odps-go-sdk related config scvAcc := getEnv("SERVICE_ACCOUNT", "") diff --git a/max2max/main.go b/max2max/main.go index ad70666..834f2e1 100644 --- a/max2max/main.go +++ b/max2max/main.go @@ -29,7 +29,14 @@ func main() { panic(err) } // initiate client - client := client.NewClient(logger, cfg.GenOdps()) + client, err := client.NewClient( + client.SetupLogger(cfg.LogLevel), + client.SetupOTelSDK(cfg.OtelCollectorGRPCEndpoint), + client.SetupODPSClient(cfg.GenOdps()), + ) + if err != nil { + panic(err) + } defer client.Close() // execute query From 26bd68c2648f16890e3ff1c64bcc0e3e83d5f97c Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Mon, 30 Sep 2024 10:41:12 +0700 Subject: [PATCH 05/11] chore: add sample metric in append method --- max2max/internal/loader/append.go | 31 ++++++++++++++++++++++++++---- max2max/internal/loader/factory.go | 2 +- max2max/main.go | 1 - 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/max2max/internal/loader/append.go b/max2max/internal/loader/append.go index 6fdeaed..2460279 100644 --- a/max2max/internal/loader/append.go +++ b/max2max/internal/loader/append.go @@ -1,25 +1,48 @@ package loader import ( + "context" "fmt" "log/slog" "strings" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" ) type appendLoader struct { - logger *slog.Logger + logger *slog.Logger + meter metric.Meter + queryCounter metric.Int64Counter } -func NewAppendLoader(logger *slog.Logger) *appendLoader { - return &appendLoader{ - logger: logger, +func NewAppendLoader(logger *slog.Logger) (*appendLoader, error) { + meter := otel.Meter("loader") + queryCounter, err := meter.Int64Counter("query.count") + if err != nil { + return nil, err } + + return &appendLoader{ + logger: logger, + meter: meter, + queryCounter: queryCounter, + }, nil } func (l *appendLoader) GetQuery(tableID, query string) string { + l.queryCounter.Add(context.Background(), 1, metric.WithAttributes( + attribute.String("method", "GetQuery"), + attribute.String("tableID", tableID), + )) return fmt.Sprintf("INSERT INTO TABLE %s %s", tableID, query) } func (l *appendLoader) GetPartitionedQuery(tableID, query string, partitionNames []string) string { + l.queryCounter.Add(context.Background(), 1, metric.WithAttributes( + attribute.String("method", "GetPartitionedQuery"), + attribute.String("tableID", tableID), + )) return fmt.Sprintf("INSERT INTO TABLE %s PARTITION (%s) %s", tableID, strings.Join(partitionNames, ", "), query) } diff --git a/max2max/internal/loader/factory.go b/max2max/internal/loader/factory.go index fb7ff9e..c222069 100644 --- a/max2max/internal/loader/factory.go +++ b/max2max/internal/loader/factory.go @@ -13,7 +13,7 @@ type Loader interface { func GetLoader(name string, logger *slog.Logger) (Loader, error) { switch name { case APPEND: - return NewAppendLoader(logger), nil + return NewAppendLoader(logger) // case REPLACE: // return NewReplaceLoader(logger), nil // case REPLACE_ALL: diff --git a/max2max/main.go b/max2max/main.go index 834f2e1..6bbdfb0 100644 --- a/max2max/main.go +++ b/max2max/main.go @@ -11,7 +11,6 @@ import ( // TODO: // - graceful shutdown // - error handling -// - instrumentation func main() { // load config cfg, err := config.NewConfig() From c3dd02d8277a9b560cd9f88273f312642635796e Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Mon, 30 Sep 2024 13:14:03 +0700 Subject: [PATCH 06/11] feat: use attributes by job name and scheduled time --- max2max/internal/client/opentelemetry.go | 10 ++++++++-- max2max/internal/client/setup.go | 4 ++-- max2max/internal/config/config.go | 4 ++++ max2max/internal/config/util.go | 21 +++++++++++++++++++++ max2max/main.go | 2 +- 5 files changed, 36 insertions(+), 5 deletions(-) diff --git a/max2max/internal/client/opentelemetry.go b/max2max/internal/client/opentelemetry.go index fc19e0f..25fbd9e 100644 --- a/max2max/internal/client/opentelemetry.go +++ b/max2max/internal/client/opentelemetry.go @@ -4,12 +4,13 @@ import ( "context" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" ) -func setupOTelSDK(collectorGRPCEndpoint string) (shutdown func() error, err error) { +func setupOTelSDK(collectorGRPCEndpoint string, jobName, scheduledTime string) (shutdown func() error, err error) { ctx := context.Background() // TODO: use context from main metricExporter, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithEndpoint(collectorGRPCEndpoint)) if err != nil { @@ -18,7 +19,12 @@ func setupOTelSDK(collectorGRPCEndpoint string) (shutdown func() error, err erro // for now, we only need metric provider meterProvider := metric.NewMeterProvider( - metric.WithResource(resource.Default()), // TODO: add resource specific to job name and plugin name + metric.WithResource(resource.NewWithAttributes( + resource.Default().SchemaURL(), + attribute.String("plugin.name", "max2max"), + attribute.String("job.name", jobName), + attribute.String("job.scheduled_time", scheduledTime), + )), metric.WithReader(metric.NewPeriodicReader(metricExporter)), ) otel.SetMeterProvider(meterProvider) diff --git a/max2max/internal/client/setup.go b/max2max/internal/client/setup.go index f893702..aab9454 100644 --- a/max2max/internal/client/setup.go +++ b/max2max/internal/client/setup.go @@ -25,12 +25,12 @@ func SetupODPSClient(odpsClient *odps.Odps) SetupFn { } } -func SetupOTelSDK(collectorGRPCEndpoint string) SetupFn { +func SetupOTelSDK(collectorGRPCEndpoint, jobName, scheduledTime string) SetupFn { return func(c *Client) error { if collectorGRPCEndpoint == "" { return nil } - shutdownFn, err := setupOTelSDK(collectorGRPCEndpoint) + shutdownFn, err := setupOTelSDK(collectorGRPCEndpoint, jobName, scheduledTime) if err != nil { return err } diff --git a/max2max/internal/config/config.go b/max2max/internal/config/config.go index 5c88708..88b24c5 100644 --- a/max2max/internal/config/config.go +++ b/max2max/internal/config/config.go @@ -13,6 +13,8 @@ type Config struct { QueryFilePath string DestinationTableID string OtelCollectorGRPCEndpoint string + JobName string + ScheduledTime string } type maxComputeCredentials struct { @@ -32,6 +34,8 @@ func NewConfig() (*Config, error) { DestinationTableID: getEnv("DESTINATION_TABLE_ID", ""), // system related config OtelCollectorGRPCEndpoint: getEnv("OTEL_COLLECTOR_GRPC_ENDPOINT", ""), + JobName: getJobName(), + ScheduledTime: getEnv("SCHEDULED_TIME", ""), } // ali-odps-go-sdk related config scvAcc := getEnv("SERVICE_ACCOUNT", "") diff --git a/max2max/internal/config/util.go b/max2max/internal/config/util.go index 7d6cd4e..c888329 100644 --- a/max2max/internal/config/util.go +++ b/max2max/internal/config/util.go @@ -2,6 +2,7 @@ package config import ( "os" + "strings" "time" ) @@ -16,3 +17,23 @@ func getEnvDuration(key, fallback string) time.Duration { result, _ := time.ParseDuration(getEnv(key, fallback)) return result } + +// specific to parse job name from JOB_LABELS environment variable +// later on this should be refactored properly +func getJobName() string { + return parseLabels("job_name") +} + +// specific parsing for JOB_LABELS environment variable +// later on this should be refactored properly +func parseLabels(key string) string { + labels := strings.Split(getEnv("JOB_LABELS", ""), ",") + // parse label from JOB_LABELS based on key + for _, label := range labels { + parsed := strings.Split(label, "=") + if len(parsed) == 2 && parsed[0] == key { + return parsed[1] + } + } + return "" +} diff --git a/max2max/main.go b/max2max/main.go index 6bbdfb0..c3354f8 100644 --- a/max2max/main.go +++ b/max2max/main.go @@ -30,7 +30,7 @@ func main() { // initiate client client, err := client.NewClient( client.SetupLogger(cfg.LogLevel), - client.SetupOTelSDK(cfg.OtelCollectorGRPCEndpoint), + client.SetupOTelSDK(cfg.OtelCollectorGRPCEndpoint, cfg.JobName, cfg.ScheduledTime), client.SetupODPSClient(cfg.GenOdps()), ) if err != nil { From fbc988d1d787d6ee9882b4d9a74591f7a48db4d1 Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Mon, 30 Sep 2024 13:14:23 +0700 Subject: [PATCH 07/11] feat: use default query file path --- max2max/internal/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/max2max/internal/config/config.go b/max2max/internal/config/config.go index 88b24c5..f438804 100644 --- a/max2max/internal/config/config.go +++ b/max2max/internal/config/config.go @@ -30,7 +30,7 @@ func NewConfig() (*Config, error) { // max2max related config LogLevel: getEnv("LOG_LEVEL", "INFO"), LoadMethod: getEnv("LOAD_METHOD", "APPEND"), - QueryFilePath: getEnv("QUERY_FILE_PATH", ""), + QueryFilePath: getEnv("QUERY_FILE_PATH", "/data/in/query.sql"), DestinationTableID: getEnv("DESTINATION_TABLE_ID", ""), // system related config OtelCollectorGRPCEndpoint: getEnv("OTEL_COLLECTOR_GRPC_ENDPOINT", ""), From 1c60259a2c7ba82b9c7cfc924f3c331d45d35cec Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Mon, 30 Sep 2024 14:51:25 +0700 Subject: [PATCH 08/11] feat: add logger --- max2max/internal/client/odps.go | 3 ++- max2max/internal/client/setup.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/max2max/internal/client/odps.go b/max2max/internal/client/odps.go index 31042b7..c478fad 100644 --- a/max2max/internal/client/odps.go +++ b/max2max/internal/client/odps.go @@ -12,8 +12,9 @@ type odpsClient struct { client *odps.Odps } -func NewODPSClient(client *odps.Odps) *odpsClient { +func NewODPSClient(logger *slog.Logger, client *odps.Odps) *odpsClient { return &odpsClient{ + logger: logger, client: client, } } diff --git a/max2max/internal/client/setup.go b/max2max/internal/client/setup.go index aab9454..a850048 100644 --- a/max2max/internal/client/setup.go +++ b/max2max/internal/client/setup.go @@ -20,7 +20,7 @@ func SetupLogger(logLevel string) SetupFn { func SetupODPSClient(odpsClient *odps.Odps) SetupFn { return func(c *Client) error { - c.OdpsClient = NewODPSClient(odpsClient) + c.OdpsClient = NewODPSClient(c.logger, odpsClient) return nil } } From f3221ba2dbda837175ebb9b4ae08a899ace83a84 Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Mon, 30 Sep 2024 16:10:01 +0700 Subject: [PATCH 09/11] refactor: clean up unecessary metrics --- max2max/internal/loader/append.go | 27 ++------------------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/max2max/internal/loader/append.go b/max2max/internal/loader/append.go index 2460279..efd8938 100644 --- a/max2max/internal/loader/append.go +++ b/max2max/internal/loader/append.go @@ -1,48 +1,25 @@ package loader import ( - "context" "fmt" "log/slog" "strings" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" ) type appendLoader struct { - logger *slog.Logger - meter metric.Meter - queryCounter metric.Int64Counter + logger *slog.Logger } func NewAppendLoader(logger *slog.Logger) (*appendLoader, error) { - meter := otel.Meter("loader") - queryCounter, err := meter.Int64Counter("query.count") - if err != nil { - return nil, err - } - return &appendLoader{ - logger: logger, - meter: meter, - queryCounter: queryCounter, + logger: logger, }, nil } func (l *appendLoader) GetQuery(tableID, query string) string { - l.queryCounter.Add(context.Background(), 1, metric.WithAttributes( - attribute.String("method", "GetQuery"), - attribute.String("tableID", tableID), - )) return fmt.Sprintf("INSERT INTO TABLE %s %s", tableID, query) } func (l *appendLoader) GetPartitionedQuery(tableID, query string, partitionNames []string) string { - l.queryCounter.Add(context.Background(), 1, metric.WithAttributes( - attribute.String("method", "GetPartitionedQuery"), - attribute.String("tableID", tableID), - )) return fmt.Sprintf("INSERT INTO TABLE %s PARTITION (%s) %s", tableID, strings.Join(partitionNames, ", "), query) } From bb36d49ef06d5625083ae8202837473fac524f72 Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Mon, 30 Sep 2024 16:43:20 +0700 Subject: [PATCH 10/11] fix: allow insecure connection opentelemetry --- max2max/internal/client/opentelemetry.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/max2max/internal/client/opentelemetry.go b/max2max/internal/client/opentelemetry.go index 25fbd9e..c7e53f1 100644 --- a/max2max/internal/client/opentelemetry.go +++ b/max2max/internal/client/opentelemetry.go @@ -12,7 +12,10 @@ import ( func setupOTelSDK(collectorGRPCEndpoint string, jobName, scheduledTime string) (shutdown func() error, err error) { ctx := context.Background() // TODO: use context from main - metricExporter, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithEndpoint(collectorGRPCEndpoint)) + metricExporter, err := otlpmetricgrpc.New(ctx, + otlpmetricgrpc.WithEndpoint(collectorGRPCEndpoint), + otlpmetricgrpc.WithInsecure(), + ) if err != nil { return nil, err } From 37cc6df3b01dc8aab2c05e1d5b28be930c965a0e Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Mon, 30 Sep 2024 16:47:37 +0700 Subject: [PATCH 11/11] fix: releasing version and releasing arc --- .github/workflows/release.yml | 6 ++++-- max2max/Dockerfile | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index bf3a5d3..4a49e5e 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -25,8 +25,9 @@ jobs: - name: Extract tag version id: vars run: | - # Extract the tag name from GITHUB_REF, remove 'refs/tags/bq2bq/' prefix + # Extract the tag name from GITHUB_REF, remove 'refs/tags/bq2bq/v' prefix TAG="${GITHUB_REF#refs/tags/bq2bq/}" + TAG="${TAG#v}" echo "Tag name: $TAG" echo "::set-output name=tag::$TAG" # Build and push the Docker image to Docker Hub @@ -70,8 +71,9 @@ jobs: - name: Extract tag version id: vars run: | - # Extract the tag name from GITHUB_REF, remove 'refs/tags/max2max/' prefix + # Extract the tag name from GITHUB_REF, remove 'refs/tags/max2max/v' prefix TAG="${GITHUB_REF#refs/tags/max2max/}" + TAG="${TAG#v}" echo "Tag name: $TAG" echo "::set-output name=tag::$TAG" # Build and push the Docker image to Docker Hub diff --git a/max2max/Dockerfile b/max2max/Dockerfile index a467178..c54b4c9 100644 --- a/max2max/Dockerfile +++ b/max2max/Dockerfile @@ -1,4 +1,4 @@ -FROM amd64/alpine:3 +FROM alpine:3 RUN apk --no-cache add tzdata COPY ./build/max2max /usr/local/bin/max2max