Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: External table support #310

Merged
merged 23 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions ext/bucket/oss/oss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package bucket

import (
"encoding/json"
"errors"

"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

type OSSCredentials struct {
AccessID string `json:"access_key_id"`
AccessKey string `json:"access_key_secret"`
Endpoint string `json:"endpoint"`
ProjectName string `json:"project_name"`
Region string `json:"region"`
SecurityToken string `json:"security_token"`
}

func NewOssClient(creds string) (*oss.Client, error) {
cred, err := toOSSCredentials(creds)
if err != nil {
return nil, err
}

credProvider := credentials.NewStaticCredentialsProvider(cred.AccessID, cred.AccessKey, cred.SecurityToken)
cfg := oss.LoadDefaultConfig().
WithCredentialsProvider(credProvider).
WithEndpoint(cred.Endpoint).
WithRegion(cred.Region)

if cfg.CredentialsProvider == nil {
return nil, errors.New("OSS: credentials provider is required")
}

return oss.NewClient(cfg), nil
}

func toOSSCredentials(creds string) (OSSCredentials, error) {
var cred OSSCredentials
if err := json.Unmarshal([]byte(creds), &cred); err != nil {
return OSSCredentials{}, err
}

return cred, nil
}
42 changes: 6 additions & 36 deletions ext/scheduler/airflow/bucket/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,32 @@ package bucket

import (
"context"
"encoding/json"
"net/url"

"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"go.opentelemetry.io/otel"
"gocloud.dev/blob"

"github.com/goto/optimus/core/tenant"
oss "github.com/goto/optimus/ext/bucket/oss"
"github.com/goto/optimus/ext/scheduler/airflow"
"github.com/goto/optimus/ext/scheduler/airflow/bucket/ossblob"
)

type ossCredentials struct {
AccessID string `json:"access_key_id"`
AccessKey string `json:"access_key_secret"`
Endpoint string `json:"endpoint"`
ProjectName string `json:"project_name"`
Region string `json:"region"`
SecurityToken string `json:"security_token"`
}

func (f *Factory) GetOSSBucket(ctx context.Context, tnnt tenant.Tenant, parsedURL *url.URL) (airflow.Bucket, error) {
spanCtx, span := otel.Tracer("airflow/bucketFactory").Start(ctx, "GetOSSBucket")
defer span.End()

cred, err := f.getOSSCredentials(spanCtx, tnnt)
cred, err := f.secretsGetter.Get(spanCtx, tnnt.ProjectName(), tnnt.NamespaceName().String(), tenant.SecretStorageKey)
if err != nil {
return nil, err
}

credProvider := credentials.NewStaticCredentialsProvider(cred.AccessID, cred.AccessKey, cred.SecurityToken)

cfg := oss.LoadDefaultConfig().
WithCredentialsProvider(credProvider).
WithEndpoint(cred.Endpoint).
WithRegion(cred.Region)

client, err := ossblob.OpenBucket(ctx, cfg, parsedURL.Host)
ossClient, err := oss.NewOssClient(cred.Value())
if err != nil {
return nil, err
}

return client, nil
}

func (f *Factory) getOSSCredentials(ctx context.Context, tnnt tenant.Tenant) (ossCredentials, error) {
// Get credentials from secret manager
secret, err := f.secretsGetter.Get(ctx, tnnt.ProjectName(), tnnt.NamespaceName().String(), tenant.SecretStorageKey)
if err != nil {
return ossCredentials{}, err
}

var cred ossCredentials
if err := json.Unmarshal([]byte(secret.Value()), &cred); err != nil {
return ossCredentials{}, err
}
driver := ossblob.NewOSSBucket(ossClient, parsedURL.Host)

return cred, nil
return blob.NewBucket(driver), nil
}
26 changes: 0 additions & 26 deletions ext/scheduler/airflow/bucket/ossblob/ossblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"strings"

"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"gocloud.dev/blob"
"gocloud.dev/blob/driver"
"gocloud.dev/gcerrors"
)
Expand Down Expand Up @@ -371,31 +370,6 @@ func (b *ossBucket) Copy(ctx context.Context, dstKey, srcKey string, _ *driver.C
return err
}

