diff --git a/mc2mc/internal/client/client.go b/mc2mc/internal/client/client.go index 25a70ff..3c9c845 100644 --- a/mc2mc/internal/client/client.go +++ b/mc2mc/internal/client/client.go @@ -73,8 +73,8 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err if err != nil { return errors.WithStack(err) } - // construct query with ordered columns - queryRaw = constructQueryWithOrderedColumns(queryRaw, columnNames) + // construct query with ordered columns and BQ pseudo columns for ingestion time + queryRaw = constructQueryWithOrderedColumnsWithBQIngestionTime(queryRaw, columnNames) } if c.enablePartitionValue && !c.enableAutoPartition { @@ -111,6 +111,23 @@ func addPartitionValueColumn(rawQuery []byte) []byte { return []byte(fmt.Sprintf("%s SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (%s)", header, qr)) } +// constructQueryWithOrderedColumnsWithBQIngestionTime constructs query with ordered columns and BQ pseudo columns for ingestion time +// ref: https://cloud.google.com/bigquery/docs/querying-partitioned-tables#query_an_ingestion-time_partitioned_table +func constructQueryWithOrderedColumnsWithBQIngestionTime(query []byte, orderedColumns []string) []byte { + var orderedColumnsWithBQIngestionTime []string + for _, col := range orderedColumns { + val := col + switch col { + case "_partitiontime": + val = "CURRENT_TIMESTAMP() as _partitiontime" + case "_partitiondate": + val = "CURRENT_DATE() as _partitiondate" + } + orderedColumnsWithBQIngestionTime = append(orderedColumnsWithBQIngestionTime, val) + } + return constructQueryWithOrderedColumns(query, orderedColumnsWithBQIngestionTime) +} + func constructQueryWithOrderedColumns(query []byte, orderedColumns []string) []byte { header, qr := loader.SeparateHeadersAndQuery(string(query)) return []byte(fmt.Sprintf("%s %s", header, loader.ConstructQueryWithOrderedColumns(qr, orderedColumns)))