Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for missing value interpretation #428

45 changes: 34 additions & 11 deletions src/managedwriter/json_writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@

import * as protos from '../../protos/protos';
import {PendingWrite} from './pending_write';
import {StreamConnection, RemoveListener} from './stream_connection';
import {RemoveListener} from './stream_connection';
import * as adapt from '../adapt';
import {Writer} from './writer';
import {Writer, WriterOptions} from './writer';
import {JSONEncoder} from './encoder';

type TableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema;
/**
 * How a missing value should be interpreted. Derived from the type of the
 * `defaultMissingValueInterpretation` property on the `AppendRowsRequest`
 * proto type, so it stays in sync with the generated protos.
 */
type MissingValueInterpretation =
protos.google.cloud.bigquery.storage.v1.AppendRowsRequest['defaultMissingValueInterpretation'];
/** Per-column missing value interpretations, keyed by column name. */
type MissingValueInterpretationMap = {
[column: string]: MissingValueInterpretation;
};
type IInt64Value = protos.google.protobuf.IInt64Value;
type IDescriptorProto = protos.google.protobuf.IDescriptorProto;
export type JSONPrimitive = string | number | boolean | Date | null;
Expand Down Expand Up @@ -49,16 +54,10 @@ export class JSONWriter {
/**
* Creates a new JSONWriter instance.
*
* @param {Object} params - The parameters for the JSONWriter.
* @param {StreamConnection} params.connection - The stream connection
* to the BigQuery streaming insert operation.
* @param {IDescriptorProto} params.protoDescriptor - The proto descriptor
* for the JSON rows.
* @param {WriterOptions} params - The parameters for the JSONWriter.
* See WriterOptions docs for more information.
*/
constructor(params: {
connection: StreamConnection;
protoDescriptor: IDescriptorProto;
}) {
constructor(params: WriterOptions) {
const {connection, protoDescriptor} = params;
this._writer = new Writer(params);
this._encoder = new JSONEncoder({
Expand Down Expand Up @@ -88,6 +87,30 @@ export class JSONWriter {
this._encoder.setProtoDescriptor(protoDescriptor);
}

/**
 * Update how missing values are interpreted for the given stream.
 *
 * The value is forwarded unchanged to the wrapped Writer's
 * `setDefaultMissingValueInterpretation`.
 *
 * @param {MissingValueInterpretation} defaultMissingValueInterpretation - The
 * new default interpretation to hand to the underlying Writer.
 */
setDefaultMissingValueInterpretation(
defaultMissingValueInterpretation: MissingValueInterpretation
) {
this._writer.setDefaultMissingValueInterpretation(
defaultMissingValueInterpretation
);
}

/**
 * Update how missing values are interpreted for individual columns.
 *
 * The map is forwarded unchanged to the wrapped Writer's
 * `setMissingValueInterpretations`.
 *
 * @param {MissingValueInterpretationMap} missingValueInterpretations - Object
 * keyed by column name whose values are the interpretation for that column.
 */
setMissingValueInterpretations(
missingValueInterpretations: MissingValueInterpretationMap
) {
this._writer.setMissingValueInterpretations(missingValueInterpretations);
}

/**
* Writes a JSONList that contains objects to be written to the BigQuery table by first converting
* the JSON data to protobuf messages, then using Writer's appendRows() to write the data at current end
Expand Down
100 changes: 90 additions & 10 deletions src/managedwriter/writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,60 @@ type ProtoData =
protos.google.cloud.bigquery.storage.v1.AppendRowsRequest.IProtoData;
type IDescriptorProto = protos.google.protobuf.IDescriptorProto;
type DescriptorProto = protos.google.protobuf.DescriptorProto;
/**
 * How a missing value should be interpreted. Derived from the type of the
 * `defaultMissingValueInterpretation` property on `AppendRowRequest`.
 */
type MissingValueInterpretation =
AppendRowRequest['defaultMissingValueInterpretation'];
/** Per-column missing value interpretations, keyed by column name. */
type MissingValueInterpretationMap = {
[column: string]: MissingValueInterpretation;
};

const DescriptorProto = protos.google.protobuf.DescriptorProto;

/**
 * Options accepted by the Writer constructor. Only `connection` and
 * `protoDescriptor` are required; the missing value settings are optional.
 */
export interface WriterOptions {
/** The stream connection to the BigQuery streaming insert operation. */
connection: StreamConnection;

/** The proto descriptor for the stream. */
protoDescriptor: IDescriptorProto;

/**
 * Controls how missing values are interpreted for a given stream.
 * `missingValueInterpretations` set for individual columns can override the default chosen
 * with this option.
 *
 * For example, if you want to write
 * `NULL` instead of using default values for some columns, you can set
 * `defaultMissingValueInterpretation` to `DEFAULT_VALUE` and at the same
 * time, set `missingValueInterpretations` to `NULL_VALUE` on those columns.
 */
defaultMissingValueInterpretation?: MissingValueInterpretation;

/**
 * Control how missing values are interpreted for individual columns.
 *
 * Provide an object that indicates how to interpret missing values for some fields. Missing
 * values are fields present in user schema but missing in rows. The key is
 * the field name. The value is the interpretation of missing values for the
 * field.
 *
 * For example, the following option would indicate that missing values in the "foo"
 * column are treated as the default value, whereas missing values in the "bar" column are
 * interpreted as null:
 *
 * {
 * "foo": 'DEFAULT_VALUE',
 * "bar": 'NULL_VALUE',
 * }
 *
 * If a field is not in this object and has missing values, the missing values
 * in this field are interpreted as NULL unless overridden with a default missing
 * value interpretation.
 *
 * Currently, field name can only be top-level column name, can't be a struct
 * field path like 'foo.bar'.
 */
missingValueInterpretations?: MissingValueInterpretationMap;
}

/**
* A BigQuery Storage API Writer that can be used to write data into BigQuery Table
* using the Storage API.
Expand All @@ -37,23 +88,26 @@ const DescriptorProto = protos.google.protobuf.DescriptorProto;
export class Writer {
private _protoDescriptor: DescriptorProto;
private _streamConnection: StreamConnection;
private _defaultMissingValueInterpretation?: MissingValueInterpretation;
private _missingValueInterpretations?: MissingValueInterpretationMap;

/**
* Creates a new Writer instance.
*
* @param {Object} params - The parameters for the JSONWriter.
* @param {StreamConnection} params.connection - The stream connection
* to the BigQuery streaming insert operation.
* @param {IDescriptorProto} params.protoDescriptor - The proto descriptor
* for the JSON rows.
* @param {WriterOptions} params - The parameters for the Writer.
* See WriterOptions docs for more information.
*/
constructor(params: {
connection: StreamConnection;
protoDescriptor: IDescriptorProto;
}) {
const {connection, protoDescriptor} = params;
constructor(params: WriterOptions) {
const {
connection,
protoDescriptor,
missingValueInterpretations,
defaultMissingValueInterpretation,
} = params;
this._streamConnection = connection;
this._protoDescriptor = new DescriptorProto(protoDescriptor);
this._defaultMissingValueInterpretation = defaultMissingValueInterpretation;
this._missingValueInterpretations = missingValueInterpretations;
}

/**
Expand All @@ -72,6 +126,28 @@ export class Writer {
}
}

/**
 * Update how missing values are interpreted for the given stream.
 *
 * Overwrites the default interpretation stored on this writer; the previous
 * value is discarded.
 * NOTE(review): presumably this only affects append requests built after the
 * call — confirm against the request-building code.
 *
 * @param {MissingValueInterpretation} defaultMissingValueInterpretation - The
 * new default interpretation to store.
 */
setDefaultMissingValueInterpretation(
defaultMissingValueInterpretation: MissingValueInterpretation
) {
this._defaultMissingValueInterpretation = defaultMissingValueInterpretation;
}

/**
 * Update how missing values are interpreted for individual columns.
 *
 * Replaces the whole stored map rather than merging with earlier entries, so
 * columns absent from the new object lose any previous per-column setting.
 * The object is stored by reference, not copied.
 *
 * @param {MissingValueInterpretationMap} missingValueInterpretations - Object
 * keyed by column name whose values are the interpretation for that column.
 */
setMissingValueInterpretations(
missingValueInterpretations: MissingValueInterpretationMap
) {
this._missingValueInterpretations = missingValueInterpretations;
}

/**
* Schedules the writing of rows at given offset.
*
Expand All @@ -97,6 +173,10 @@ export class Writer {
protoDescriptor: this._protoDescriptor.toJSON(),
},
},
defaultMissingValueInterpretation:
this._defaultMissingValueInterpretation,
missingValueInterpretations: this
._missingValueInterpretations as AppendRowRequest['missingValueInterpretations'],
offset,
};

Expand Down
136 changes: 136 additions & 0 deletions system-test/managed_writer_client_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,142 @@ describe('managedwriter.WriterClient', () => {
}).timeout(30 * 1000);
});

// Verifies that missing value interpretation (MVI) settings passed to the
// JSONWriter constructor, and later changed through its setters, decide
// whether omitted columns end up as their default value or as NULL.
it('should fill default values when MissingValuesInterpretation is set', async () => {
  bqWriteClient.initialize();
  const client = new WriterClient();
  client.setClient(bqWriteClient);

  // `id`, `created_at` and `updated_at` declare default value expressions and
  // are never present in the appended rows, so their final value depends
  // only on the MVI configuration in effect for each append.
  const schema: TableSchema = {
    fields: [
      {
        name: 'customer_name',
        type: 'STRING',
        mode: 'REQUIRED',
      },
      {
        name: 'row_num',
        type: 'INTEGER',
        mode: 'REQUIRED',
      },
      {
        name: 'id',
        type: 'STRING',
        defaultValueExpression: 'GENERATE_UUID()',
      },
      {
        name: 'created_at',
        type: 'TIMESTAMP',
        defaultValueExpression: 'CURRENT_TIMESTAMP()',
      },
      {
        name: 'updated_at',
        type: 'TIMESTAMP',
        defaultValueExpression: 'CURRENT_TIMESTAMP()',
      },
    ],
  };
  const [table] = await bigquery
    .dataset(datasetId)
    .createTable(tableId + '_default_values', {schema});
  const parent = `projects/${projectId}/datasets/${datasetId}/tables/${table.id}`;

  const storageSchema =
    adapt.convertBigQuerySchemaToStorageTableSchema(schema);
  const protoDescriptor: DescriptorProto =
    adapt.convertStorageSchemaToProto2Descriptor(storageSchema, 'root');

  const row1 = {
    customer_name: 'Ada Lovelace',
    row_num: 1,
  };

  const row2 = {
    customer_name: 'Alan Turing',
    row_num: 2,
  };

  try {
    const connection = await client.createStreamConnection({
      streamType: managedwriter.PendingStream,
      destinationTable: parent,
    });

    const streamId = connection.getStreamId();
    // Initial config: default to DEFAULT_VALUE, but force NULL for updated_at.
    const writer = new JSONWriter({
      connection,
      protoDescriptor,
      defaultMissingValueInterpretation: 'DEFAULT_VALUE',
      missingValueInterpretations: {
        updated_at: 'NULL_VALUE',
      },
    });

    const firstWrite = writer.appendRows([row1, row2], 0);
    const firstResult = await firstWrite.getResult();
    // Check the first batch on its own so a failure here is reported
    // directly instead of surfacing later as a row-count mismatch.
    assert.equal(firstResult.error, null);

    // change MVI config: invert the interpretation for the next batch
    writer.setDefaultMissingValueInterpretation('NULL_VALUE');
    writer.setMissingValueInterpretations({
      updated_at: 'DEFAULT_VALUE',
    });

    const row3 = {
      customer_name: 'Charles Babbage',
      row_num: 3,
    };

    const row4 = {
      customer_name: 'Lord Byron',
      row_num: 4,
    };

    const secondWrite = writer.appendRows([row3, row4], 2);
    const secondResult = await secondWrite.getResult();
    assert.equal(secondResult.error, null);

    const res = await connection.finalize();
    connection.close();
    assert.equal(res?.rowCount, 4);

    const commitResponse = await client.batchCommitWriteStream({
      parent,
      writeStreams: [streamId],
    });
    assert.equal(commitResponse.streamErrors?.length, 0);

    const [rows] = await bigquery.query(
      `SELECT * FROM \`${projectId}.${datasetId}.${table.id}\` order by row_num`
    );
    assert.strictEqual(rows.length, 4);

    // Rows 1-2 were appended under the initial config: defaults filled in,
    // except updated_at which was overridden to NULL.
    for (const row of rows.slice(0, 2)) {
      assert.notEqual(row.id, null);
      assert.notEqual(row.created_at, null);
      assert.equal(row.updated_at, null);
    }

    // After change on MVI config: rows 3-4 get NULL by default, except
    // updated_at which now takes its default value.
    for (const row of rows.slice(2)) {
      assert.equal(row.id, null);
      assert.equal(row.created_at, null);
      assert.notEqual(row.updated_at, null);
    }

    writer.close();
  } finally {
    client.close();
  }
});

describe('Error Scenarios', () => {
it('send request with mismatched proto descriptor', async () => {
bqWriteClient.initialize();
Expand Down
Loading