func openBucket(_ context.Context, cfg *oss.Config, bucketName string) (*ossBucket, error) {
if cfg == nil {
return nil, errors.New("ossblob.openBucket: oss config are required")
}
if cfg.CredentialsProvider == nil {
return nil, errors.New("ossblob.openBucket: credentials provider is required")
}
if bucketName == "" {
return nil, errors.New("ossblob.openBucket: bucketName is required")
}

client := oss.NewClient(cfg)

return NewOSSBucket(client, bucketName), nil
}

func OpenBucket(ctx context.Context, cfg *oss.Config, bucketName string) (*blob.Bucket, error) {
drv, err := openBucket(ctx, cfg, bucketName)
if err != nil {
return nil, err
}

return blob.NewBucket(drv), nil
}

func safeGet[T any](obj *T) T {
var zero T
if obj == nil {
Expand Down
31 changes: 0 additions & 31 deletions ext/scheduler/airflow/bucket/ossblob/ossblob_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package ossblob_test

import (
"context"
"errors"
"io"
"net/http"
"strings"
"testing"

"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"github.com/stretchr/testify/assert"
"gocloud.dev/gcerrors"

Expand Down Expand Up @@ -98,32 +96,3 @@ func TestOSSBucket(t *testing.T) {
assert.NoError(t, b.Close())
})
}

func TestOpenBucket(t *testing.T) {
ctx := context.Background()
cfg := &oss.Config{}
bucketName := "test-bucket"

t.Run("openBucket with nil config", func(t *testing.T) {
_, err := ossblob.OpenBucket(ctx, nil, bucketName)
assert.Error(t, err)
})

t.Run("openBucket with nil credentials provider", func(t *testing.T) {
cfg.CredentialsProvider = nil
_, err := ossblob.OpenBucket(ctx, cfg, bucketName)
assert.Error(t, err)
})

t.Run("openBucket with empty bucket name", func(t *testing.T) {
_, err := ossblob.OpenBucket(ctx, cfg, "")
assert.Error(t, err)
})

t.Run("openBucket with valid config", func(t *testing.T) {
cfg.CredentialsProvider = &credentials.StaticCredentialsProvider{}
b, err := ossblob.OpenBucket(ctx, cfg, bucketName)
assert.NoError(t, err)
assert.NotNil(t, b)
})
}
46 changes: 46 additions & 0 deletions ext/sheets/csv/csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package csv

import (
"encoding/csv"
"strings"
)

func FromRecords(data [][]interface{}) (string, error) {
if len(data) == 0 {
return "", nil
}

lenRecords := len(data[0])
var allRecords [][]string
for _, row := range data {
var currRow []string
i := 0
for _, r1 := range row {
i++
s, ok := r1.(string)
if !ok {
s = ""
}
currRow = append(currRow, s)
}
for i < lenRecords {
currRow = append(currRow, "")
i++
}
allRecords = append(allRecords, currRow)
}

return FromData(allRecords)
}

func FromData(records [][]string) (string, error) {
out := new(strings.Builder)
w := csv.NewWriter(out)

err := w.WriteAll(records)
if err != nil {
return "", err
}

return out.String(), nil
}
59 changes: 59 additions & 0 deletions ext/sheets/gsheet/gsheet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package gsheet

import (
"context"
"errors"
"fmt"

"google.golang.org/api/option"
"google.golang.org/api/sheets/v4"

"github.com/goto/optimus/ext/sheets/csv"
)

const (
readRange = "Sheet1"
)

type GSheets struct {
srv *sheets.Service
}

func NewGSheets(ctx context.Context, creds string) (*GSheets, error) {
srv, err := sheets.NewService(ctx, option.WithCredentialsJSON([]byte(creds)))
if err != nil {
return nil, fmt.Errorf("not able to create sheets service err: %w", err)
}

return &GSheets{srv: srv}, nil
}

func (gs *GSheets) GetAsCSV(url, sheetRange string) (string, error) {
info, err := FromURL(url)
if err != nil {
return "", err
}

if sheetRange == "" {
sheetRange = readRange
}
content, err := gs.getSheetContent(info.SheetID, sheetRange)
if err != nil {
return "", err
}

return csv.FromRecords(content)
}

func (gs *GSheets) getSheetContent(sheetID, sheetRange string) ([][]interface{}, error) {
resp, err := gs.srv.Spreadsheets.Values.Get(sheetID, sheetRange).Do()
if err != nil {
return nil, err
}

if len(resp.Values) == 0 {
return nil, errors.New("no data found in the sheet")
}

return resp.Values, nil
}
34 changes: 34 additions & 0 deletions ext/sheets/gsheet/sheet_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package gsheet

import (
"errors"
"regexp"
)

var (
sheetIDRegex = regexp.MustCompile(`spreadsheets/d/([^/]*)`)
gidRegex = regexp.MustCompile(`gid=([0-9]*)`)
)

type SheetsInfo struct {
SheetID string
GID string
}

func FromURL(u1 string) (*SheetsInfo, error) {
res := sheetIDRegex.FindStringSubmatch(u1)
if len(res) < 2 || res[1] == "" {
return nil, errors.New("not able to get spreadsheetID")
}

gid := ""
res2 := gidRegex.FindStringSubmatch(u1)
if len(res2) > 1 && res2[1] != "" {
gid = res2[1]
}

return &SheetsInfo{
SheetID: res[1],
GID: gid,
}, nil
}
35 changes: 35 additions & 0 deletions ext/sheets/gsheet/sheet_info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package gsheet_test

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/goto/optimus/ext/sheets/gsheet"
)

