Skip to content

Commit

Permalink
test: add client test
Browse files Browse the repository at this point in the history
  • Loading branch information
deryrahman committed Sep 30, 2024
1 parent 5f3867f commit 13df94b
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 36 deletions.
9 changes: 7 additions & 2 deletions max2max/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@ module github.com/goto/maxcompute-transformation

go 1.22.3

require github.com/aliyun/aliyun-odps-go-sdk v0.3.4
require (
github.com/aliyun/aliyun-odps-go-sdk v0.3.4
github.com/stretchr/testify v1.9.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
1 change: 1 addition & 0 deletions max2max/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
Expand Down
39 changes: 13 additions & 26 deletions max2max/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,24 @@ type Loader interface {
GetPartitionedQuery(tableID, query string, partitionName []string) string
}

type client struct {
type OdpsClient interface {
GetPartitionNames(tableID string) ([]string, error)
ExecSQL(query string) error
}

type Client struct {
logger *slog.Logger
odpsClient *odps.Odps
OdpsClient OdpsClient
}

func NewClient(logger *slog.Logger, odpsClient *odps.Odps) *client {
return &client{
func NewClient(logger *slog.Logger, odpsClient *odps.Odps) *Client {
return &Client{
logger: logger,
odpsClient: odpsClient,
OdpsClient: NewODPSClient(odpsClient),
}
}

func (c *client) Execute(loader Loader, tableID, queryFilePath string) error {
func (c *Client) Execute(loader Loader, tableID, queryFilePath string) error {
// read query from filepath
c.logger.Info(fmt.Sprintf("executing query from %s", queryFilePath))
queryRaw, err := os.ReadFile(queryFilePath)
Expand All @@ -36,7 +41,7 @@ func (c *client) Execute(loader Loader, tableID, queryFilePath string) error {

// check if table is partitioned
c.logger.Info(fmt.Sprintf("checking if table %s is partitioned", tableID))
partitionNames, err := c.getPartitionNames(tableID)
partitionNames, err := c.OdpsClient.GetPartitionNames(tableID)
if err != nil {
return err
}
Expand All @@ -50,28 +55,10 @@ func (c *client) Execute(loader Loader, tableID, queryFilePath string) error {

// execute query with odps client
c.logger.Info(fmt.Sprintf("execute: %s", queryToExec))
taskIns, err := c.odpsClient.ExecSQl(queryToExec)
if err != nil {
if err := c.OdpsClient.ExecSQL(queryToExec); err != nil {
return err
}

// wait execution success
c.logger.Info(fmt.Sprintf("taskId: %s", taskIns.Id()))
if err := taskIns.WaitForSuccess(); err != nil {
return err
}
c.logger.Info("execution done")
return nil
}

func (c *client) getPartitionNames(tableID string) ([]string, error) {
table := c.odpsClient.Table(tableID)
if err := table.Load(); err != nil {
return nil, err
}
var partitionNames []string
for _, partition := range table.Schema().PartitionColumns {
partitionNames = append(partitionNames, partition.Name)
}
return partitionNames, nil
}
112 changes: 112 additions & 0 deletions max2max/internal/client/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package client_test

import (
"fmt"
"log/slog"
"os"
"testing"

"github.com/goto/maxcompute-transformation/internal/client"
"github.com/stretchr/testify/assert"
)

func TestExecute(t *testing.T) {
t.Run("should return error when reading query file fails", func(t *testing.T) {
// arrange
client := client.NewClient(slog.Default(), nil)
client.OdpsClient = &mockOdpsClient{}
// act
err := client.Execute(nil, "", "./nonexistentfile")
// assert
assert.Error(t, err)
})
t.Run("should return error when getting partition name fails", func(t *testing.T) {
// arrange
client := client.NewClient(slog.Default(), nil)
client.OdpsClient = &mockOdpsClient{
partitionResult: func() ([]string, error) {
return nil, fmt.Errorf("error get partition name")
},
}
assert.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err := client.Execute(nil, "project_test.table_test", "/tmp/query.sql")
// assert
assert.Error(t, err)
assert.ErrorContains(t, err, "error get partition name")
})
t.Run("should return error when executing query fails", func(t *testing.T) {
// arrange
client := client.NewClient(slog.Default(), nil)
client.OdpsClient = &mockOdpsClient{
partitionResult: func() ([]string, error) {
return nil, nil
},
execSQLResult: func() error {
return fmt.Errorf("error exec sql")
},
}
loader := &mockLoader{
getQueryResult: func() string {
return "INSERT INTO table SELECT * FROM table;"
},
}
assert.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err := client.Execute(loader, "project_test.table_test", "/tmp/query.sql")
// assert
assert.Error(t, err)
assert.ErrorContains(t, err, "error exec sql")
})
t.Run("should return nil when everything is successful", func(t *testing.T) {
// arrange
client := client.NewClient(slog.Default(), nil)
client.OdpsClient = &mockOdpsClient{
partitionResult: func() ([]string, error) {
return []string{"event_date"}, nil
},
execSQLResult: func() error {
return nil
},
}
loader := &mockLoader{
getQueryResult: func() string {
return "INSERT INTO table SELECT * FROM table;"
},
getPartitionedQueryResult: func() string {
return "INSERT INTO table PARTITION (event_date) SELECT * FROM table;"
},
}
assert.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err := client.Execute(loader, "project_test.table_test", "/tmp/query.sql")
// assert
assert.NoError(t, err)
})
}

type mockOdpsClient struct {
partitionResult func() ([]string, error)
execSQLResult func() error
}

func (m *mockOdpsClient) GetPartitionNames(tableID string) ([]string, error) {
return m.partitionResult()
}

func (m *mockOdpsClient) ExecSQL(query string) error {
return m.execSQLResult()
}

type mockLoader struct {
getQueryResult func() string
getPartitionedQueryResult func() string
}

func (m *mockLoader) GetQuery(tableID, query string) string {
return m.getQueryResult()
}

func (m *mockLoader) GetPartitionedQuery(tableID, query string, partitionName []string) string {
return m.getPartitionedQueryResult()
}
43 changes: 43 additions & 0 deletions max2max/internal/client/odps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package client

import (
"fmt"
"log/slog"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
)

type odpsClient struct {
logger *slog.Logger
client *odps.Odps
}

func NewODPSClient(client *odps.Odps) *odpsClient {
return &odpsClient{
client: client,
}
}

// ExecSQL executes the given query in syncronous mode (blocking)
func (c *odpsClient) ExecSQL(query string) error {
taskIns, err := c.client.ExecSQl(query)
if err != nil {
return err
}

// wait execution success
c.logger.Info(fmt.Sprintf("taskId: %s", taskIns.Id()))
return taskIns.WaitForSuccess()
}

func (c *odpsClient) GetPartitionNames(tableID string) ([]string, error) {
table := c.client.Table(tableID)
if err := table.Load(); err != nil {
return nil, err
}
var partitionNames []string
for _, partition := range table.Schema().PartitionColumns {
partitionNames = append(partitionNames, partition.Name)
}
return partitionNames, nil
}
16 changes: 8 additions & 8 deletions max2max/internal/loader/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ func GetLoader(name string, logger *slog.Logger) (Loader, error) {
switch name {
case APPEND:
return NewAppendLoader(logger), nil
case REPLACE:
return NewReplaceLoader(logger), nil
case REPLACE_ALL:
return NewReplaceAllLoader(logger), nil
case MERGE:
return NewMergeLoader(logger), nil
case MERGE_REPLACE:
return NewMergeReplaceLoader(logger), nil
// case REPLACE:
// return NewReplaceLoader(logger), nil
// case REPLACE_ALL:
// return NewReplaceAllLoader(logger), nil
// case MERGE:
// return NewMergeLoader(logger), nil
// case MERGE_REPLACE:
// return NewMergeReplaceLoader(logger), nil
default:
return nil, fmt.Errorf("loader %s not found", name)
}
Expand Down

0 comments on commit 13df94b

Please sign in to comment.