Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat : API to sync external Resources #309

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions core/resource/handler/v1beta1/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type ResourceService interface {
ChangeNamespace(ctx context.Context, datastore resource.Store, resourceFullName string, oldTenant, newTenant tenant.Tenant) error
Get(ctx context.Context, tnnt tenant.Tenant, store resource.Store, resourceName string) (*resource.Resource, error)
GetAll(ctx context.Context, tnnt tenant.Tenant, store resource.Store) ([]*resource.Resource, error)
GetAllExternal(ctx context.Context, tnnt tenant.Tenant, store resource.Store) ([]*resource.Resource, error)
Deploy(ctx context.Context, tnnt tenant.Tenant, store resource.Store, resources []*resource.Resource, logWriter writer.LogWriter) error
SyncResources(ctx context.Context, tnnt tenant.Tenant, store resource.Store, names []string) (*resource.SyncResponse, error)
}
Expand Down Expand Up @@ -182,6 +183,33 @@ func (rh ResourceHandler) ListResourceSpecification(ctx context.Context, req *pb
}, nil
}

func (rh ResourceHandler) SyncExternalTables(ctx context.Context, req *pb.SyncExternalTablesRequest) (*pb.SyncExternalTablesResponse, error) {
tnnt, err := tenant.NewTenant(req.GetProjectName(), req.GetNamespaceName())
if err != nil {
rh.l.Error("invalid tenant information request project [%s] namespace [%s]: %s", req.GetProjectName(), req.GetNamespaceName(), err)
return nil, errors.GRPCErr(err, "failed to list resource for ")
}

resources, err := rh.service.GetAllExternal(ctx, tnnt, resource.Bigquery)
if err != nil {
rh.l.Error("error getting all resources: %s", err)
return nil, errors.GRPCErr(err, "failed to list resource for "+resource.Bigquery.String())
}

for _, res := range resources {
spec := res.Spec()
source := spec["source"].(map[string]any)
urls := source["uris"].([]string)
destination := req.GetDestination()
rh.l.Info("Sync Sheets to OSS for resource: %s, SheetLinks: %s, to: %s \n", res.Name(), urls, destination)
}

return &pb.SyncExternalTablesResponse{
Success: true,
Message: fmt.Sprintf("successfully synced [%d] resources", len(resources)),
}, nil
}

