fix: arrow parsing for nested and nullable fields (#519)
* fix: arrow parsing for nested and nullable fields

* fix: address pr comments

* docs: add comment
alvarowolfx authored Oct 23, 2024
1 parent 44986cd commit fc052fb
Showing 2 changed files with 216 additions and 19 deletions.
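Before the diff itself, a quick orientation on the target row shape: the reader emits BigQuery TableRow objects, where each row has an f array holding one {v: ...} cell per column, and this fix makes the Arrow transform produce the nested variants of that shape for NULLABLE, REPEATED, and RECORD columns. A minimal TypeScript sketch of the expected cell shapes, with sample values lifted from the system test further down (variable names are illustrative only):

  // NULLABLE scalar column: the cell carries the scalar, or null when absent.
  const optionalCell = {v: 'Some data'}; // or {v: null}

  // REPEATED column: the cell carries an array of nested {v: ...} cells.
  const listCell = {v: [{v: '1'}, {v: '2'}]};

  // NULLABLE RECORD column: the cell carries a nested row with its own f array,
  // or null when the whole record is missing.
  const recordCell = {v: {f: [{v: 1588010845356000}, {v: null}]}};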
44 changes: 30 additions & 14 deletions src/reader/arrow_transform.ts
@@ -17,7 +17,7 @@ import {
RecordBatchReader,
RecordBatch,
RecordBatchStreamReader,
Vector,
DataType,
} from 'apache-arrow';
import * as protos from '../../protos/protos';

@@ -140,12 +140,13 @@ export class ArrowRecordBatchTableRowTransform extends Transform {
}
for (let j = 0; j < batch.numCols; j++) {
const column = batch.selectAt([j]);
const columnName = column.schema.fields[0].name;
const field = column.schema.fields[0];
const columnName = field.name;
for (let i = 0; i < batch.numRows; i++) {
const fieldData = column.get(i);
const fieldValue = fieldData?.toJSON()[columnName];
rows[i].f[j] = {
v: convertArrowValue(fieldValue),
v: convertArrowValue(fieldValue, field.type as DataType),
};
}
}
@@ -156,21 +157,36 @@
}
}

function convertArrowValue(fieldValue: any): any {
if (typeof fieldValue === 'object') {
if (fieldValue instanceof Vector) {
const arr = fieldValue.toJSON();
return arr.map((v: any) => {
return {v: convertArrowValue(v)};
});
}
const tableRow: TableRow = {f: []};
function convertArrowValue(fieldValue: any, type: DataType): any {
if (fieldValue === null) {
return null;
}
if (DataType.isList(type)) {
const arr = fieldValue.toJSON();
return arr.map((v: any) => {
// Arrays/lists in BigQuery have the same datatype for every element
// so getting the first one is all we need
const elemType = type.children[0].type;
return {v: convertArrowValue(v, elemType)};
});
}
if (DataType.isStruct(type)) {
const tableRow: TableRow = {};
Object.keys(fieldValue).forEach(key => {
tableRow.f?.push({
v: convertArrowValue(fieldValue[key]),
const elemType = type.children.find(f => f.name === key);
if (!tableRow.f) {
tableRow.f = [];
}
tableRow.f.push({
v: convertArrowValue(fieldValue[key], elemType?.type as DataType),
});
});
return tableRow;
}
if (DataType.isTimestamp(type)) {
// Arrow hands timestamps back as epoch milliseconds; scale to microseconds,
// which is what BigQuery timestamp parsing downstream expects.
return fieldValue * 1000;
}
return fieldValue;
}
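To make the recursion in convertArrowValue concrete, here is a small standalone sketch of the same conversion rules under stated assumptions: apache-arrow's DataType is replaced by a hand-rolled SimpleType tag so the example runs without decoding a real RecordBatch, the field names and sample values are hypothetical (modeled on the system test below), and the timestamp scaling simply mirrors the * 1000 in the patch.

  // Standalone sketch of the conversion rules; SimpleType stands in for
  // apache-arrow's DataType and is illustrative only.
  type SimpleType =
    | {kind: 'scalar'}
    | {kind: 'timestamp'}
    | {kind: 'list'; element: SimpleType}
    | {kind: 'struct'; children: {name: string; type: SimpleType}[]};

  type TableCell = {v: unknown};
  type SimpleRow = {f: TableCell[]};

  function convert(value: any, type: SimpleType): unknown {
    if (value === null || value === undefined) {
      return null; // NULLABLE columns surface as SQL NULL
    }
    switch (type.kind) {
      case 'list':
        // every element of a REPEATED column shares one element type,
        // so that type is applied to each entry
        return (value as unknown[]).map(v => ({v: convert(v, type.element)}));
      case 'struct': {
        // RECORD columns become nested rows: one cell per child field
        const row: SimpleRow = {f: []};
        for (const child of type.children) {
          row.f.push({v: convert(value[child.name], child.type)});
        }
        return row;
      }
      case 'timestamp':
        return (value as number) * 1000; // same scaling factor as the patch
      default:
        return value;
    }
  }

  // A value shaped like the system test's metadata RECORD with updated_at missing:
  const metadataType: SimpleType = {
    kind: 'struct',
    children: [
      {name: 'created_at', type: {kind: 'timestamp'}},
      {name: 'updated_at', type: {kind: 'timestamp'}},
    ],
  };
  console.log(convert({created_at: 1588010845356, updated_at: null}, metadataType));
  // -> { f: [ { v: 1588010845356000 }, { v: null } ] }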
191 changes: 186 additions & 5 deletions system-test/reader_client_test.ts
Expand Up @@ -72,6 +72,33 @@ describe('reader.ReaderClient', () => {
type: 'INTEGER',
mode: 'REQUIRED',
},
{
name: 'optional',
type: 'STRING',
mode: 'NULLABLE',
},
{
name: 'list',
type: 'INT64',
mode: 'REPEATED',
},
{
name: 'metadata',
type: 'RECORD',
mode: 'NULLABLE',
fields: [
{
name: 'created_at',
type: 'TIMESTAMP',
mode: 'REQUIRED',
},
{
name: 'updated_at',
type: 'TIMESTAMP',
mode: 'NULLABLE',
},
],
},
],
};

@@ -97,9 +124,26 @@ describe('reader.ReaderClient', () => {
.dataset(datasetId)
.table(tableId)
.insert([
{name: 'Ada Lovelace', row_num: 1},
{name: 'Alan Turing', row_num: 2},
{name: 'Bell', row_num: 3},
{
name: 'Ada Lovelace',
row_num: 1,
optional: 'Some data',
list: [1],
metadata: {
created_at: bigquery.timestamp('2020-04-27T18:07:25.356Z'),
updated_at: bigquery.timestamp('2020-04-27T20:07:25.356Z'),
},
},
{
name: 'Alan Turing',
row_num: 2,
optional: 'Some other data',
list: [1, 2],
metadata: {
created_at: bigquery.timestamp('2020-04-27T18:07:25.356Z'),
},
},
{name: 'Bell', row_num: 3, list: [1, 2, 3]},
]);
});

@@ -218,7 +262,7 @@ describe('reader.ReaderClient', () => {
const table = await tableFromIPC(content);

assert.equal(table.numRows, 3);
assert.equal(table.numCols, 2);
assert.equal(table.numCols, 5);

reader.close();
} finally {
@@ -253,7 +297,7 @@ describe('reader.ReaderClient', () => {
const table = new Table(batches);

assert.equal(table.numRows, 3);
assert.equal(table.numCols, 2);
assert.equal(table.numCols, 5);

reader.close();
} finally {
@@ -295,6 +339,143 @@ describe('reader.ReaderClient', () => {

assert.equal(rows.length, 3);

assert.deepEqual(rows, [
{
f: [
{
v: 'Ada Lovelace',
},
{
v: '1',
},
{
v: 'Some data',
},
{
v: [
{
v: '1',
},
],
},
{
v: {
f: [
{
v: 1588010845356000,
},
{
v: 1588018045356000,
},
],
},
},
],
},
{
f: [
{
v: 'Alan Turing',
},
{
v: '2',
},
{
v: 'Some other data',
},
{
v: [
{
v: '1',
},
{
v: '2',
},
],
},
{
v: {
f: [
{
v: 1588010845356000,
},
{
v: null,
},
],
},
},
],
},
{
f: [
{
v: 'Bell',
},
{
v: '3',
},
{
v: null,
},
{
v: [
{
v: '1',
},
{
v: '2',
},
{
v: '3',
},
],
},
{
v: null,
},
],
},
]);
const mergedRows = BigQuery.mergeSchemaWithRows_(schema, rows, {
wrapIntegers: false,
});
assert.deepEqual(mergedRows, [
{
name: 'Ada Lovelace',
row_num: 1,
optional: 'Some data',
list: [1],
metadata: {
created_at: {
value: '2020-04-27T18:07:25.356Z',
},
updated_at: {
value: '2020-04-27T20:07:25.356Z',
},
},
},
{
name: 'Alan Turing',
row_num: 2,
optional: 'Some other data',
list: [1, 2],
metadata: {
created_at: {
value: '2020-04-27T18:07:25.356Z',
},
updated_at: null,
},
},
{
name: 'Bell',
row_num: 3,
list: [1, 2, 3],
optional: null,
metadata: null,
},
]);

reader.close();
} finally {
client.close();