feat: automatically convert date/datetime/timestamps (#422)
alvarowolfx authored Mar 20, 2024
1 parent d872a79 commit cbc7e94
Showing 8 changed files with 230 additions and 33 deletions.
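In practice, this change means the JSONWriter now accepts JavaScript Date objects for DATE, DATETIME, and TIMESTAMP columns and converts them automatically, instead of requiring precomputed epoch day counts, microsecond timestamps, or civil-time strings. A minimal usage sketch (assuming a connection and protoDescriptor set up as in the samples changed below):

const writer = new JSONWriter({connection, protoDescriptor});
writer.appendRows([
  {row_num: 7, date_col: new Date('2019-02-07')}, // DATE
  {row_num: 8, datetime_col: new Date('2019-02-17T11:24:00.000Z')}, // DATETIME
  {row_num: 12, timestamp_col: new Date('2022-01-09T03:49:46.564Z')}, // TIMESTAMP
]);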
6 changes: 4 additions & 2 deletions package.json
@@ -27,6 +27,7 @@
"precompile": "gts clean"
},
"dependencies": {
"extend": "^3.0.2",
"google-gax": "^4.3.1",
"google-auth-library": "^9.6.3"
},
@@ -35,10 +36,11 @@
},
"devDependencies": {
"@google-cloud/bigquery": "^7.0.0",
"@types/uuid": "^9.0.1",
"@types/extend": "^3.0.4",
"@types/mocha": "^9.0.0",
"@types/node": "^20.0.0",
"@types/sinon": "^17.0.0",
"@types/uuid": "^9.0.1",
"c8": "^9.0.0",
"gapic-tools": "^0.4.0",
"gts": "^5.0.0",
@@ -51,8 +53,8 @@
"pack-n-play": "^2.0.0",
"sinon": "^17.0.0",
"ts-loader": "^9.0.0",
"uuid": "^9.0.0",
"typescript": "^5.1.6",
"uuid": "^9.0.0",
"webpack": "^5.0.0",
"webpack-cli": "^5.0.0"
},
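The new extend runtime dependency backs the encoder introduced in this commit: convertRow deep-clones each row with extend(true, {}, source) before rewriting values in place, so callers' objects are never mutated. For reference, the call shape:

const extend = require('extend');
const source = {ts: new Date()};
const copy = extend(true, {}, source); // `true` requests a deep copy; `source` stays untouched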
7 changes: 4 additions & 3 deletions samples/append_rows_proto2.js
@@ -136,9 +136,10 @@ function main(
serializedRows = [];

// Row 7
const days = new Date('2019-02-07').getTime() / (1000 * 60 * 60 * 24);
row = {
rowNum: 7,
dateCol: 1132896,
dateCol: days, // The value is the number of days since the Unix epoch (1970-01-01)
};
serializedRows.push(SampleData.encode(row).finish());

@@ -172,10 +173,10 @@ function main(
serializedRows.push(SampleData.encode(row).finish());

// Row 12
const timestamp = 1641700186564;
const timestamp = new Date('2022-01-09T03:49:46.564Z').getTime();
row = {
rowNum: 12,
timestampCol: timestamp,
timestampCol: timestamp * 1000, // The value is given in microseconds since the Unix epoch (1970-01-01)
};
serializedRows.push(SampleData.encode(row).finish());

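For the raw proto2 sample the conversion stays manual, as above; the arithmetic is plain epoch math (both values assume UTC):

const days = new Date('2019-02-07').getTime() / (1000 * 60 * 60 * 24); // 17934 days since 1970-01-01 (DATE)
const micros = new Date('2022-01-09T03:49:46.564Z').getTime() * 1000; // 1641700186564000 µs since the epoch (TIMESTAMP)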
7 changes: 3 additions & 4 deletions samples/append_rows_table_to_proto2.js
@@ -58,7 +58,6 @@ function main(
console.log(`Stream created: ${streamId}`);

const writer = new JSONWriter({
streamId,
connection,
protoDescriptor,
});
@@ -131,14 +130,14 @@ function main(
// Row 7
row = {
row_num: 7,
date_col: 1132896,
date_col: new Date('2019-02-07'),
};
rows.push(row);

// Row 8
row = {
row_num: 8,
datetime_col: bigquery.datetime('2019-02-17 11:24:00.000').value, // BigQuery civil time
datetime_col: new Date('2019-02-17T11:24:00.000Z'),
};
rows.push(row);

@@ -167,7 +166,7 @@ function main(
// Row 12
row = {
row_num: 12,
timestamp_col: 1641700186564,
timestamp_col: new Date('2022-01-09T03:49:46.564Z'),
};
rows.push(row);

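Because this sample writes through the JSONWriter, rows can now carry plain Date objects for all three temporal types, and the civil-time helper is no longer needed. Before and after for row 8:

// before: datetime_col: bigquery.datetime('2019-02-17 11:24:00.000').value
// after:
row = {row_num: 8, datetime_col: new Date('2019-02-17T11:24:00.000Z')};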
4 changes: 2 additions & 2 deletions samples/test/writeClient.js
@@ -327,7 +327,7 @@ describe('writeClient', () => {
assert.deepInclude(rows, [{float64_col: 987.6539916992188}, {row_num: 4}]);
assert.deepInclude(rows, [{int64_col: 321}, {row_num: 5}]);
assert.deepInclude(rows, [{string_col: 'octavia'}, {row_num: 6}]);
assert.deepInclude(rows, [{date_col: '5071-10-07'}, {row_num: 7}]);
assert.deepInclude(rows, [{date_col: '2019-02-07'}, {row_num: 7}]);
assert.deepInclude(rows, [
{datetime_col: '2019-02-17T11:24:00'},
{row_num: 8},
@@ -340,7 +340,7 @@
]);
assert.deepInclude(rows, [{time_col: '18:00:00'}, {row_num: 11}]);
assert.deepInclude(rows, [
{timestamp_col: '1970-01-20T00:01:40.186564000Z'},
{timestamp_col: '2022-01-09T03:49:46.564Z'},
{row_num: 12},
]);
assert.deepInclude(rows, [{int64_list: [1999, 2001]}, {row_num: 13}]);
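The corrected expectations also show what the old magic numbers decoded to: 1132896 was read as a day count roughly 3,100 years past the epoch, and 1641700186564 (milliseconds) was read as microseconds:

new Date(1132896 * 24 * 60 * 60 * 1000).toISOString(); // '5071-10-07T00:00:00.000Z'
new Date(1641700186564 / 1000).toISOString(); // '1970-01-20T00:01:40.186Z'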
133 changes: 133 additions & 0 deletions src/managedwriter/encoder.ts
@@ -0,0 +1,133 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import * as protobuf from 'protobufjs';
import * as protos from '../../protos/protos';
import {normalizeDescriptor} from '../adapt/proto';
import * as extend from 'extend';

type IDescriptorProto = protos.google.protobuf.IDescriptorProto;
type DescriptorProto = protos.google.protobuf.DescriptorProto;

const DescriptorProto = protos.google.protobuf.DescriptorProto;
const {Type} = protobuf;

/**
* Internal class used by the JSONWriter to convert JSON data to protobuf messages.
* It can be configured to do some data conversion to match what BigQuery expects.
*
* @class
* @memberof managedwriter
*/
export class JSONEncoder {
private _type: protobuf.Type = Type.fromJSON('root', {
fields: {},
});

/**
* Creates a new JSONEncoder instance.
*
* @param {Object} params - The parameters for the JSONEncoder.
* @param {IDescriptorProto} params.protoDescriptor - The proto descriptor
* for the JSON rows.
*/
constructor(params: {protoDescriptor: IDescriptorProto}) {
const {protoDescriptor} = params;
this.setProtoDescriptor(protoDescriptor);
}

/**
* Update the proto descriptor for the Encoder.
*
* @param {IDescriptorProto} protoDescriptor - The proto descriptor.
*/
setProtoDescriptor(protoDescriptor: IDescriptorProto): void {
const normalized = normalizeDescriptor(
new DescriptorProto(protoDescriptor)
);
this._type = Type.fromDescriptor(normalized);
}

/**
* Converts a list of JSON rows into encoded protobuf messages, applying the
* data conversions (such as JavaScript Date values) needed to match the
* proto descriptor for the destination BigQuery table.
*
* @param {JSONList} rows - The list of JSON rows.
* @returns {Uint8Array[]} The encoded rows.
*/
encodeRows(rows: any[]): Uint8Array[] {
const serializedRows = rows
.map(r => {
return this.convertRow(r);
})
.map(r => {
return this.encodeRow(r);
});
return serializedRows;
}

private isPlainObject(value: any): boolean {
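// A value is treated as a plain object when its constructor is Object or
// missing entirely (e.g. Object.create(null)); Dates, Arrays, and class
// instances are excluded.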
return value && [undefined, Object].includes(value.constructor);
}

private encodeRow(row: any): Uint8Array {
const msg = this._type.create(row);
return this._type.encode(msg).finish();
}

private convertRow(source: any): Object {
const row = extend(true, {}, source);
for (const key in row) {
const value = row[key];
if (value === null) {
continue;
}
if (value instanceof Date) {
const pfield = this._type.fields[key];
if (!pfield) {
continue;
}
switch (pfield.type) {
case 'int32': // DATE
// The value is the number of days since the Unix epoch (1970-01-01)
row[key] = value.getTime() / (1000 * 60 * 60 * 24);
break;
case 'int64': // TIMESTAMP
// The value is given in microseconds since the Unix epoch (1970-01-01)
row[key] = value.getTime() * 1000;
break;
case 'string': // DATETIME
row[key] = value.toJSON().replace(/^(.*)T(.*)Z$/, '$1 $2');
break;
}
continue;
}
if (Array.isArray(value)) {
row[key] = value.map(v => {
if (!this.isPlainObject(v)) {
return v;
}
return this.convertRow(v);
});
continue;
}
if (this.isPlainObject(value)) {
row[key] = this.convertRow(value);
continue;
}
}
return row;
}
}
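Concretely, the three branches of the switch map a Date as follows (values in UTC; note the DATE branch yields a fractional day count when the Date carries a time of day):

const d = new Date('2019-02-17T11:24:00.000Z');
d.getTime() / (1000 * 60 * 60 * 24); // DATE (int32): 17944.475 days since 1970-01-01
d.getTime() * 1000; // TIMESTAMP (int64): 1550402640000000 µs since the epoch
d.toJSON().replace(/^(.*)T(.*)Z$/, '$1 $2'); // DATETIME (string): '2019-02-17 11:24:00.000'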
2 changes: 1 addition & 1 deletion src/managedwriter/index.ts
@@ -18,7 +18,7 @@
* More information about this new write client may also be found in
* the public documentation: https://cloud.google.com/bigquery/docs/write-api
*
* It is EXPERIMENTAL and subject to change or removal without notice. This is primarily to signal that this
* It is EXPERIMENTAL and subject to change or removal without notice. This is primarily to signal that this
* package may still make breaking changes to existing methods and functionality.
*
* @namespace managedwriter
33 changes: 12 additions & 21 deletions src/managedwriter/json_writer.ts
@@ -12,25 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import * as protobuf from 'protobufjs';
import * as protos from '../../protos/protos';
import {PendingWrite} from './pending_write';
import {StreamConnection, RemoveListener} from './stream_connection';
import * as adapt from '../adapt';
import {Writer} from './writer';
import {JSONEncoder} from './encoder';

type TableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema;
type IInt64Value = protos.google.protobuf.IInt64Value;
type IDescriptorProto = protos.google.protobuf.IDescriptorProto;
type DescriptorProto = protos.google.protobuf.DescriptorProto;
type JSONPrimitive = string | number | boolean | null;
type JSONValue = JSONPrimitive | JSONObject | JSONArray;
type JSONObject = {[member: string]: JSONValue};
type JSONArray = Array<JSONValue>;
type JSONList = Array<JSONObject>;

const DescriptorProto = protos.google.protobuf.DescriptorProto;
const {Type} = protobuf;
export type JSONPrimitive = string | number | boolean | Date | null;
export type JSONValue = JSONPrimitive | JSONObject | JSONArray;
export type JSONObject = {[member: string]: JSONValue};
export type JSONArray = Array<JSONValue>;
export type JSONList = Array<JSONObject>;

/**
* A StreamWriter that can write JSON data to BigQuery tables. The JSONWriter is
@@ -47,9 +43,7 @@ const {Type} = protobuf;
*/
export class JSONWriter {
private _writer: Writer;
private _type: protobuf.Type = Type.fromJSON('root', {
fields: {},
});
private _encoder: JSONEncoder;
private _schemaListener: RemoveListener;

/**
@@ -67,6 +61,9 @@
}) {
const {connection, protoDescriptor} = params;
this._writer = new Writer(params);
this._encoder = new JSONEncoder({
protoDescriptor: params.protoDescriptor,
});
this._schemaListener = connection.onSchemaUpdated(this.onSchemaUpdated);
this.setProtoDescriptor(protoDescriptor);
}
@@ -87,11 +84,8 @@ export class JSONWriter {
* @param {IDescriptorProto} protoDescriptor - The proto descriptor.
*/
setProtoDescriptor(protoDescriptor: IDescriptorProto): void {
const normalized = adapt.normalizeDescriptor(
new DescriptorProto(protoDescriptor)
);
this._type = Type.fromDescriptor(normalized);
this._writer.setProtoDescriptor(protoDescriptor);
this._encoder.setProtoDescriptor(protoDescriptor);
}

/**
@@ -104,10 +98,7 @@
* @returns {managedwriter.PendingWrite} The pending write.
*/
appendRows(rows: JSONList, offsetValue?: IInt64Value['value']): PendingWrite {
const serializedRows = rows.map(r => {
const msg = this._type.create(r);
return this._type.encode(msg).finish();
});
const serializedRows = this._encoder.encodeRows(rows);
const pw = this._writer.appendRows(
{
serializedRows,
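End to end, appendRows now funnels every row through the JSONEncoder and hands the encoded bytes to the underlying Writer. A sketch of the call flow (assumes a live connection; pw.getResult() mirrors how the existing samples await append results):

const pw = writer.appendRows([{row_num: 1, date_col: new Date('2019-02-07')}], 0);
const result = await pw.getResult();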
[Diff for the eighth changed file did not load.]