func TestSheetInfo(t *testing.T) {
t.Run("return error when id missing", func(t *testing.T) {
u1 := "https://docs.google.com/spreadsheets/d"

_, err := gsheet.FromURL(u1)
assert.Error(t, err)
assert.ErrorContains(t, err, "not able to get spreadsheetID")
})
t.Run("return sheet info with id", func(t *testing.T) {
u1 := "https://docs.google.com/spreadsheets/d/abcedefgh/edit?usp=sharing"

info, err := gsheet.FromURL(u1)
assert.Nil(t, err)
assert.Equal(t, info.SheetID, "abcedefgh")
assert.Equal(t, info.GID, "")
})
t.Run("return sheet info with sid and gid", func(t *testing.T) {
u1 := "https://docs.google.com/spreadsheets/d/abcdeghi/edit#gid=3726"

info, err := gsheet.FromURL(u1)
assert.Nil(t, err)
assert.Equal(t, info.SheetID, "abcdeghi")
assert.Equal(t, info.GID, "3726")
})
}
8 changes: 8 additions & 0 deletions ext/store/maxcompute/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ func (c *MaxComputeClient) TableHandleFrom(projectSchema ProjectSchema) TableRes
return NewTableHandle(c, s, t)
}

func (c *MaxComputeClient) ExternalTableHandleFrom(projectSchema ProjectSchema) TableResourceHandle {
c.SetDefaultProjectName(projectSchema.Project)
c.SetCurrentSchemaName(projectSchema.Schema)
s := c.Schemas()
t := c.Tables()
return NewExternalTableHandle(c, s, t)
}

func (c *MaxComputeClient) ViewHandleFrom(projectSchema ProjectSchema) TableResourceHandle {
c.SetDefaultProjectName(projectSchema.Project)
c.SetCurrentSchemaName(projectSchema.Schema)
Expand Down
Loading
Loading