diff --git a/ext/bucket/oss/oss.go b/ext/bucket/oss/oss.go new file mode 100644 index 0000000000..7cf186c44a --- /dev/null +++ b/ext/bucket/oss/oss.go @@ -0,0 +1,46 @@ +package bucket + +import ( + "encoding/json" + "errors" + + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials" +) + +type OSSCredentials struct { + AccessID string `json:"access_key_id"` + AccessKey string `json:"access_key_secret"` + Endpoint string `json:"endpoint"` + ProjectName string `json:"project_name"` + Region string `json:"region"` + SecurityToken string `json:"security_token"` +} + +func NewOssClient(creds string) (*oss.Client, error) { + cred, err := toOSSCredentials(creds) + if err != nil { + return nil, err + } + + credProvider := credentials.NewStaticCredentialsProvider(cred.AccessID, cred.AccessKey, cred.SecurityToken) + cfg := oss.LoadDefaultConfig(). + WithCredentialsProvider(credProvider). + WithEndpoint(cred.Endpoint). + WithRegion(cred.Region) + + if cfg.CredentialsProvider == nil { + return nil, errors.New("OSS: credentials provider is required") + } + + return oss.NewClient(cfg), nil +} + +func toOSSCredentials(creds string) (OSSCredentials, error) { + var cred OSSCredentials + if err := json.Unmarshal([]byte(creds), &cred); err != nil { + return OSSCredentials{}, err + } + + return cred, nil +} diff --git a/ext/scheduler/airflow/bucket/oss.go b/ext/scheduler/airflow/bucket/oss.go index a9595e7f2e..84a25483a2 100644 --- a/ext/scheduler/airflow/bucket/oss.go +++ b/ext/scheduler/airflow/bucket/oss.go @@ -2,62 +2,32 @@ package bucket import ( "context" - "encoding/json" "net/url" - "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" - "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials" "go.opentelemetry.io/otel" + "gocloud.dev/blob" "github.com/goto/optimus/core/tenant" + oss "github.com/goto/optimus/ext/bucket/oss" "github.com/goto/optimus/ext/scheduler/airflow" "github.com/goto/optimus/ext/scheduler/airflow/bucket/ossblob" ) -type ossCredentials struct { - AccessID string `json:"access_key_id"` - AccessKey string `json:"access_key_secret"` - Endpoint string `json:"endpoint"` - ProjectName string `json:"project_name"` - Region string `json:"region"` - SecurityToken string `json:"security_token"` -} - func (f *Factory) GetOSSBucket(ctx context.Context, tnnt tenant.Tenant, parsedURL *url.URL) (airflow.Bucket, error) { spanCtx, span := otel.Tracer("airflow/bucketFactory").Start(ctx, "GetOSSBucket") defer span.End() - cred, err := f.getOSSCredentials(spanCtx, tnnt) + cred, err := f.secretsGetter.Get(spanCtx, tnnt.ProjectName(), tnnt.NamespaceName().String(), tenant.SecretStorageKey) if err != nil { return nil, err } - credProvider := credentials.NewStaticCredentialsProvider(cred.AccessID, cred.AccessKey, cred.SecurityToken) - - cfg := oss.LoadDefaultConfig(). - WithCredentialsProvider(credProvider). - WithEndpoint(cred.Endpoint). - WithRegion(cred.Region) - - client, err := ossblob.OpenBucket(ctx, cfg, parsedURL.Host) + ossClient, err := oss.NewOssClient(cred.Value()) if err != nil { return nil, err } - return client, nil -} - -func (f *Factory) getOSSCredentials(ctx context.Context, tnnt tenant.Tenant) (ossCredentials, error) { - // Get credentials from secret manager - secret, err := f.secretsGetter.Get(ctx, tnnt.ProjectName(), tnnt.NamespaceName().String(), tenant.SecretStorageKey) - if err != nil { - return ossCredentials{}, err - } - - var cred ossCredentials - if err := json.Unmarshal([]byte(secret.Value()), &cred); err != nil { - return ossCredentials{}, err - } + driver := ossblob.NewOSSBucket(ossClient, parsedURL.Host) - return cred, nil + return blob.NewBucket(driver), nil } diff --git a/ext/scheduler/airflow/bucket/ossblob/ossblob.go b/ext/scheduler/airflow/bucket/ossblob/ossblob.go index 15d3d62ad7..5801201f0f 100644 --- a/ext/scheduler/airflow/bucket/ossblob/ossblob.go +++ b/ext/scheduler/airflow/bucket/ossblob/ossblob.go @@ -8,7 +8,6 @@ import ( "strings" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" - "gocloud.dev/blob" "gocloud.dev/blob/driver" "gocloud.dev/gcerrors" ) @@ -371,31 +370,6 @@ func (b *ossBucket) Copy(ctx context.Context, dstKey, srcKey string, _ *driver.C return err } -func openBucket(_ context.Context, cfg *oss.Config, bucketName string) (*ossBucket, error) { - if cfg == nil { - return nil, errors.New("ossblob.openBucket: oss config are required") - } - if cfg.CredentialsProvider == nil { - return nil, errors.New("ossblob.openBucket: credentials provider is required") - } - if bucketName == "" { - return nil, errors.New("ossblob.openBucket: bucketName is required") - } - - client := oss.NewClient(cfg) - - return NewOSSBucket(client, bucketName), nil -} - -func OpenBucket(ctx context.Context, cfg *oss.Config, bucketName string) (*blob.Bucket, error) { - drv, err := openBucket(ctx, cfg, bucketName) - if err != nil { - return nil, err - } - - return blob.NewBucket(drv), nil -} - func safeGet[T any](obj *T) T { var zero T if obj == nil { diff --git a/ext/scheduler/airflow/bucket/ossblob/ossblob_test.go b/ext/scheduler/airflow/bucket/ossblob/ossblob_test.go index ecdccb2328..42fc8d31d6 100644 --- a/ext/scheduler/airflow/bucket/ossblob/ossblob_test.go +++ b/ext/scheduler/airflow/bucket/ossblob/ossblob_test.go @@ -1,7 +1,6 @@ package ossblob_test import ( - "context" "errors" "io" "net/http" @@ -9,7 +8,6 @@ import ( "testing" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" - "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials" "github.com/stretchr/testify/assert" "gocloud.dev/gcerrors" @@ -98,32 +96,3 @@ func TestOSSBucket(t *testing.T) { assert.NoError(t, b.Close()) }) } - -func TestOpenBucket(t *testing.T) { - ctx := context.Background() - cfg := &oss.Config{} - bucketName := "test-bucket" - - t.Run("openBucket with nil config", func(t *testing.T) { - _, err := ossblob.OpenBucket(ctx, nil, bucketName) - assert.Error(t, err) - }) - - t.Run("openBucket with nil credentials provider", func(t *testing.T) { - cfg.CredentialsProvider = nil - _, err := ossblob.OpenBucket(ctx, cfg, bucketName) - assert.Error(t, err) - }) - - t.Run("openBucket with empty bucket name", func(t *testing.T) { - _, err := ossblob.OpenBucket(ctx, cfg, "") - assert.Error(t, err) - }) - - t.Run("openBucket with valid config", func(t *testing.T) { - cfg.CredentialsProvider = &credentials.StaticCredentialsProvider{} - b, err := ossblob.OpenBucket(ctx, cfg, bucketName) - assert.NoError(t, err) - assert.NotNil(t, b) - }) -} diff --git a/ext/sheets/csv/csv.go b/ext/sheets/csv/csv.go new file mode 100644 index 0000000000..79facb68de --- /dev/null +++ b/ext/sheets/csv/csv.go @@ -0,0 +1,46 @@ +package csv + +import ( + "encoding/csv" + "strings" +) + +func FromRecords(data [][]interface{}) (string, error) { + if len(data) == 0 { + return "", nil + } + + lenRecords := len(data[0]) + var allRecords [][]string + for _, row := range data { + var currRow []string + i := 0 + for _, r1 := range row { + i++ + s, ok := r1.(string) + if !ok { + s = "" + } + currRow = append(currRow, s) + } + for i < lenRecords { + currRow = append(currRow, "") + i++ + } + allRecords = append(allRecords, currRow) + } + + return FromData(allRecords) +} + +func FromData(records [][]string) (string, error) { + out := new(strings.Builder) + w := csv.NewWriter(out) + + err := w.WriteAll(records) + if err != nil { + return "", err + } + + return out.String(), nil +} diff --git a/ext/sheets/gsheet/gsheet.go b/ext/sheets/gsheet/gsheet.go new file mode 100644 index 0000000000..8b272a5385 --- /dev/null +++ b/ext/sheets/gsheet/gsheet.go @@ -0,0 +1,59 @@ +package gsheet + +import ( + "context" + "errors" + "fmt" + + "google.golang.org/api/option" + "google.golang.org/api/sheets/v4" + + "github.com/goto/optimus/ext/sheets/csv" +) + +const ( + readRange = "Sheet1" +) + +type GSheets struct { + srv *sheets.Service +} + +func NewGSheets(ctx context.Context, creds string) (*GSheets, error) { + srv, err := sheets.NewService(ctx, option.WithCredentialsJSON([]byte(creds))) + if err != nil { + return nil, fmt.Errorf("not able to create sheets service err: %w", err) + } + + return &GSheets{srv: srv}, nil +} + +func (gs *GSheets) GetAsCSV(url, sheetRange string) (string, error) { + info, err := FromURL(url) + if err != nil { + return "", err + } + + if sheetRange == "" { + sheetRange = readRange + } + content, err := gs.getSheetContent(info.SheetID, sheetRange) + if err != nil { + return "", err + } + + return csv.FromRecords(content) +} + +func (gs *GSheets) getSheetContent(sheetID, sheetRange string) ([][]interface{}, error) { + resp, err := gs.srv.Spreadsheets.Values.Get(sheetID, sheetRange).Do() + if err != nil { + return nil, err + } + + if len(resp.Values) == 0 { + return nil, errors.New("no data found in the sheet") + } + + return resp.Values, nil +} diff --git a/ext/sheets/gsheet/sheet_info.go b/ext/sheets/gsheet/sheet_info.go new file mode 100644 index 0000000000..80363da503 --- /dev/null +++ b/ext/sheets/gsheet/sheet_info.go @@ -0,0 +1,34 @@ +package gsheet + +import ( + "errors" + "regexp" +) + +var ( + sheetIDRegex = regexp.MustCompile(`spreadsheets/d/([^/]*)`) + gidRegex = regexp.MustCompile(`gid=([0-9]*)`) +) + +type SheetsInfo struct { + SheetID string + GID string +} + +func FromURL(u1 string) (*SheetsInfo, error) { + res := sheetIDRegex.FindStringSubmatch(u1) + if len(res) < 2 || res[1] == "" { + return nil, errors.New("not able to get spreadsheetID") + } + + gid := "" + res2 := gidRegex.FindStringSubmatch(u1) + if len(res2) > 1 && res2[1] != "" { + gid = res2[1] + } + + return &SheetsInfo{ + SheetID: res[1], + GID: gid, + }, nil +} diff --git a/ext/sheets/gsheet/sheet_info_test.go b/ext/sheets/gsheet/sheet_info_test.go new file mode 100644 index 0000000000..ac99c39c1e --- /dev/null +++ b/ext/sheets/gsheet/sheet_info_test.go @@ -0,0 +1,35 @@ +package gsheet_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/goto/optimus/ext/sheets/gsheet" +) + +func TestSheetInfo(t *testing.T) { + t.Run("return error when id missing", func(t *testing.T) { + u1 := "https://docs.google.com/spreadsheets/d" + + _, err := gsheet.FromURL(u1) + assert.Error(t, err) + assert.ErrorContains(t, err, "not able to get spreadsheetID") + }) + t.Run("return sheet info with id", func(t *testing.T) { + u1 := "https://docs.google.com/spreadsheets/d/abcedefgh/edit?usp=sharing" + + info, err := gsheet.FromURL(u1) + assert.Nil(t, err) + assert.Equal(t, info.SheetID, "abcedefgh") + assert.Equal(t, info.GID, "") + }) + t.Run("return sheet info with sid and gid", func(t *testing.T) { + u1 := "https://docs.google.com/spreadsheets/d/abcdeghi/edit#gid=3726" + + info, err := gsheet.FromURL(u1) + assert.Nil(t, err) + assert.Equal(t, info.SheetID, "abcdeghi") + assert.Equal(t, info.GID, "3726") + }) +} diff --git a/ext/store/maxcompute/client.go b/ext/store/maxcompute/client.go index 7bc4b0c92e..024a3efcf0 100644 --- a/ext/store/maxcompute/client.go +++ b/ext/store/maxcompute/client.go @@ -52,6 +52,14 @@ func (c *MaxComputeClient) TableHandleFrom(projectSchema ProjectSchema) TableRes return NewTableHandle(c, s, t) } +func (c *MaxComputeClient) ExternalTableHandleFrom(projectSchema ProjectSchema) TableResourceHandle { + c.SetDefaultProjectName(projectSchema.Project) + c.SetCurrentSchemaName(projectSchema.Schema) + s := c.Schemas() + t := c.Tables() + return NewExternalTableHandle(c, s, t) +} + func (c *MaxComputeClient) ViewHandleFrom(projectSchema ProjectSchema) TableResourceHandle { c.SetDefaultProjectName(projectSchema.Project) c.SetCurrentSchemaName(projectSchema.Schema) diff --git a/ext/store/maxcompute/external_table.go b/ext/store/maxcompute/external_table.go new file mode 100644 index 0000000000..b36b2e5a40 --- /dev/null +++ b/ext/store/maxcompute/external_table.go @@ -0,0 +1,93 @@ +package maxcompute + +import ( + "strings" + + "github.com/aliyun/aliyun-odps-go-sdk/odps" + "github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema" + + "github.com/goto/optimus/core/resource" + "github.com/goto/optimus/internal/errors" +) + +type McExternalTable interface { + CreateExternal( + schema tableschema.TableSchema, + createIfNotExists bool, + serdeProperties map[string]string, + jars []string, + hints, alias map[string]string, + ) error + BatchLoadTables(tableNames []string) ([]*odps.Table, error) +} + +type ExternalTableHandle struct { + mcSQLExecutor McSQLExecutor + mcSchema McSchema + mcExternalTable McExternalTable +} + +func (e ExternalTableHandle) Create(res *resource.Resource) error { + table, err := ConvertSpecTo[ExternalTable](res) + if err != nil { + return err + } + + _, table.Name, err = getCompleteComponentName(res) + if err != nil { + return err + } + + tableSchema, err := buildExternalTableSchema(table) + if err != nil { + return errors.AddErrContext(err, EntityExternalTable, "failed to build table schema to create for "+res.FullName()) + } + + err = e.mcExternalTable.CreateExternal(tableSchema, false, table.Source.SerdeProperties, table.Source.Jars, table.Hints, nil) + if err != nil { + if strings.Contains(err.Error(), "Table or view already exists") { + return errors.AlreadyExists(EntityExternalTable, "external table already exists on maxcompute: "+res.FullName()) + } + return errors.InternalError(EntityExternalTable, "error while creating table on maxcompute", err) + } + return nil +} + +func (ExternalTableHandle) Update(_ *resource.Resource) error { + // TODO implement me + panic("implement me") +} + +func (e ExternalTableHandle) Exists(tableName string) bool { + _, err := e.mcExternalTable.BatchLoadTables([]string{tableName}) + return err == nil +} + +func NewExternalTableHandle(mcSQLExecutor McSQLExecutor, mcSchema McSchema, mcExternalTable McExternalTable) *ExternalTableHandle { + return &ExternalTableHandle{mcSQLExecutor: mcSQLExecutor, mcSchema: mcSchema, mcExternalTable: mcExternalTable} +} + +func buildExternalTableSchema(t *ExternalTable) (tableschema.TableSchema, error) { + handler := handlerForFormat(t.Source.SourceType) + + builder := tableschema.NewSchemaBuilder() + builder. + Name(t.Name.String()). + Comment(t.Description). + StorageHandler(handler). + Location(t.Source.Location). + TblProperties(t.Source.TableProperties) + + err := externalTableColumns(t, builder) + if err != nil { + return tableschema.TableSchema{}, err + } + + return builder.Build(), nil +} + +func externalTableColumns(t *ExternalTable, schemaBuilder *tableschema.SchemaBuilder) error { + partitionColNames := map[string]struct{}{} + + return t.Schema.ToMaxComputeColumns(partitionColNames, nil, schemaBuilder) +} diff --git a/ext/store/maxcompute/external_table_options.go b/ext/store/maxcompute/external_table_options.go new file mode 100644 index 0000000000..cc86d32369 --- /dev/null +++ b/ext/store/maxcompute/external_table_options.go @@ -0,0 +1,51 @@ +package maxcompute + +import "strings" + +const ( + CSV string = "CSV" + TSV string = "TSV" + JSON string = "JSON" + TxtFile string = "TEXTFILE" + RcFile string = "RCFILE" + ORC string = "ORC" + OrcFile string = "ORCFILE" + SeqFile string = "SEQUENCEFILE" + Parquet string = "PARQUET" + Avro string = "AVRO" + GoogleSheet string = "GOOGLE_SHEETS" + LarkSheets string = "LARK_SHEETS" +) + +func handlerForFormat(format string) string { + switch strings.ToUpper(format) { + // the built-in text extractor for CSV and TSV + case GoogleSheet: + return "com.aliyun.odps.CsvStorageHandler" + case CSV: + return "com.aliyun.odps.CsvStorageHandler" + case TSV: + return "com.aliyun.odps.TsvStorageHandler" + + // Extractors for inbuilt Open Source Data Formats + case JSON: + return "org.apache.hive.hcatalog.data.JsonSerDe" + // case "CUSTOM_CSV": + // return "org.apache.hadoop.hive.serde2.OpenCSVSerde" + case SeqFile: + return "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe" + case TxtFile: + return "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe" + case RcFile: + return "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe" + case ORC: + return "org.apache.hadoop.hive.ql.io.orc.OrcSerde" + case OrcFile: + return "org.apache.hadoop.hive.ql.io.orc.OrcSerde" + case Parquet: + return "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe" + case Avro: + return "org.apache.hadoop.hive.serde2.avro.AvroSerDe" + } + return "" +} diff --git a/ext/store/maxcompute/external_table_spec.go b/ext/store/maxcompute/external_table_spec.go new file mode 100644 index 0000000000..594991e416 --- /dev/null +++ b/ext/store/maxcompute/external_table_spec.go @@ -0,0 +1,72 @@ +package maxcompute + +import ( + "github.com/goto/optimus/core/resource" + "github.com/goto/optimus/internal/errors" +) + +const ( + EntityExternalTable = "resource_external_table" +) + +type ExternalTable struct { + Name resource.Name + + Description string `mapstructure:"description,omitempty"` + Schema Schema `mapstructure:"schema,omitempty"` + Source *ExternalSource `mapstructure:"source,omitempty"` + + Hints map[string]string `mapstructure:"hints,omitempty"` +} + +func (e *ExternalTable) FullName() string { + return e.Name.String() +} + +func (e *ExternalTable) Validate() error { + if len(e.Schema) > 0 { + err := e.Schema.Validate() + if err != nil { + return errors.AddErrContext(err, EntityExternalTable, "error in schema for "+e.FullName()) + } + } + + if e.Source == nil { + return errors.InvalidArgument(EntityExternalTable, "empty external table source for "+e.FullName()) + } + if err := e.Source.Validate(); err != nil { + return errors.AddErrContext(err, EntityExternalTable, "error in source for "+e.FullName()) + } + return nil +} + +type ExternalSource struct { + SourceType string `mapstructure:"type,omitempty"` + SourceURIs []string `mapstructure:"uris,omitempty"` + + // Additional configs for CSV, GoogleSheets, LarkSheets formats. + SerdeProperties map[string]string `mapstructure:"serde_properties"` + TableProperties map[string]string `mapstructure:"table_properties"` + + SyncInterval int64 `mapstructure:"sync_interval_in_hrs,omitempty"` + Jars []string `mapstructure:"jars,omitempty"` + Location string `mapstructure:"location,omitempty"` + Range string `mapstructure:"range,omitempty"` +} + +func (e ExternalSource) Validate() error { + if e.SourceType == "" { + return errors.InvalidArgument(EntityExternalTable, "source type is empty") + } + // TODO: Enable sourceURI validation with sheets + // if len(e.SourceURIs) == 0 { + // return errors.InvalidArgument(EntityExternalTable, "source uri list is empty") + //} + // for _, uri := range e.SourceURIs { + // if uri == "" { + // return errors.InvalidArgument(EntityExternalTable, "uri is empty") + // } + //} + + return nil +} diff --git a/ext/store/maxcompute/external_table_spec_test.go b/ext/store/maxcompute/external_table_spec_test.go new file mode 100644 index 0000000000..1ee153627e --- /dev/null +++ b/ext/store/maxcompute/external_table_spec_test.go @@ -0,0 +1,114 @@ +package maxcompute_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/goto/optimus/ext/store/maxcompute" +) + +func TestRelationalExternalTable(t *testing.T) { + t.Run("when invalid", func(t *testing.T) { + t.Run("fails validation when schema is invalid", func(t *testing.T) { + et := maxcompute.ExternalTable{ + Name: "t-optimus.playground.test-sheet", + Description: "", + Schema: maxcompute.Schema{{ + Name: "", Type: "table", + }}, + } + err := et.Validate() + assert.NotNil(t, err) + assert.ErrorContains(t, err, "error in schema for t-optimus.playground.test-sheet") + }) + t.Run("fails validation when source is invalid", func(t *testing.T) { + et := maxcompute.ExternalTable{ + Name: "t-optimus.playground.test-sheet", + Description: "", + Schema: maxcompute.Schema{ + {Name: "id", Type: "string"}, + }, + Source: &maxcompute.ExternalSource{SourceType: ""}, + } + err := et.Validate() + assert.NotNil(t, err) + assert.ErrorContains(t, err, "error in source for t-optimus.playground.test-sheet") + }) + }) + t.Run("passes validations for with empty schema", func(t *testing.T) { + et := maxcompute.ExternalTable{ + Name: "t-optimus.playground.test-sheet", + Description: "", + Source: &maxcompute.ExternalSource{ + SourceType: "GOOGLE_SHEETS", + SourceURIs: []string{"https://google.com/sheets"}, + }, + } + err := et.Validate() + assert.Nil(t, err) + + assert.Equal(t, "t-optimus.playground.test-sheet", et.FullName()) + }) + t.Run("passes validations for valid configuration", func(t *testing.T) { + et := maxcompute.ExternalTable{ + Name: "t-optimus.playground.test-sheet", + Description: "", + Schema: maxcompute.Schema{ + {Name: "id", Type: "string"}, + }, + Source: &maxcompute.ExternalSource{ + SourceType: "GOOGLE_SHEETS", + SourceURIs: []string{"https://google.com/sheets"}, + }, + } + err := et.Validate() + assert.Nil(t, err) + + assert.Equal(t, "t-optimus.playground.test-sheet", et.FullName()) + }) +} + +func TestExternalSourceValidate(t *testing.T) { + t.Run("when valid", func(t *testing.T) { + t.Run("returns error on source type", func(t *testing.T) { + es := maxcompute.ExternalSource{ + SourceType: "", + SourceURIs: []string{}, + } + + err := es.Validate() + assert.NotNil(t, err) + assert.ErrorContains(t, err, "source type is empty") + }) + //t.Run("returns error when uri list is empty", func(t *testing.T) { + // es := maxcompute.ExternalSource{ + // SourceType: "GOOGLE_SHEETS", + // SourceURIs: []string{}, + // } + // + // err := es.Validate() + // assert.NotNil(t, err) + // assert.ErrorContains(t, err, "source uri list is empty") + //}) + //t.Run("returns error when uri is invalid", func(t *testing.T) { + // es := maxcompute.ExternalSource{ + // SourceType: "GOOGLE_SHEETS", + // SourceURIs: []string{""}, + // } + // + // err := es.Validate() + // assert.NotNil(t, err) + // assert.ErrorContains(t, err, "uri is empty") + //}) + }) + t.Run("returns no error when valid", func(t *testing.T) { + es := maxcompute.ExternalSource{ + SourceType: "CSV", + SourceURIs: []string{"https://google.com/sheets"}, + } + + err := es.Validate() + assert.Nil(t, err) + }) +} diff --git a/ext/store/maxcompute/external_table_test.go b/ext/store/maxcompute/external_table_test.go new file mode 100644 index 0000000000..b845dd4780 --- /dev/null +++ b/ext/store/maxcompute/external_table_test.go @@ -0,0 +1,183 @@ +package maxcompute_test + +import ( + "errors" + "testing" + + "github.com/aliyun/aliyun-odps-go-sdk/odps" + "github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/goto/optimus/core/resource" + "github.com/goto/optimus/core/tenant" + "github.com/goto/optimus/ext/store/maxcompute" +) + +func TestExternalTableHandle(t *testing.T) { + projectName, schemaName, tableName := "proj", "schema", "test_table" + fullName := projectName + "." + schemaName + "." + tableName + mcStore := resource.MaxCompute + tnnt, _ := tenant.NewTenant(projectName, "ns") + metadata := resource.Metadata{ + Version: 1, + Description: "resource description", + Labels: map[string]string{"owner": "optimus"}, + } + spec := map[string]any{ + "description": "test create", + "schema": []map[string]any{ + { + "name": "customer_id", + "type": "STRING", + }, + }, + "source": map[string]any{ + "type": "csv", + "location": "oss://my_bucket", + }, + } + var emptyJars []string + + t.Run("Create", func(t *testing.T) { + t.Run("returns error when cannot convert spec", func(t *testing.T) { + table := new(mockExternalTable) + schema := new(mockMaxComputeSchema) + odpsIns := new(mockOdpsIns) + tableHandle := maxcompute.NewExternalTableHandle(odpsIns, schema, table) + + spec := map[string]any{ + "description": []string{"test create"}, + "schema": []map[string]any{ + { + "name": "customer_id", + "type": "STRING", + }, + }, + } + res, err := resource.NewResource(fullName, maxcompute.KindExternalTable, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = tableHandle.Create(res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "not able to decode spec for "+fullName) + }) + t.Run("returns error when table name is empty", func(t *testing.T) { + table := new(mockExternalTable) + schema := new(mockMaxComputeSchema) + odpsIns := new(mockOdpsIns) + tableHandle := maxcompute.NewExternalTableHandle(odpsIns, schema, table) + + res, err := resource.NewResource(projectName+"."+schemaName, maxcompute.KindExternalTable, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = tableHandle.Create(res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "invalid resource name: "+projectName+"."+schemaName) + }) + + t.Run("returns error when table already present on maxcompute", func(t *testing.T) { + existTableErr := errors.New("Table or view already exists - table or view proj.test_table is already defined") + table := new(mockExternalTable) + table.On("CreateExternal", mock.Anything, false, emptyStringMap, emptyJars, emptyStringMap, emptyStringMap).Return(existTableErr) + defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) + + defer schema.AssertExpectations(t) + odpsIns := new(mockOdpsIns) + + defer odpsIns.AssertExpectations(t) + tableHandle := maxcompute.NewExternalTableHandle(odpsIns, schema, table) + + res, err := resource.NewResource(fullName, maxcompute.KindExternalTable, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = tableHandle.Create(res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "table already exists on maxcompute: "+fullName) + }) + t.Run("returns error when table creation returns error", func(t *testing.T) { + table := new(mockExternalTable) + table.On("CreateExternal", mock.Anything, false, emptyStringMap, emptyJars, emptyStringMap, emptyStringMap).Return(errors.New("some error")) + defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) + + defer schema.AssertExpectations(t) + odpsIns := new(mockOdpsIns) + + defer odpsIns.AssertExpectations(t) + tableHandle := maxcompute.NewExternalTableHandle(odpsIns, schema, table) + + res, err := resource.NewResource(fullName, maxcompute.KindExternalTable, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = tableHandle.Create(res) + assert.NotNil(t, err) + assert.ErrorContains(t, err, "error while creating table on maxcompute") + }) + t.Run("return success when create the external table", func(t *testing.T) { + table := new(mockExternalTable) + table.On("CreateExternal", mock.Anything, false, emptyStringMap, emptyJars, emptyStringMap, emptyStringMap).Return(nil) + defer table.AssertExpectations(t) + schema := new(mockMaxComputeSchema) + + defer schema.AssertExpectations(t) + odpsIns := new(mockOdpsIns) + + defer odpsIns.AssertExpectations(t) + tableHandle := maxcompute.NewExternalTableHandle(odpsIns, schema, table) + + res, err := resource.NewResource(fullName, maxcompute.KindExternalTable, mcStore, tnnt, &metadata, spec) + assert.Nil(t, err) + + err = tableHandle.Create(res) + assert.Nil(t, err) + }) + }) + + t.Run("Exists", func(t *testing.T) { + t.Run("returns false when error in checking existing tables", func(t *testing.T) { + table := new(mockExternalTable) + table.On("BatchLoadTables", mock.Anything).Return(nil, errors.New("error in get")) + defer table.AssertExpectations(t) + + schema := new(mockMaxComputeSchema) + odpsIns := new(mockOdpsIns) + tableHandle := maxcompute.NewExternalTableHandle(odpsIns, schema, table) + + exists := tableHandle.Exists(tableName) + assert.False(t, exists) + }) + t.Run("returns true when checking existing tables", func(t *testing.T) { + odpsIns := new(mockOdpsIns) + schema := new(mockMaxComputeSchema) + + table := new(mockExternalTable) + extTab := odps.NewTable(nil, projectName, schemaName, tableName) + table.On("BatchLoadTables", mock.Anything).Return([]*odps.Table{extTab}, nil) + defer table.AssertExpectations(t) + + tableHandle := maxcompute.NewExternalTableHandle(odpsIns, schema, table) + + exists := tableHandle.Exists(tableName) + assert.True(t, exists) + }) + }) +} + +type mockExternalTable struct { + mock.Mock +} + +func (m *mockExternalTable) CreateExternal(schema tableschema.TableSchema, createIfNotExists bool, serdeProperties map[string]string, jars []string, hints, alias map[string]string) error { + args := m.Called(schema, createIfNotExists, serdeProperties, jars, hints, alias) + return args.Error(0) +} + +func (m *mockExternalTable) BatchLoadTables(tableNames []string) ([]*odps.Table, error) { + args := m.Called(tableNames) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]*odps.Table), args.Error(1) +} diff --git a/ext/store/maxcompute/maxcompute.go b/ext/store/maxcompute/maxcompute.go index 9787ba0059..0f036dd133 100644 --- a/ext/store/maxcompute/maxcompute.go +++ b/ext/store/maxcompute/maxcompute.go @@ -32,6 +32,7 @@ type TableResourceHandle interface { type Client interface { TableHandleFrom(projectSchema ProjectSchema) TableResourceHandle ViewHandleFrom(projectSchema ProjectSchema) TableResourceHandle + ExternalTableHandleFrom(schema ProjectSchema) TableResourceHandle } type ClientProvider interface { @@ -42,9 +43,14 @@ type SecretProvider interface { GetSecret(ctx context.Context, tnnt tenant.Tenant, key string) (*tenant.PlainTextSecret, error) } +type TenantDetailsGetter interface { + GetDetails(ctx context.Context, tnnt tenant.Tenant) (*tenant.WithDetails, error) +} + type MaxCompute struct { secretProvider SecretProvider clientProvider ClientProvider + tenantGetter TenantDetailsGetter } func (m MaxCompute) Create(ctx context.Context, res *resource.Resource) error { @@ -75,6 +81,16 @@ func (m MaxCompute) Create(ctx context.Context, res *resource.Resource) error { handle := odpsClient.ViewHandleFrom(projectSchema) return handle.Create(res) + case KindExternalTable: + syncer := NewSyncer(m.secretProvider, m.tenantGetter) + err = syncer.Sync(ctx, res) + if err != nil { + return err + } + + handle := odpsClient.ExternalTableHandleFrom(projectSchema) + return handle.Create(res) + default: return errors.InvalidArgument(store, "invalid kind for maxcompute resource "+res.Kind()) } @@ -108,6 +124,10 @@ func (m MaxCompute) Update(ctx context.Context, res *resource.Resource) error { handle := odpsClient.ViewHandleFrom(projectSchema) return handle.Update(res) + case KindExternalTable: + handle := odpsClient.ExternalTableHandleFrom(projectSchema) + return handle.Update(res) + default: return errors.InvalidArgument(store, "invalid kind for maxcompute resource "+res.Kind()) } @@ -135,6 +155,14 @@ func (MaxCompute) Validate(r *resource.Resource) error { view.Name = r.Name() return view.Validate() + case KindExternalTable: + extTable, err := ConvertSpecTo[ExternalTable](r) + if err != nil { + return err + } + extTable.Name = r.Name() + return extTable.Validate() + default: return errors.InvalidArgument(resource.EntityResource, "unknown kind") } @@ -178,8 +206,9 @@ func (m MaxCompute) Exist(ctx context.Context, tnnt tenant.Tenant, urn resource. } kindToHandleFn := map[string]func(projectSchema ProjectSchema) TableResourceHandle{ - KindTable: client.TableHandleFrom, - KindView: client.ViewHandleFrom, + KindTable: client.TableHandleFrom, + KindView: client.ViewHandleFrom, + KindExternalTable: client.ExternalTableHandleFrom, } for _, resourceHandleFn := range kindToHandleFn { @@ -202,9 +231,10 @@ func startChildSpan(ctx context.Context, name string) (context.Context, trace.Sp return tracer.Start(ctx, name) } -func NewMaxComputeDataStore(secretProvider SecretProvider, clientProvider ClientProvider) *MaxCompute { +func NewMaxComputeDataStore(secretProvider SecretProvider, clientProvider ClientProvider, tenantProvider TenantDetailsGetter) *MaxCompute { return &MaxCompute{ secretProvider: secretProvider, clientProvider: clientProvider, + tenantGetter: tenantProvider, } } diff --git a/ext/store/maxcompute/maxcompute_test.go b/ext/store/maxcompute/maxcompute_test.go index fb57034c1c..b9fdb7dcaa 100644 --- a/ext/store/maxcompute/maxcompute_test.go +++ b/ext/store/maxcompute/maxcompute_test.go @@ -35,7 +35,7 @@ func TestMaxComputeStore(t *testing.T) { defer secretProvider.AssertExpectations(t) clientProvider := new(mockClientProvider) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) res, err := resource.NewResource(fullName, maxcompute.KindTable, store, tnnt, &metadata, spec) assert.Nil(t, err) @@ -53,7 +53,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider := new(mockClientProvider) clientProvider.On("Get", pts.Value()).Return(nil, errors.New("error in client")) defer clientProvider.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) res, err := resource.NewResource(fullName, maxcompute.KindTable, store, tnnt, &metadata, spec) assert.Nil(t, err) @@ -74,7 +74,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider := new(mockClientProvider) clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) res, err := resource.NewResource(projectName, maxcompute.KindTable, store, tnnt, &metadata, spec) assert.Nil(t, err) @@ -96,7 +96,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) res, err := resource.NewResource(fullName, "unknown", store, tnnt, &metadata, spec) assert.Nil(t, err) @@ -126,7 +126,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) err = mcStore.Create(ctx, res) assert.Nil(t, err) }) @@ -151,7 +151,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) err = mcStore.Create(ctx, res) assert.Nil(t, err) }) @@ -164,7 +164,7 @@ func TestMaxComputeStore(t *testing.T) { defer secretProvider.AssertExpectations(t) clientProvider := new(mockClientProvider) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) res, err := resource.NewResource(fullName, maxcompute.KindTable, store, tnnt, &metadata, spec) assert.Nil(t, err) @@ -182,7 +182,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider := new(mockClientProvider) clientProvider.On("Get", pts.Value()).Return(nil, errors.New("error in client")) defer clientProvider.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) res, err := resource.NewResource(fullName, maxcompute.KindTable, store, tnnt, &metadata, spec) assert.Nil(t, err) @@ -203,7 +203,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider := new(mockClientProvider) clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) res, err := resource.NewResource(projectName, maxcompute.KindTable, store, tnnt, &metadata, spec) assert.Nil(t, err) @@ -225,7 +225,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) res, err := resource.NewResource(fullName, "unknown", store, tnnt, &metadata, spec) assert.Nil(t, err) @@ -255,7 +255,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) err = mcStore.Update(ctx, res) assert.Nil(t, err) }) @@ -280,7 +280,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) err = mcStore.Update(ctx, res) assert.Nil(t, err) }) @@ -294,7 +294,7 @@ func TestMaxComputeStore(t *testing.T) { res, err := resource.NewResource(fullName, "unknown", store, tnnt, &metadata, invalidSpec) assert.Nil(t, err) - mcStore := maxcompute.NewMaxComputeDataStore(nil, nil) + mcStore := maxcompute.NewMaxComputeDataStore(nil, nil, nil) err = mcStore.Validate(res) assert.NotNil(t, err) assert.ErrorContains(t, err, "unknown kind") @@ -305,7 +305,7 @@ func TestMaxComputeStore(t *testing.T) { assert.Nil(t, err) assert.Equal(t, fullName, res.FullName()) - mcStore := maxcompute.NewMaxComputeDataStore(nil, nil) + mcStore := maxcompute.NewMaxComputeDataStore(nil, nil, nil) err = mcStore.Validate(res) assert.NotNil(t, err) assert.ErrorContains(t, err, "not able to decode spec for "+fullName) @@ -315,7 +315,7 @@ func TestMaxComputeStore(t *testing.T) { assert.Nil(t, err) assert.Equal(t, fullName, res.FullName()) - mcStore := maxcompute.NewMaxComputeDataStore(nil, nil) + mcStore := maxcompute.NewMaxComputeDataStore(nil, nil, nil) err = mcStore.Validate(res) assert.NotNil(t, err) assert.ErrorContains(t, err, "empty schema for table "+fullName) @@ -327,7 +327,7 @@ func TestMaxComputeStore(t *testing.T) { assert.Nil(t, err) assert.Equal(t, fullName, res.FullName()) - mcStore := maxcompute.NewMaxComputeDataStore(nil, nil) + mcStore := maxcompute.NewMaxComputeDataStore(nil, nil, nil) err = mcStore.Validate(res) assert.NotNil(t, err) assert.ErrorContains(t, err, "not able to decode spec for "+fullName) @@ -337,7 +337,7 @@ func TestMaxComputeStore(t *testing.T) { assert.Nil(t, err) assert.Equal(t, fullName, res.FullName()) - mcStore := maxcompute.NewMaxComputeDataStore(nil, nil) + mcStore := maxcompute.NewMaxComputeDataStore(nil, nil, nil) err = mcStore.Validate(res) assert.NotNil(t, err) assert.ErrorContains(t, err, "view query is empty for "+fullName) @@ -358,7 +358,7 @@ func TestMaxComputeStore(t *testing.T) { res, err := resource.NewResource(projectName+"."+schemaName+"."+tableName, maxcompute.KindTable, store, tnnt, &metadata, spec) assert.NoError(t, err) - mcStore := maxcompute.NewMaxComputeDataStore(nil, nil) + mcStore := maxcompute.NewMaxComputeDataStore(nil, nil, nil) actualURN, err := mcStore.GetURN(res) assert.NoError(t, err) assert.Equal(t, expectedURN, actualURN) @@ -372,7 +372,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider := new(mockClientProvider) defer clientProvider.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) urn, err := resource.NewURN("random_store", "project.table") assert.NoError(t, err) @@ -390,7 +390,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider := new(mockClientProvider) defer clientProvider.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) urn, err := resource.NewURN("maxcompute", "project.schema.table") assert.NoError(t, err) @@ -408,7 +408,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider.On("Get", pts.Value()).Return(nil, errors.New("error in client")) defer clientProvider.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) urn, err := resource.NewURN("maxcompute", "project.schema.table") assert.NoError(t, err) @@ -430,7 +430,7 @@ func TestMaxComputeStore(t *testing.T) { clientProvider.On("Get", pts.Value()).Return(client, nil) defer clientProvider.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) urn, err := resource.NewURN("maxcompute", projectName) assert.NoError(t, err) @@ -454,7 +454,7 @@ func TestMaxComputeStore(t *testing.T) { tableHandle := new(mockTableResourceHandle) defer tableHandle.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) urn, err := resource.NewURN("maxcompute", "project.table") assert.NoError(t, err) @@ -478,7 +478,7 @@ func TestMaxComputeStore(t *testing.T) { tableHandle := new(mockTableResourceHandle) defer tableHandle.AssertExpectations(t) - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) urn, err := resource.NewURN("maxcompute", "project.schema.") assert.NoError(t, err) @@ -506,7 +506,7 @@ func TestMaxComputeStore(t *testing.T) { viewHandle.AssertExpectations(t) }() - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) urn, err := resource.NewURN("maxcompute", "project.schema.table") assert.NoError(t, err) @@ -539,7 +539,7 @@ func TestMaxComputeStore(t *testing.T) { viewHandle.AssertExpectations(t) }() - mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider) + mcStore := maxcompute.NewMaxComputeDataStore(secretProvider, clientProvider, nil) urn, err := resource.NewURN("maxcompute", "project.schema.table") assert.NoError(t, err) @@ -548,6 +548,8 @@ func TestMaxComputeStore(t *testing.T) { tableHandle.On("Exists", mock.Anything).Return(false).Maybe() client.On("ViewHandleFrom", mock.Anything).Return(viewHandle).Maybe() viewHandle.On("Exists", mock.Anything).Return(false).Maybe() + client.On("ExternalTableHandleFrom", mock.Anything).Return(viewHandle).Maybe() + viewHandle.On("Exists", mock.Anything).Return(false).Maybe() actualExist, actualError := mcStore.Exist(ctx, tnnt, urn) assert.False(t, actualExist) @@ -589,6 +591,11 @@ func (m *mockClient) ViewHandleFrom(projectSchema maxcompute.ProjectSchema) maxc return args.Get(0).(maxcompute.TableResourceHandle) } +func (m *mockClient) ExternalTableHandleFrom(schema maxcompute.ProjectSchema) maxcompute.TableResourceHandle { + args := m.Called(schema) + return args.Get(0).(maxcompute.TableResourceHandle) +} + type mockClientProvider struct { mock.Mock } diff --git a/ext/store/maxcompute/schema.go b/ext/store/maxcompute/schema.go index a69949a0e2..86f11eca03 100644 --- a/ext/store/maxcompute/schema.go +++ b/ext/store/maxcompute/schema.go @@ -15,9 +15,10 @@ import ( const ( resourceSchema = "maxcompute_schema" - KindTable string = "table" - KindView string = "view" - KindSchema string = "schema" + KindTable string = "table" + KindView string = "view" + KindSchema string = "schema" + KindExternalTable string = "external_table" ) type Schema []*Field @@ -277,7 +278,7 @@ func (f *Field) validateNode(checkName bool) error { return mu.ToErr() } -func ConvertSpecTo[T Table | View](res *resource.Resource) (*T, error) { +func ConvertSpecTo[T any](res *resource.Resource) (*T, error) { var spec T if err := mapstructure.Decode(res.Spec(), &spec); err != nil { msg := fmt.Sprintf("%s: not able to decode spec for %s", err, res.FullName()) diff --git a/ext/store/maxcompute/sheet_sync.go b/ext/store/maxcompute/sheet_sync.go new file mode 100644 index 0000000000..5c68a90979 --- /dev/null +++ b/ext/store/maxcompute/sheet_sync.go @@ -0,0 +1,145 @@ +package maxcompute + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" + + "github.com/goto/optimus/core/resource" + "github.com/goto/optimus/core/tenant" + bucket "github.com/goto/optimus/ext/bucket/oss" + "github.com/goto/optimus/ext/sheets/gsheet" +) + +const ( + GsheetCredsKey = "GOOGLE_SHEETS_ACCOUNT" + OSSCredsKey = "OSS_CREDS" + ExtLocation = "" + putTimeOut = time.Second * 10 +) + +type SyncerService struct { + secretProvider SecretProvider + tenantGetter TenantDetailsGetter +} + +func NewSyncer(secretProvider SecretProvider, tenantProvider TenantDetailsGetter) *SyncerService { + return &SyncerService{secretProvider: secretProvider, tenantGetter: tenantProvider} +} + +func (s *SyncerService) Sync(ctx context.Context, res *resource.Resource) error { + // Check if external table is for sheets + et, err := ConvertSpecTo[ExternalTable](res) + if err != nil { + return err + } + + if et.Source.SourceType != GoogleSheet { + return nil + } + + if len(et.Source.SourceURIs) == 0 { + return errors.New("source URI is empty for Google Sheet") + } + uri := et.Source.SourceURIs[0] + + // Get sheet content + content, err := s.getGsheet(ctx, res.Tenant(), uri, et.Source.Range) + if err != nil { + return err + } + + bucketName, err := s.getBucketName(ctx, res, et) + if err != nil { + return err + } + objectKey, err := s.getObjectKey(ctx, res, et) + if err != nil { + return err + } + + return s.writeContentToLocation(ctx, res.Tenant(), bucketName, objectKey, content) +} + +func (s *SyncerService) getGsheet(ctx context.Context, tnnt tenant.Tenant, sheetURI, sheetRange string) (string, error) { + secret, err := s.secretProvider.GetSecret(ctx, tnnt, GsheetCredsKey) + if err != nil { + return "", err + } + + sheets, err := gsheet.NewGSheets(ctx, secret.Value()) + if err != nil { + return "", err + } + return sheets.GetAsCSV(sheetURI, sheetRange) +} + +func (s *SyncerService) getBucketName(ctx context.Context, res *resource.Resource, et *ExternalTable) (string, error) { + location, err := s.getLocation(ctx, res, et) + if err != nil { + return "", err + } + parts := strings.Split(location, "/") + if len(parts) > 3 { // nolint:mnd + bucketName := parts[3] + return bucketName, nil + } + return "", errors.New("unable to get bucketName from Location") +} + +func (s *SyncerService) getObjectKey(ctx context.Context, res *resource.Resource, et *ExternalTable) (string, error) { + components, err := getURNComponent(res) + if err != nil { + return "", err + } + location, err := s.getLocation(ctx, res, et) + if err != nil { + return "", err + } + parts := strings.Split(location, "/") + if len(parts) > 4 { // nolint:mnd + path := strings.Join(parts[4:], "/") + return fmt.Sprintf("%s%s.csv", path, components.Name), nil + } + return "", errors.New("unable to get object path from location") +} + +func (s *SyncerService) getLocation(ctx context.Context, res *resource.Resource, et *ExternalTable) (string, error) { + location := et.Source.Location + if location == "" { + details, err := s.tenantGetter.GetDetails(ctx, res.Tenant()) + if err != nil { + return "", err + } + loc, err := details.GetConfig(ExtLocation) + if err != nil { + return "", err + } + location = loc + } + return location, nil +} + +func (s *SyncerService) writeContentToLocation(ctx context.Context, tnnt tenant.Tenant, bucketName, objectKey, content string) error { + // Setup oss bucket writer + creds, err := s.secretProvider.GetSecret(ctx, tnnt, OSSCredsKey) + if err != nil { + return err + } + ossClient, err := bucket.NewOssClient(creds.Value()) + if err != nil { + return err + } + + _, err = ossClient.PutObject(ctx, &oss.PutObjectRequest{ + Bucket: &bucketName, + Key: &objectKey, + ContentType: oss.Ptr("text/csv"), + Body: strings.NewReader(content), + }) + return err +} diff --git a/ext/store/maxcompute/table_test.go b/ext/store/maxcompute/table_test.go index 3120427ee7..c8fd167cf0 100644 --- a/ext/store/maxcompute/table_test.go +++ b/ext/store/maxcompute/table_test.go @@ -29,6 +29,9 @@ func (m *mockMaxComputeTable) Create(schema tableschema.TableSchema, createIfNot func (m *mockMaxComputeTable) BatchLoadTables(tableNames []string) ([]*odps.Table, error) { args := m.Called(tableNames) + if args.Get(0) == nil { + return nil, args.Error(1) + } return args.Get(0).([]*odps.Table), args.Error(1) } diff --git a/ext/store/maxcompute/view_test.go b/ext/store/maxcompute/view_test.go index 06b81ca3a9..ab4fc3654d 100644 --- a/ext/store/maxcompute/view_test.go +++ b/ext/store/maxcompute/view_test.go @@ -29,10 +29,6 @@ func TestViewHandle(t *testing.T) { odpsInstance := odps.NewInstance(odps.NewOdps(account.NewAliyunAccount(accessID, accessKey), endpoint), projectName, "") - normalTables := []*odps.Table{ - odps.NewTable(odps.NewOdps(account.NewAliyunAccount(accessID, accessKey), endpoint), projectName, schemaName, tableName), - } - t.Run("Create", func(t *testing.T) { t.Run("returns error when cannot convert spec", func(t *testing.T) { table := new(mockMaxComputeTable) @@ -234,7 +230,7 @@ func TestViewHandle(t *testing.T) { t.Run("Exists", func(t *testing.T) { t.Run("returns false when error in checking existing view", func(t *testing.T) { table := new(mockMaxComputeTable) - table.On("BatchLoadTables", mock.Anything).Return([]*odps.Table{}, errors.New("error in get")) + table.On("BatchLoadTables", mock.Anything).Return(nil, errors.New("error in get")) defer table.AssertExpectations(t) schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) @@ -245,7 +241,8 @@ func TestViewHandle(t *testing.T) { }) t.Run("returns true when checking existing tables", func(t *testing.T) { table := new(mockMaxComputeTable) - table.On("BatchLoadTables", mock.Anything).Return(normalTables, nil) + v1 := odps.NewTable(nil, projectName, schemaName, tableName) + table.On("BatchLoadTables", mock.Anything).Return([]*odps.Table{v1}, nil) defer table.AssertExpectations(t) schema := new(mockMaxComputeSchema) odpsIns := new(mockOdpsIns) diff --git a/go.mod b/go.mod index d10d6e325b..55bb22fa05 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,8 @@ require ( github.com/AlecAivazis/survey/v2 v2.3.6 github.com/MakeNowJust/heredoc v1.0.0 github.com/PagerDuty/go-pagerduty v1.5.1 - github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.1 - github.com/aliyun/aliyun-odps-go-sdk v0.3.13 + github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.3 + github.com/aliyun/aliyun-odps-go-sdk v0.3.17 github.com/briandowns/spinner v1.18.0 github.com/charmbracelet/bubbles v0.13.0 github.com/charmbracelet/bubbletea v0.22.1 diff --git a/go.sum b/go.sum index 5242d3814a..799c075c1a 100644 --- a/go.sum +++ b/go.sum @@ -214,8 +214,12 @@ github.com/alibabacloud-go/tea v1.2.2 h1:aTsR6Rl3ANWPfqeQugPglfurloyBJY85eFy7Gc1 github.com/alibabacloud-go/tea v1.2.2/go.mod h1:CF3vOzEMAG+bR4WOql8gc2G9H3EkH3ZLAQdpmpXMgwk= github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.1 h1:Sc2T9vs8SCq0DErL/QIY3ZVx8F+dm+DIv4RFP9NLwC4= github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.1/go.mod h1:FTzydeQVmR24FI0D6XWUOMKckjXehM/jgMn1xC+DA9M= +github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.3 h1:grJyLSdRJtfxKKhCTWSeJhnOQsp2WoLNdK8XA5FE9oo= +github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.3/go.mod h1:FTzydeQVmR24FI0D6XWUOMKckjXehM/jgMn1xC+DA9M= github.com/aliyun/aliyun-odps-go-sdk v0.3.13 h1:frPJCVxhlHceH8077tNJQEuLjPBIbnzq0FE/1HtxrFY= github.com/aliyun/aliyun-odps-go-sdk v0.3.13/go.mod h1:t/tgF/iN5aAs/gLL7sEI8/qdax4NuFCKEjO3OJbHZqI= +github.com/aliyun/aliyun-odps-go-sdk v0.3.17 h1:1dSUL31Z/xbg42j8DpOemyyMgYb9giJfRpLI7YEmw/Y= +github.com/aliyun/aliyun-odps-go-sdk v0.3.17/go.mod h1:h3n3Jy9qCcq9GhKakuF7Y47W1EP71hfTDx8MCEeQYbA= github.com/aliyun/credentials-go v1.3.10 h1:45Xxrae/evfzQL9V10zL3xX31eqgLWEaIdCoPipOEQA= github.com/aliyun/credentials-go v1.3.10/go.mod h1:Jm6d+xIgwJVLVWT561vy67ZRP4lPTQxMbEYRuT2Ti1U= github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI= @@ -1483,6 +1487,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= diff --git a/server/optimus.go b/server/optimus.go index a182aef854..c8f450898e 100644 --- a/server/optimus.go +++ b/server/optimus.go @@ -396,7 +396,7 @@ func (s *OptimusServer) setupHandlers() error { resourceManager.RegisterDatastore(rModel.Bigquery, bigqueryStore) mcClientProvider := mcStore.NewClientProvider() - maxComputeStore := mcStore.NewMaxComputeDataStore(tenantService, mcClientProvider) + maxComputeStore := mcStore.NewMaxComputeDataStore(tenantService, mcClientProvider, tenantService) resourceManager.RegisterDatastore(rModel.MaxCompute, maxComputeStore) // Tenant Handlers