diff --git a/package.json b/package.json
index d52fd9f2..a3bd859b 100644
--- a/package.json
+++ b/package.json
@@ -27,6 +27,7 @@
     "precompile": "gts clean"
   },
   "dependencies": {
+    "extend": "^3.0.2",
     "google-gax": "^4.3.1",
     "google-auth-library": "^9.6.3"
   },
@@ -35,12 +36,13 @@
   },
   "devDependencies": {
     "@google-cloud/bigquery": "^7.0.0",
-    "@types/uuid": "^9.0.1",
+    "@types/extend": "^3.0.4",
     "@types/mocha": "^9.0.0",
     "@types/node": "^20.0.0",
     "@types/sinon": "^17.0.0",
+    "@types/uuid": "^9.0.1",
     "c8": "^9.0.0",
-    "gapic-tools": "^0.3.0",
+    "gapic-tools": "^0.4.0",
     "gts": "^5.0.0",
     "jsdoc": "^4.0.0",
     "jsdoc-fresh": "^3.0.0",
@@ -51,8 +53,8 @@
     "pack-n-play": "^2.0.0",
     "sinon": "^17.0.0",
     "ts-loader": "^9.0.0",
-    "uuid": "^9.0.0",
     "typescript": "^5.1.6",
+    "uuid": "^9.0.0",
     "webpack": "^5.0.0",
     "webpack-cli": "^5.0.0"
   },
diff --git a/samples/append_rows_proto2.js b/samples/append_rows_proto2.js
index 8b7bd9cf..7ceca8cd 100644
--- a/samples/append_rows_proto2.js
+++ b/samples/append_rows_proto2.js
@@ -136,9 +136,10 @@ function main(
     serializedRows = [];

     // Row 7
+    const days = new Date('2019-02-07').getTime() / (1000 * 60 * 60 * 24);
     row = {
       rowNum: 7,
-      dateCol: 1132896,
+      dateCol: days, // The value is the number of days since the Unix epoch (1970-01-01)
     };
     serializedRows.push(SampleData.encode(row).finish());

@@ -172,10 +173,10 @@ function main(
     serializedRows.push(SampleData.encode(row).finish());

     // Row 12
-    const timestamp = 1641700186564;
+    const timestamp = new Date('2022-01-09T03:49:46.564Z').getTime();
     row = {
       rowNum: 12,
-      timestampCol: timestamp,
+      timestampCol: timestamp * 1000, // The value is given in microseconds since the Unix epoch (1970-01-01)
     };
     serializedRows.push(SampleData.encode(row).finish());
diff --git a/samples/append_rows_table_to_proto2.js b/samples/append_rows_table_to_proto2.js
index 24afd3ab..ad9eff03 100644
--- a/samples/append_rows_table_to_proto2.js
+++ b/samples/append_rows_table_to_proto2.js
@@ -58,7 +58,6 @@ function main(
     console.log(`Stream created: ${streamId}`);

     const writer = new JSONWriter({
-      streamId,
       connection,
       protoDescriptor,
     });
@@ -131,14 +130,14 @@ function main(
     // Row 7
     row = {
       row_num: 7,
-      date_col: 1132896,
+      date_col: new Date('2019-02-07'),
     };
     rows.push(row);

     // Row 8
     row = {
       row_num: 8,
-      datetime_col: bigquery.datetime('2019-02-17 11:24:00.000').value, // BigQuery civil time
+      datetime_col: new Date('2019-02-17T11:24:00.000Z'),
     };
     rows.push(row);

@@ -167,7 +166,7 @@ function main(
     // Row 12
     row = {
       row_num: 12,
-      timestamp_col: 1641700186564,
+      timestamp_col: new Date('2022-01-09T03:49:46.564Z'),
     };
     rows.push(row);

diff --git a/samples/test/writeClient.js b/samples/test/writeClient.js
index fc238eea..d1aba2d9 100644
--- a/samples/test/writeClient.js
+++ b/samples/test/writeClient.js
@@ -327,7 +327,7 @@ describe('writeClient', () => {
     assert.deepInclude(rows, [{float64_col: 987.6539916992188}, {row_num: 4}]);
     assert.deepInclude(rows, [{int64_col: 321}, {row_num: 5}]);
     assert.deepInclude(rows, [{string_col: 'octavia'}, {row_num: 6}]);
-    assert.deepInclude(rows, [{date_col: '5071-10-07'}, {row_num: 7}]);
+    assert.deepInclude(rows, [{date_col: '2019-02-07'}, {row_num: 7}]);
     assert.deepInclude(rows, [
       {datetime_col: '2019-02-17T11:24:00'},
       {row_num: 8},
@@ -340,7 +340,7 @@ describe('writeClient', () => {
     ]);
     assert.deepInclude(rows, [{time_col: '18:00:00'}, {row_num: 11}]);
     assert.deepInclude(rows, [
-      {timestamp_col: '1970-01-20T00:01:40.186564000Z'},
+      {timestamp_col: '2022-01-09T03:49:46.564Z'},
      {row_num: 12},
     ]);
     assert.deepInclude(rows, [{int64_list: [1999, 2001]}, {row_num: 13}]);
diff --git a/src/managedwriter/encoder.ts b/src/managedwriter/encoder.ts
new file mode 100644
index 00000000..e54b8139
--- /dev/null
+++ b/src/managedwriter/encoder.ts
@@ -0,0 +1,133 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+import * as protobuf from 'protobufjs';
+import * as protos from '../../protos/protos';
+import {normalizeDescriptor} from '../adapt/proto';
+import * as extend from 'extend';
+
+type IDescriptorProto = protos.google.protobuf.IDescriptorProto;
+type DescriptorProto = protos.google.protobuf.DescriptorProto;
+
+const DescriptorProto = protos.google.protobuf.DescriptorProto;
+const {Type} = protobuf;
+
+/**
+ * Internal class used by the JSONWriter to convert JSON data to protobuf messages.
+ * It can be configured to do some data conversion to match what BigQuery expects.
+ *
+ * @class
+ * @memberof managedwriter
+ */
+export class JSONEncoder {
+  private _type: protobuf.Type = Type.fromJSON('root', {
+    fields: {},
+  });
+
+  /**
+   * Creates a new JSONEncoder instance.
+   *
+   * @param {Object} params - The parameters for the JSONEncoder.
+   * @param {IDescriptorProto} params.protoDescriptor - The proto descriptor
+   *   for the JSON rows.
+   */
+  constructor(params: {protoDescriptor: IDescriptorProto}) {
+    const {protoDescriptor} = params;
+    this.setProtoDescriptor(protoDescriptor);
+  }
+
+  /**
+   * Update the proto descriptor for the Encoder.
+   *
+   * @param {IDescriptorProto} protoDescriptor - The proto descriptor.
+   */
+  setProtoDescriptor(protoDescriptor: IDescriptorProto): void {
+    const normalized = normalizeDescriptor(
+      new DescriptorProto(protoDescriptor)
+    );
+    this._type = Type.fromDescriptor(normalized);
+  }
+
+  /**
+   * Converts a list of JSON rows to their serialized protobuf representation,
+   * converting values such as Date objects to the wire format that BigQuery
+   * expects along the way.
+   *
+   * @param {JSONList} rows - The list of JSON rows.
+   * @returns {Uint8Array[]} The encoded rows.
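+   *
+   * @example <caption>Minimal usage sketch; assumes a `protoDescriptor` with an
+   * int64 TIMESTAMP field named `ts_col` (illustrative name only)</caption>
+   * const encoder = new JSONEncoder({protoDescriptor});
+   * const [bytes] = encoder.encodeRows([
+   *   {ts_col: new Date('2022-01-09T03:49:46.564Z')}, // encoded as 1641700186564000
+   * ]);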
+   */
+  encodeRows(rows: any[]): Uint8Array[] {
+    const serializedRows = rows
+      .map(r => {
+        return this.convertRow(r);
+      })
+      .map(r => {
+        return this.encodeRow(r);
+      });
+    return serializedRows;
+  }
+
+  private isPlainObject(value: any): boolean {
+    return !!value && [undefined, Object].includes(value.constructor);
+  }
+
+  private encodeRow(row: any): Uint8Array {
+    const msg = this._type.create(row);
+    return this._type.encode(msg).finish();
+  }
+
+  private convertRow(source: any): Object {
+    const row = extend(true, {}, source);
+    for (const key in row) {
+      const value = row[key];
+      if (value === null) {
+        continue;
+      }
+      if (value instanceof Date) {
+        const pfield = this._type.fields[key];
+        if (!pfield) {
+          continue;
+        }
+        switch (pfield.type) {
+          case 'int32': // DATE
+            // The value is the number of days since the Unix epoch (1970-01-01)
+            row[key] = value.getTime() / (1000 * 60 * 60 * 24);
+            break;
+          case 'int64': // TIMESTAMP
+            // The value is given in microseconds since the Unix epoch (1970-01-01)
+            row[key] = value.getTime() * 1000;
+            break;
+          case 'string': // DATETIME
+            row[key] = value.toJSON().replace(/^(.*)T(.*)Z$/, '$1 $2');
+            break;
+        }
+        continue;
+      }
+      if (Array.isArray(value)) {
+        row[key] = value.map(v => {
+          if (!this.isPlainObject(v)) {
+            return v;
+          }
+          return this.convertRow(v);
+        });
+        continue;
+      }
+      if (this.isPlainObject(value)) {
+        row[key] = this.convertRow(value);
+        continue;
+      }
+    }
+    return row;
+  }
+}
diff --git a/src/managedwriter/index.ts b/src/managedwriter/index.ts
index ff6d0523..0ce72a8e 100644
--- a/src/managedwriter/index.ts
+++ b/src/managedwriter/index.ts
@@ -18,7 +18,7 @@
  * More information about this new write client may also be found in
  * the public documentation: https://cloud.google.com/bigquery/docs/write-api
  *
- * It is EXPERIMENTAL and subject to change or removal without notice. This is primarily to signal that this
+ * It is EXPERIMENTAL and subject to change or removal without notice. This is primarily to signal that this
  * package may still make breaking changes to existing methods and functionality.
  *
  * @namespace managedwriter
diff --git a/src/managedwriter/json_writer.ts b/src/managedwriter/json_writer.ts
index 0afe5755..e4fdb28d 100644
--- a/src/managedwriter/json_writer.ts
+++ b/src/managedwriter/json_writer.ts
@@ -12,25 +12,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-import * as protobuf from 'protobufjs';
 import * as protos from '../../protos/protos';
 import {PendingWrite} from './pending_write';
 import {StreamConnection, RemoveListener} from './stream_connection';
 import * as adapt from '../adapt';
 import {Writer} from './writer';
+import {JSONEncoder} from './encoder';

 type TableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema;
 type IInt64Value = protos.google.protobuf.IInt64Value;
 type IDescriptorProto = protos.google.protobuf.IDescriptorProto;
-type DescriptorProto = protos.google.protobuf.DescriptorProto;
-type JSONPrimitive = string | number | boolean | null;
-type JSONValue = JSONPrimitive | JSONObject | JSONArray;
-type JSONObject = {[member: string]: JSONValue};
-type JSONArray = Array<JSONValue>;
-type JSONList = Array<JSONObject>;
-
-const DescriptorProto = protos.google.protobuf.DescriptorProto;
-const {Type} = protobuf;
+export type JSONPrimitive = string | number | boolean | Date | null;
+export type JSONValue = JSONPrimitive | JSONObject | JSONArray;
+export type JSONObject = {[member: string]: JSONValue};
+export type JSONArray = Array<JSONValue>;
+export type JSONList = Array<JSONObject>;

 /**
  * A StreamWriter that can write JSON data to BigQuery tables. The JSONWriter is
 * @class
 * @memberof managedwriter
@@ -47,9 +43,7 @@
 */
 export class JSONWriter {
   private _writer: Writer;
-  private _type: protobuf.Type = Type.fromJSON('root', {
-    fields: {},
-  });
+  private _encoder: JSONEncoder;
   private _schemaListener: RemoveListener;

   /**
@@ -67,6 +61,9 @@
   }) {
     const {connection, protoDescriptor} = params;
     this._writer = new Writer(params);
+    this._encoder = new JSONEncoder({
+      protoDescriptor: params.protoDescriptor,
+    });
     this._schemaListener = connection.onSchemaUpdated(this.onSchemaUpdated);
     this.setProtoDescriptor(protoDescriptor);
   }

@@ -87,11 +84,8 @@
    * @param {IDescriptorProto} protoDescriptor - The proto descriptor.
    */
  setProtoDescriptor(protoDescriptor: IDescriptorProto): void {
-    const normalized = adapt.normalizeDescriptor(
-      new DescriptorProto(protoDescriptor)
-    );
-    this._type = Type.fromDescriptor(normalized);
     this._writer.setProtoDescriptor(protoDescriptor);
+    this._encoder.setProtoDescriptor(protoDescriptor);
  }

  /**
@@ -104,10 +98,7 @@
   * @returns {managedwriter.PendingWrite} The pending write.
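+   *
+   * @example <caption>Minimal sketch; assumes an open `writer` whose table has a
+   * DATE column `date_col` (illustrative name). The Date value is converted by
+   * the JSONEncoder before being written.</caption>
+   * const pw = writer.appendRows([{row_num: 1, date_col: new Date('2019-02-07')}], 0);
+   * await pw.getResult();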
   */
  appendRows(rows: JSONList, offsetValue?: IInt64Value['value']): PendingWrite {
-    const serializedRows = rows.map(r => {
-      const msg = this._type.create(r);
-      return this._type.encode(msg).finish();
-    });
+    const serializedRows = this._encoder.encodeRows(rows);
     const pw = this._writer.appendRows(
       {
         serializedRows,
diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts
index 995fa7d6..f0c91d7a 100644
--- a/src/managedwriter/stream_connection.ts
+++ b/src/managedwriter/stream_connection.ts
@@ -133,25 +133,14 @@ export class StreamConnection extends EventEmitter {
   };

   private shouldReconnect(err: gax.GoogleError): boolean {
-    if (
-      err.code &&
-      [gax.Status.UNAVAILABLE, gax.Status.RESOURCE_EXHAUSTED].includes(
-        err.code
-      ) &&
-      err.message
-    ) {
-      const detail = err.message.toLowerCase();
-      const knownErrors = [
-        'service is currently unavailable', // schema mismatch
-        'read econnreset', // idle connection reset
-        'bandwidth exhausted',
-        'memory limit exceeded',
-      ];
-      const isKnownError =
-        knownErrors.findIndex(err => detail.includes(err)) !== -1;
-      return isKnownError;
-    }
-    return false;
+    const reconnectionErrorCodes = [
+      gax.Status.UNAVAILABLE,
+      gax.Status.RESOURCE_EXHAUSTED,
+      gax.Status.ABORTED,
+      gax.Status.CANCELLED,
+      gax.Status.DEADLINE_EXCEEDED,
+    ];
+    return !!err.code && reconnectionErrorCodes.includes(err.code);
   }

   private isPermanentError(err: gax.GoogleError): boolean {
diff --git a/src/managedwriter/writer_client.ts b/src/managedwriter/writer_client.ts
index 8d9d1747..55721ab6 100644
--- a/src/managedwriter/writer_client.ts
+++ b/src/managedwriter/writer_client.ts
@@ -22,7 +22,6 @@
 import {StreamConnection} from './stream_connection';

 type StreamConnections = {
   connectionList: StreamConnection[];
-  connections: Record<string, StreamConnection>;
 };
 type CreateWriteStreamRequest = protos.google.cloud.bigquery.storage.v1.ICreateWriteStreamRequest;
@@ -68,7 +67,6 @@ export class WriterClient {
     });
     this._connections = {
       connectionList: [],
-      connections: {},
     };
     this._open = false;
   }
@@ -189,7 +187,6 @@ export class WriterClient {
         options
       );
       this._connections.connectionList.push(streamConnection);
-      this._connections.connections[`${streamId}`] = streamConnection;
       return streamConnection;
     } catch (err) {
       throw new Error('managed stream connection failed:' + err);
     }
@@ -230,6 +227,9 @@ export class WriterClient {
     this._connections.connectionList.map(conn => {
       conn.close();
     });
+    this._connections = {
+      connectionList: [],
+    };
     this._open = false;
   }
diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts
index 7b7dfa0b..633d394c 100644
--- a/system-test/managed_writer_client_test.ts
+++ b/system-test/managed_writer_client_test.ts
@@ -16,12 +16,17 @@ import * as assert from 'assert';
 import {describe, it, xit} from 'mocha';
 import * as uuid from 'uuid';
 import * as gax from 'google-gax';
+import * as sinon from 'sinon';
 import {BigQuery, TableSchema} from '@google-cloud/bigquery';
 import * as protos from '../protos/protos';
 import * as bigquerywriter from '../src';
 import * as protobuf from 'protobufjs';
 import {ClientOptions} from 'google-gax';
 import * as customerRecordProtoJson from '../samples/customer_record.json';
+import {JSONEncoder} from '../src/managedwriter/encoder';
+
+const sandbox = sinon.createSandbox();
+afterEach(() => sandbox.restore());

 const {managedwriter, adapt} = bigquerywriter;
 const {WriterClient, Writer, JSONWriter, parseStorageErrors} = managedwriter;
@@ -364,6 +369,76 @@ describe('managedwriter.WriterClient', () => {
     });
   });

+  describe('JSONEncoder', () => {
+    it('should automatically convert date/datetime/timestamps to the expected BigQuery format', () => {
+      const updatedSchema = {
+        fields: [
+          ...(schema.fields || []),
+          {
+            name: 'customer_birthday',
+            type: 'DATE',
+          },
+          {
+            name: 'customer_created_at',
+            type: 'DATETIME',
+          },
+          {
+            name: 'customer_updated_at',
+            type: 'TIMESTAMP',
+          },
+        ],
+      };
+      const storageSchema =
+        adapt.convertBigQuerySchemaToStorageTableSchema(updatedSchema);
+      const protoDescriptor: DescriptorProto =
+        adapt.convertStorageSchemaToProto2Descriptor(storageSchema, 'root');
+      const encoder = new JSONEncoder({
+        protoDescriptor,
+      });
+
+      // Row 1
+      const row1 = {
+        customer_name: 'Ada Lovelace',
+        row_num: 1,
+        customer_birthday: new Date('1815-12-10'),
+        customer_created_at: new Date('2022-01-09T03:49:46.564Z'),
+        customer_updated_at: new Date('2023-01-09T03:49:46.564Z'),
+      };
+
+      // Row 2
+      const row2 = {
+        customer_name: 'Alan Turing',
+        row_num: 2,
+        customer_birthday: new Date('1912-07-23'),
+        customer_created_at: new Date('2022-01-09T03:49:46.564Z'),
+        customer_updated_at: new Date('2023-01-09T03:49:46.564Z'),
+      };
+
+      const Proto = Type.fromDescriptor(protoDescriptor);
+      const encoded = encoder.encodeRows([row1, row2]);
+
+      const encodedRow1 = encoded[0];
+      const decodedRow1 = Proto.decode(encodedRow1).toJSON();
+      assert.deepEqual(decodedRow1, {
+        customer_name: 'Ada Lovelace',
+        row_num: 1,
+        customer_birthday: -56270,
+        customer_created_at: '2022-01-09 03:49:46.564',
+        customer_updated_at: 1673236186564000,
+      });
+
+      const encodedRow2 = encoded[1];
+      const decodedRow2 = Proto.decode(encodedRow2).toJSON();
+      assert.deepEqual(decodedRow2, {
+        customer_name: 'Alan Turing',
+        row_num: 2,
+        customer_birthday: -20981,
+        customer_created_at: '2022-01-09 03:49:46.564',
+        customer_updated_at: 1673236186564000,
+      });
+    });
+  });
+
   describe('JSONWriter', () => {
     it('should invoke appendRows without errors', async () => {
       bqWriteClient.initialize();
@@ -780,6 +855,91 @@ describe('managedwriter.WriterClient', () => {
     }
   });

+  it('should trigger reconnection given some specific errors', async () => {
+    bqWriteClient.initialize();
+    const client = new WriterClient();
+    client.setClient(bqWriteClient);
+
+    const connection = await client.createStreamConnection({
+      streamType: managedwriter.PendingStream,
+      destinationTable: parent,
+    });
+
+    let reconnectedCalled = false;
+    sandbox.stub(connection, 'reconnect').callsFake(() => {
+      reconnectedCalled = true;
+    });
+
+    const writer = new JSONWriter({
+      connection,
+      protoDescriptor,
+    });
+
+    try {
+      // Write some data first, then simulate errors on the connection
+      const pw = writer.appendRows(
+        [
+          {
+            customer_name: 'Ada Lovelace',
+            row_num: 1,
+          },
+          {
+            customer_name: 'Alan Turing',
+            row_num: 2,
+          },
+        ],
+        0
+      );
+      await pw.getResult();
+
+      const reconnectErrorCases: gax.GoogleError[] = [
+        {
+          code: gax.Status.ABORTED,
+          msg: 'Closing the stream because it has been inactive',
+        },
+        {
+          code: gax.Status.RESOURCE_EXHAUSTED,
+          msg: 'read econnreset',
+        },
+        {
+          code: gax.Status.ABORTED,
+          msg: 'service is currently unavailable',
+        },
+        {
+          code: gax.Status.RESOURCE_EXHAUSTED,
+          msg: 'bandwidth exhausted',
+        },
+        {
+          code: gax.Status.RESOURCE_EXHAUSTED,
+          msg: 'memory limit exceeded',
+        },
+        {
+          code: gax.Status.CANCELLED,
+          msg: 'any',
+        },
+        {
+          code: gax.Status.DEADLINE_EXCEEDED,
+          msg: 'a msg',
+        },
+      ].map(err => {
+        const gerr = new gax.GoogleError(err.msg);
+        gerr.code = err.code;
+        return gerr;
+      });
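+
+      // Emit each simulated error on the stream connection; shouldReconnect()
+      // now keys off the status code alone, so every case should reconnect.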
+      for (const gerr of reconnectErrorCases) {
+        const conn = connection['_connection'] as gax.CancellableStream; // access private field
+        conn.emit('error', gerr);
+        assert.equal(reconnectedCalled, true);
+
+        reconnectedCalled = false; // reset flag
+      }
+
+      writer.close();
+    } finally {
+      client.close();
+    }
+  });
+
   xit('reconnect on idle connection', async () => {
     bqWriteClient.initialize();
     const client = new WriterClient();