diff --git a/mc2mc/internal/client/client.go b/mc2mc/internal/client/client.go index 3c9c845..4735929 100644 --- a/mc2mc/internal/client/client.go +++ b/mc2mc/internal/client/client.go @@ -7,6 +7,7 @@ import ( "log/slog" "os" "strings" + "time" "github.com/pkg/errors" @@ -59,7 +60,7 @@ func (c *Client) Close() error { return errors.WithStack(err) } -func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) error { +func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string, dstart string) error { // read query from filepath c.logger.Info(fmt.Sprintf("executing query from %s", queryFilePath)) queryRaw, err := os.ReadFile(queryFilePath) @@ -73,8 +74,14 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err if err != nil { return errors.WithStack(err) } - // construct query with ordered columns and BQ pseudo columns for ingestion time - queryRaw = constructQueryWithOrderedColumnsWithBQIngestionTime(queryRaw, columnNames) + // convert time 2024-11-04T00:00:00Z to 2024-11-04 00:00:00 + start, err := time.Parse(time.RFC3339, dstart) + if err != nil { + return errors.WithStack(err) + } + dstart = start.Format(time.DateTime) + // construct query with ordered columns and BQ pseudo columns for ingestion time (based on dstart) + queryRaw = constructQueryWithOrderedColumnsWithBQIngestionTime(queryRaw, columnNames, dstart) } if c.enablePartitionValue && !c.enableAutoPartition { @@ -112,16 +119,17 @@ func addPartitionValueColumn(rawQuery []byte) []byte { } // constructQueryWithOrderedColumnsWithBQIngestionTime constructs query with ordered columns and BQ pseudo columns for ingestion time +// based on dstart. // ref: https://cloud.google.com/bigquery/docs/querying-partitioned-tables#query_an_ingestion-time_partitioned_table -func constructQueryWithOrderedColumnsWithBQIngestionTime(query []byte, orderedColumns []string) []byte { +func constructQueryWithOrderedColumnsWithBQIngestionTime(query []byte, orderedColumns []string, dstart string) []byte { var orderedColumnsWithBQIngestionTime []string for _, col := range orderedColumns { val := col switch col { case "_partitiontime": - val = "CURRENT_TIMESTAMP() as _partitiontime" + val = fmt.Sprintf("TIMESTAMP('%s') as _partitiontime", dstart) case "_partitiondate": - val = "CURRENT_DATE() as _partitiondate" + val = fmt.Sprintf("DATE(TIMESTAMP('%s')) as _partitiondate", dstart) } orderedColumnsWithBQIngestionTime = append(orderedColumnsWithBQIngestionTime, val) } diff --git a/mc2mc/internal/client/client_test.go b/mc2mc/internal/client/client_test.go index de64f99..43deb90 100644 --- a/mc2mc/internal/client/client_test.go +++ b/mc2mc/internal/client/client_test.go @@ -19,7 +19,7 @@ func TestExecute(t *testing.T) { require.NoError(t, err) client.OdpsClient = &mockOdpsClient{} // act - err = client.Execute(context.TODO(), "", "./nonexistentfile") + err = client.Execute(context.TODO(), "", "./nonexistentfile", "2024-11-04T00:00:00Z") // assert assert.Error(t, err) }) @@ -34,7 +34,7 @@ func TestExecute(t *testing.T) { } assert.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644)) // act - err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql") + err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql", "2024-11-04T00:00:00Z") // assert assert.Error(t, err) assert.ErrorContains(t, err, "error get ordered columns") @@ -53,7 +53,7 @@ func TestExecute(t *testing.T) { } assert.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644)) // act - err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql") + err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql", "2024-11-04T00:00:00Z") // assert assert.Error(t, err) assert.ErrorContains(t, err, "error get partition name") @@ -75,7 +75,7 @@ func TestExecute(t *testing.T) { } require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644)) // act - err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql") + err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql", "2024-11-04T00:00:00Z") // assert assert.Error(t, err) assert.ErrorContains(t, err, "error exec sql") @@ -106,7 +106,7 @@ func TestExecute(t *testing.T) { } require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644)) // act - err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql") + err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql", "2024-11-04T00:00:00Z") // assert assert.NoError(t, err) }) @@ -136,7 +136,7 @@ func TestExecute(t *testing.T) { } require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644)) // act - err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql") + err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql", "2024-11-04T00:00:00Z") // assert assert.NoError(t, err) }) diff --git a/mc2mc/internal/config/config.go b/mc2mc/internal/config/config.go index e24ee29..71950c1 100644 --- a/mc2mc/internal/config/config.go +++ b/mc2mc/internal/config/config.go @@ -16,6 +16,7 @@ type Config struct { OtelCollectorGRPCEndpoint string JobName string ScheduledTime string + DStart string // TODO: remove this temporary support after 15 nov 2024 DevEnablePartitionValue bool DevEnableAutoPartition bool @@ -40,6 +41,7 @@ func NewConfig() (*Config, error) { OtelCollectorGRPCEndpoint: getEnv("OTEL_COLLECTOR_GRPC_ENDPOINT", ""), JobName: getJobName(), ScheduledTime: getEnv("SCHEDULED_TIME", ""), + DStart: getEnv("DSTART", ""), // TODO: delete this after 15 nov DevEnablePartitionValue: getEnv("DEV__ENABLE_PARTITION_VALUE", "false") == "true", DevEnableAutoPartition: getEnv("DEV__ENABLE_AUTO_PARTITION", "false") == "true", diff --git a/mc2mc/mc2mc.go b/mc2mc/mc2mc.go index d890b92..76de214 100644 --- a/mc2mc/mc2mc.go +++ b/mc2mc/mc2mc.go @@ -39,7 +39,7 @@ func mc2mc() error { defer client.Close() // execute query - err = client.Execute(ctx, cfg.DestinationTableID, cfg.QueryFilePath) + err = client.Execute(ctx, cfg.DestinationTableID, cfg.QueryFilePath, cfg.DStart) if err != nil { return errors.WithStack(err) }