Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: partitiontime to use dstart env #50

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions mc2mc/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"
"os"
"strings"
"time"

"github.com/pkg/errors"

Expand Down Expand Up @@ -59,7 +60,7 @@ func (c *Client) Close() error {
return errors.WithStack(err)
}

func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) error {
func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string, dstart string) error {
// read query from filepath
c.logger.Info(fmt.Sprintf("executing query from %s", queryFilePath))
queryRaw, err := os.ReadFile(queryFilePath)
Expand All @@ -73,8 +74,14 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err
if err != nil {
return errors.WithStack(err)
}
// construct query with ordered columns and BQ pseudo columns for ingestion time
queryRaw = constructQueryWithOrderedColumnsWithBQIngestionTime(queryRaw, columnNames)
// convert time 2024-11-04T00:00:00Z to 2024-11-04 00:00:00
start, err := time.Parse(time.RFC3339, dstart)
if err != nil {
return errors.WithStack(err)
}
dstart = start.Format(time.DateTime)
// construct query with ordered columns and BQ pseudo columns for ingestion time (based on dstart)
queryRaw = constructQueryWithOrderedColumnsWithBQIngestionTime(queryRaw, columnNames, dstart)
}

if c.enablePartitionValue && !c.enableAutoPartition {
Expand Down Expand Up @@ -112,16 +119,17 @@ func addPartitionValueColumn(rawQuery []byte) []byte {
}

// constructQueryWithOrderedColumnsWithBQIngestionTime constructs query with ordered columns and BQ pseudo columns for ingestion time
// based on dstart.
// ref: https://cloud.google.com/bigquery/docs/querying-partitioned-tables#query_an_ingestion-time_partitioned_table
func constructQueryWithOrderedColumnsWithBQIngestionTime(query []byte, orderedColumns []string) []byte {
func constructQueryWithOrderedColumnsWithBQIngestionTime(query []byte, orderedColumns []string, dstart string) []byte {
var orderedColumnsWithBQIngestionTime []string
for _, col := range orderedColumns {
val := col
switch col {
case "_partitiontime":
val = "CURRENT_TIMESTAMP() as _partitiontime"
val = fmt.Sprintf("TIMESTAMP('%s') as _partitiontime", dstart)
case "_partitiondate":
val = "CURRENT_DATE() as _partitiondate"
val = fmt.Sprintf("DATE(TIMESTAMP('%s')) as _partitiondate", dstart)
}
orderedColumnsWithBQIngestionTime = append(orderedColumnsWithBQIngestionTime, val)
}
Expand Down
12 changes: 6 additions & 6 deletions mc2mc/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestExecute(t *testing.T) {
require.NoError(t, err)
client.OdpsClient = &mockOdpsClient{}
// act
err = client.Execute(context.TODO(), "", "./nonexistentfile")
err = client.Execute(context.TODO(), "", "./nonexistentfile", "2024-11-04T00:00:00Z")
// assert
assert.Error(t, err)
})
Expand All @@ -34,7 +34,7 @@ func TestExecute(t *testing.T) {
}
assert.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql")
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql", "2024-11-04T00:00:00Z")
// assert
assert.Error(t, err)
assert.ErrorContains(t, err, "error get ordered columns")
Expand All @@ -53,7 +53,7 @@ func TestExecute(t *testing.T) {
}
assert.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql")
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql", "2024-11-04T00:00:00Z")
// assert
assert.Error(t, err)
assert.ErrorContains(t, err, "error get partition name")
Expand All @@ -75,7 +75,7 @@ func TestExecute(t *testing.T) {
}
require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql")
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql", "2024-11-04T00:00:00Z")
// assert
assert.Error(t, err)
assert.ErrorContains(t, err, "error exec sql")
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestExecute(t *testing.T) {
}
require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql")
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql", "2024-11-04T00:00:00Z")
// assert
assert.NoError(t, err)
})
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestExecute(t *testing.T) {
}
require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql")
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql", "2024-11-04T00:00:00Z")
// assert
assert.NoError(t, err)
})
Expand Down
2 changes: 2 additions & 0 deletions mc2mc/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Config struct {
OtelCollectorGRPCEndpoint string
JobName string
ScheduledTime string
DStart string
// TODO: remove this temporary support after 15 nov 2024
DevEnablePartitionValue bool
DevEnableAutoPartition bool
Expand All @@ -40,6 +41,7 @@ func NewConfig() (*Config, error) {
OtelCollectorGRPCEndpoint: getEnv("OTEL_COLLECTOR_GRPC_ENDPOINT", ""),
JobName: getJobName(),
ScheduledTime: getEnv("SCHEDULED_TIME", ""),
DStart: getEnv("DSTART", ""),
// TODO: delete this after 15 nov
DevEnablePartitionValue: getEnv("DEV__ENABLE_PARTITION_VALUE", "false") == "true",
DevEnableAutoPartition: getEnv("DEV__ENABLE_AUTO_PARTITION", "false") == "true",
Expand Down
2 changes: 1 addition & 1 deletion mc2mc/mc2mc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func mc2mc() error {
defer client.Close()

// execute query
err = client.Execute(ctx, cfg.DestinationTableID, cfg.QueryFilePath)
err = client.Execute(ctx, cfg.DestinationTableID, cfg.QueryFilePath, cfg.DStart)
if err != nil {
return errors.WithStack(err)
}
Expand Down
Loading