Skip to content

Commit

Permalink
feat: add support for auto partition using flag toggle
Browse files Browse the repository at this point in the history
  • Loading branch information
deryrahman committed Dec 9, 2024
1 parent 9d84622 commit 5a240fa
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 3 deletions.
6 changes: 4 additions & 2 deletions mc2mc/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Client struct {

// TODO: remove this temporary capability after 15 nov
enablePartitionValue bool
enableAutoPartition bool
}

func NewClient(ctx context.Context, setupFns ...SetupFn) (*Client, error) {
Expand Down Expand Up @@ -64,7 +65,7 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err
if err != nil {
return errors.WithStack(err)
}
if c.enablePartitionValue {
if c.enablePartitionValue && !c.enableAutoPartition {
queryRaw = addPartitionValueColumn(queryRaw)
}

Expand All @@ -76,7 +77,8 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err

// prepare query
queryToExec := c.Loader.GetQuery(tableID, string(queryRaw))
if len(partitionNames) > 0 {
if len(partitionNames) > 0 && !c.enableAutoPartition {
// when table is partitioned and auto partition is disabled, then we need to specify partition columns explicitly
c.logger.Info(fmt.Sprintf("table %s is partitioned by %s", tableID, strings.Join(partitionNames, ", ")))
queryToExec = c.Loader.GetPartitionedQuery(tableID, string(queryRaw), partitionNames)
}
Expand Down
51 changes: 50 additions & 1 deletion mc2mc/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestExecute(t *testing.T) {
})
t.Run("should return nil when everything is successful", func(t *testing.T) {
// arrange
client, err := client.NewClient(context.TODO(), client.SetupLogger("error"), client.SetupLoader("APPEND"))
client, err := client.NewClient(context.TODO(), client.SetupLogger("error"), client.SetupLoader("REPLACE"))
require.NoError(t, err)
client.OdpsClient = &mockOdpsClient{
partitionResult: func() ([]string, error) {
Expand All @@ -70,6 +70,42 @@ func TestExecute(t *testing.T) {
return nil
},
}
client.Loader = &mockLoader{
getQueryFunc: func(tableID, query string) string {
return "INSERT OVERWRITE TABLE project_test.table_test SELECT * FROM table;"
},
getPartitionedQueryFunc: func(tableID, query string, partitionNames []string) string {
assert.True(t, true, "should be called")
return "INSERT OVERWRITE TABLE project_test.table_test PARTITION(event_date) SELECT * FROM table;"
},
}
require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql")
// assert
assert.NoError(t, err)
})
t.Run("should return nil when everything is successful with enable auto partition", func(t *testing.T) {
// arrange
client, err := client.NewClient(context.TODO(), client.SetupLogger("error"), client.SetupLoader("REPLACE"), client.EnableAutoPartition(true))
require.NoError(t, err)
client.OdpsClient = &mockOdpsClient{
partitionResult: func() ([]string, error) {
return []string{"_partition_value"}, nil
},
execSQLResult: func() error {
return nil
},
}
client.Loader = &mockLoader{
getQueryFunc: func(tableID, query string) string {
return "INSERT OVERWRITE TABLE project_test.table_test SELECT * FROM table;"
},
getPartitionedQueryFunc: func(tableID, query string, partitionNames []string) string {
assert.False(t, true, "should not be called")
return "INSERT OVERWRITE TABLE project_test.table_test PARTITION(event_date) SELECT * FROM table;"
},
}
require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql")
Expand All @@ -90,3 +126,16 @@ func (m *mockOdpsClient) GetPartitionNames(ctx context.Context, tableID string)
func (m *mockOdpsClient) ExecSQL(ctx context.Context, query string) error {
return m.execSQLResult()
}

type mockLoader struct {
getQueryFunc func(tableID, query string) string
getPartitionedQueryFunc func(tableID, query string, partitionNames []string) string
}

func (m *mockLoader) GetQuery(tableID, query string) string {
return m.getQueryFunc(tableID, query)
}

func (m *mockLoader) GetPartitionedQuery(tableID, query string, partitionNames []string) string {
return m.getPartitionedQueryFunc(tableID, query, partitionNames)
}
7 changes: 7 additions & 0 deletions mc2mc/internal/client/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,10 @@ func EnablePartitionValue(enabled bool) SetupFn {
return nil
}
}

func EnableAutoPartition(enabled bool) SetupFn {
return func(c *Client) error {
c.enableAutoPartition = enabled
return nil
}
}
2 changes: 2 additions & 0 deletions mc2mc/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Config struct {
ScheduledTime string
// TODO: remove this temporary support after 15 nov 2024
DevEnablePartitionValue bool
DevEnableAutoPartition bool
}

type maxComputeCredentials struct {
Expand All @@ -41,6 +42,7 @@ func NewConfig() (*Config, error) {
ScheduledTime: getEnv("SCHEDULED_TIME", ""),
// TODO: delete this after 15 nov
DevEnablePartitionValue: getEnv("DEV__ENABLE_PARTITION_VALUE", "false") == "true",
DevEnableAutoPartition: getEnv("DEV__ENABLE_AUTO_PARTITION", "false") == "true",
}
// ali-odps-go-sdk related config
scvAcc := getEnv("MC_SERVICE_ACCOUNT", "")
Expand Down
1 change: 1 addition & 0 deletions mc2mc/mc2mc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func mc2mc() error {
client.SetupODPSClient(cfg.GenOdps()),
client.SetupLoader(cfg.LoadMethod),
client.EnablePartitionValue(cfg.DevEnablePartitionValue),
client.EnableAutoPartition(cfg.DevEnableAutoPartition),
)
if err != nil {
return errors.WithStack(err)
Expand Down

0 comments on commit 5a240fa

Please sign in to comment.