func (rh ResourceHandler) CreateResource(ctx context.Context, req *pb.CreateResourceRequest) (*pb.CreateResourceResponse, error) {
tnnt, err := tenant.NewTenant(req.GetProjectName(), req.GetNamespaceName())
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions core/resource/handler/v1beta1/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,15 @@ func (r *resourceService) GetAll(ctx context.Context, tnnt tenant.Tenant, store
return resources, args.Error(1)
}

func (r *resourceService) GetAllExternal(ctx context.Context, tnnt tenant.Tenant, store resource.Store) ([]*resource.Resource, error) {
args := r.Called(ctx, tnnt, store)
var rs []*resource.Resource
if args.Get(0) != nil {
rs = args.Get(0).([]*resource.Resource)
}
return rs, args.Error(1)
}

func (r *resourceService) Deploy(ctx context.Context, tnnt tenant.Tenant, store resource.Store, resources []*resource.Resource, logWriter writer.LogWriter) error {
args := r.Called(ctx, tnnt, store, resources, logWriter)
return args.Error(0)
Expand Down
5 changes: 5 additions & 0 deletions core/resource/service/resource_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ResourceRepository interface {
ChangeNamespace(ctx context.Context, res *resource.Resource, newTenant tenant.Tenant) error
ReadByFullName(ctx context.Context, tnnt tenant.Tenant, store resource.Store, fullName string, onlyActive bool) (*resource.Resource, error)
ReadAll(ctx context.Context, tnnt tenant.Tenant, store resource.Store, onlyActive bool) ([]*resource.Resource, error)
GetAllExternal(ctx context.Context, tnnt tenant.Tenant, store resource.Store) ([]*resource.Resource, error)
GetResources(ctx context.Context, tnnt tenant.Tenant, store resource.Store, names []string) ([]*resource.Resource, error)
ReadByURN(ctx context.Context, tnnt tenant.Tenant, urn resource.URN) (*resource.Resource, error)
}
Expand Down Expand Up @@ -377,6 +378,10 @@ func (rs ResourceService) GetAll(ctx context.Context, tnnt tenant.Tenant, store
return rs.repo.ReadAll(ctx, tnnt, store, true)
}

func (rs ResourceService) GetAllExternal(ctx context.Context, tnnt tenant.Tenant, store resource.Store) ([]*resource.Resource, error) { // nolint:gocritic
return rs.repo.ReadAll(ctx, tnnt, store, true)
}

func (rs ResourceService) SyncResources(ctx context.Context, tnnt tenant.Tenant, store resource.Store, names []string) (*resource.SyncResponse, error) { // nolint:gocritic
resources, err := rs.repo.GetResources(ctx, tnnt, store, names)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions core/resource/service/resource_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1797,6 +1797,14 @@ func (m *mockResourceRepository) ReadByFullName(ctx context.Context, tnnt tenant
return args.Get(0).(*resource.Resource), args.Error(1)
}

func (m *mockResourceRepository) GetAllExternal(ctx context.Context, tnnt tenant.Tenant, store resource.Store) ([]*resource.Resource, error) {
args := m.Called(ctx, tnnt, store)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).([]*resource.Resource), args.Error(1)
}

func (m *mockResourceRepository) Update(ctx context.Context, res *resource.Resource) error {
return m.Called(ctx, res).Error(0)
}
Expand Down
46 changes: 46 additions & 0 deletions ext/format/csv/csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package csv

import (
"encoding/csv"
"strings"
)

func FromRecords(data [][]interface{}) (string, error) {
if len(data) == 0 {
return "", nil
}

lenRecords := len(data[0])
var allRecords [][]string
for _, row := range data {
var currRow []string
i := 0
for _, r1 := range row {
i++
s, ok := r1.(string)
if !ok {
s = ""
}
currRow = append(currRow, s)
}
for i < lenRecords {
currRow = append(currRow, "")
i++
}
allRecords = append(allRecords, currRow)
}

return FromData(allRecords)
}

func FromData(records [][]string) (string, error) {
out := new(strings.Builder)
w := csv.NewWriter(out)

err := w.WriteAll(records)
if err != nil {
return "", err
}

return out.String(), nil
}
55 changes: 55 additions & 0 deletions ext/sheets/gsheet/gsheet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package gsheet

import (
"context"
"errors"

"google.golang.org/api/option"
"google.golang.org/api/sheets/v4"

"github.com/goto/optimus/ext/format/csv"
)

const (
readRange = "Sheet1"
)

type GSheets struct {
srv *sheets.Service
}

func NewGSheets(ctx context.Context, creds string) (*GSheets, error) {
srv, err := sheets.NewService(ctx, option.WithCredentialsJSON([]byte(creds)))
if err != nil {
return nil, errors.New("not able to create sheets service")
}

return &GSheets{srv: srv}, nil
}

func (gs *GSheets) GetAsCSV(url string) (string, error) {
info, err := FromURL(url)
if err != nil {
return "", err
}

content, err := gs.getSheetContent(info.SheetID)
if err != nil {
return "", err
}

return csv.FromRecords(content)
}

func (gs *GSheets) getSheetContent(sheetID string) ([][]interface{}, error) {
resp, err := gs.srv.Spreadsheets.Values.Get(sheetID, readRange).Do()
if err != nil {
return nil, err
}

if len(resp.Values) == 0 {
return nil, errors.New("no data found in the sheet")
}

return resp.Values, nil
}
34 changes: 34 additions & 0 deletions ext/sheets/gsheet/sheet_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package gsheet

import (
"errors"
"regexp"
)

var (
sheetIDRegex = regexp.MustCompile(`spreadsheets/d/([^/]*)`)
gidRegex = regexp.MustCompile(`gid=([0-9]*)`)
)

type SheetsInfo struct {
SheetID string
GID string
}

func FromURL(u1 string) (*SheetsInfo, error) {
res := sheetIDRegex.FindStringSubmatch(u1)
if len(res) < 2 || res[1] == "" {
return nil, errors.New("not able to get spreadsheetID")
}

gid := ""
res2 := gidRegex.FindStringSubmatch(u1)
if len(res2) > 1 && res2[1] != "" {
gid = res2[1]
}

return &SheetsInfo{
SheetID: res[1],
GID: gid,
}, nil
}
35 changes: 35 additions & 0 deletions ext/sheets/gsheet/sheet_info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package gsheet_test

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/goto/optimus/ext/sheets/gsheet"
)

func TestSheetInfo(t *testing.T) {
t.Run("return error when id missing", func(t *testing.T) {
u1 := "https://docs.google.com/spreadsheets/d"

_, err := gsheet.FromURL(u1)
assert.Error(t, err)
assert.ErrorContains(t, err, "not able to get spreadsheetID")
})
t.Run("return sheet info with id", func(t *testing.T) {
u1 := "https://docs.google.com/spreadsheets/d/abcedefgh/edit?usp=sharing"

info, err := gsheet.FromURL(u1)
assert.Nil(t, err)
assert.Equal(t, info.SheetID, "abcedefgh")
assert.Equal(t, info.GID, "")
})
t.Run("return sheet info with sid and gid", func(t *testing.T) {
u1 := "https://docs.google.com/spreadsheets/d/abcdeghi/edit#gid=3726"

info, err := gsheet.FromURL(u1)
assert.Nil(t, err)
assert.Equal(t, info.SheetID, "abcdeghi")
assert.Equal(t, info.GID, "3726")
})
}
29 changes: 29 additions & 0 deletions internal/store/postgres/resource/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,35 @@ func (r Repository) ReadAll(ctx context.Context, tnnt tenant.Tenant, store resou
return resources, nil
}

func (r Repository) GetAllExternal(ctx context.Context, tnnt tenant.Tenant, store resource.Store) ([]*resource.Resource, error) {
getAllResources := `SELECT ` + resourceColumns + ` FROM resource WHERE project_name = $1 and namespace_name = $2 and store = $3 and kind = 'external_table'`
args := []any{tnnt.ProjectName(), tnnt.NamespaceName(), store}

rows, err := r.db.Query(ctx, getAllResources, args...)
if err != nil {
return nil, errors.Wrap(resource.EntityResource, "error in GetAllExternal", err)
}
defer rows.Close()

var resources []*resource.Resource
for rows.Next() {
var res Resource
err = rows.Scan(&res.ID, &res.FullName, &res.Kind, &res.Store, &res.Status, &res.URN,
&res.ProjectName, &res.NamespaceName, &res.Metadata, &res.Spec, &res.CreatedAt, &res.UpdatedAt)
if err != nil {
return nil, errors.Wrap(resource.EntityResource, "error in GetAll", err)
}

resourceModel, err := FromModelToResource(&res)
if err != nil {
return nil, err
}
resources = append(resources, resourceModel)
}

return resources, nil
}

func (r Repository) GetResources(ctx context.Context, tnnt tenant.Tenant, store resource.Store, names []string) ([]*resource.Resource, error) {
getAllResources := `SELECT ` + resourceColumns + ` FROM resource WHERE project_name = $1 and namespace_name = $2 and
store = $3 AND full_name = any ($4) AND status NOT IN ($5, $6)`
Expand Down
Loading
Loading