diff --git a/.gitignore b/.gitignore index 0d5fc380..87e59907 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ dist/ .idea node_modules benchmarks/leaks/input +benchmarks/dev *.tgz .npmrc webpack diff --git a/packages/client-common/__tests__/unit/row_binary_columns_parser.test.ts b/packages/client-common/__tests__/unit/row_binary_columns_parser.test.ts new file mode 100644 index 00000000..af9a3a90 --- /dev/null +++ b/packages/client-common/__tests__/unit/row_binary_columns_parser.test.ts @@ -0,0 +1,56 @@ +import { parseFixedStringType } from '../../src/data_formatter/row_binary/columns_parser' + +describe('RowBinary column types parser', () => { + describe('FixedString', () => { + it('should parse FixedString', async () => { + const args: [string, number][] = [ + ['FixedString(1)', 1], + ['FixedString(42)', 42], + ['FixedString(100)', 100], + ['FixedString(32768)', 32768], + ] + args.forEach(([columnType, sizeBytes]) => { + const result = parseFixedStringType({ + columnType, + sourceType: columnType, + }) + expect(result) + .withContext( + `Expected ${columnType} to be parsed as a FixedString with size ${sizeBytes}`, + ) + .toEqual({ type: 'FixedString', sizeBytes, sourceType: columnType }) + }) + }) + + it('should throw on invalid FixedString type', async () => { + const args: [string][] = [ + ['FixedString'], + ['FixedString('], + ['FixedString()'], + ['String'], + ] + args.forEach(([columnType]) => { + expect(() => + parseFixedStringType({ columnType, sourceType: columnType }), + ) + .withContext(`Expected ${columnType} to throw`) + .toThrowError('Invalid FixedString type') + }) + }) + + it('should throw on invalid FixedString size', async () => { + const args: [string][] = [ + ['FixedString(0)'], + ['FixedString(x)'], + [`FixedString(')`], + ] + args.forEach(([columnType]) => { + expect(() => + parseFixedStringType({ columnType, sourceType: columnType }), + ) + .withContext(`Expected ${columnType} to throw`) + .toThrowError('Invalid FixedString size in bytes') + }) + }) + }) +}) diff --git a/packages/client-common/__tests__/unit/row_binary_columns_parser_array.test.ts b/packages/client-common/__tests__/unit/row_binary_columns_parser_array.test.ts new file mode 100644 index 00000000..9e6688bf --- /dev/null +++ b/packages/client-common/__tests__/unit/row_binary_columns_parser_array.test.ts @@ -0,0 +1,308 @@ +import type { SimpleColumnType } from '../../src/data_formatter' +import type { + ParsedColumnDateTime, + ParsedColumnDateTime64, + ParsedColumnEnum, +} from '../../src/data_formatter/row_binary/columns_parser' +import { parseArrayType } from '../../src/data_formatter/row_binary/columns_parser' + +describe('RowBinary column types parser - Array', () => { + it('should parse Array with a simple value type', async () => { + type TestArgs = { + columnType: string + valueType: SimpleColumnType + dimensions: number + } + const args: TestArgs[] = [ + { + columnType: 'Array(String)', + valueType: 'String', + dimensions: 1, + }, + { + columnType: 'Array(UInt8)', + valueType: 'UInt8', + dimensions: 1, + }, + { + columnType: 'Array(Array(Int32))', + valueType: 'Int32', + dimensions: 2, + }, + { + columnType: 'Array(Array(Array(Date32)))', + valueType: 'Date32', + dimensions: 3, + }, + { + columnType: 'Array(Array(Array(Array(Float32))))', + valueType: 'Float32', + dimensions: 4, + }, + ] + args.forEach((args: TestArgs) => { + const { columnType, valueType, dimensions } = args + const result = parseArrayType({ columnType, sourceType: columnType }) + expect(result) + .withContext( + 
`Expected ${columnType} to be parsed as an Array with value type ${valueType} and ${dimensions} dimensions`, + ) + .toEqual({ + type: 'Array', + value: { + type: 'Simple', + columnType: valueType, + sourceType: valueType, // T + }, + sourceType: columnType, // Array(T) + dimensions, + }) + }) + }) + + it('should parse Array with Nullable', async () => { + type TestArgs = { + columnType: string + valueType: SimpleColumnType + dimensions: number + } + const args: TestArgs[] = [ + { + columnType: 'Array(Nullable(String))', + valueType: 'String', + dimensions: 1, + }, + { + columnType: 'Array(Array(Nullable(Int32)))', + valueType: 'Int32', + dimensions: 2, + }, + ] + args.forEach(({ columnType, valueType, dimensions }: TestArgs) => { + const result = parseArrayType({ columnType, sourceType: columnType }) + expect(result) + .withContext( + `Expected ${columnType} to be parsed as an Array with value type ${valueType} and ${dimensions} dimensions`, + ) + .toEqual({ + type: 'Array', + value: { + type: 'Nullable', + value: { + type: 'Simple', + columnType: valueType, + sourceType: valueType, // T + }, + sourceType: `Nullable(${valueType})`, // Nullable(T) + }, + sourceType: columnType, // Array(Nullable(T)) + dimensions, + }) + }) + }) + + it('should parse Array with Enum value type', async () => { + type TestArgs = { + value: ParsedColumnEnum + dimensions: number + columnType: string + } + const sourceEnum8 = `Enum8('foo' = 42)` + const valuesEnum8 = new Map([[42, 'foo']]) + const sourceEnum16 = `Enum16('bar' = 144, 'qaz' = 500)` + const valuesEnum16 = new Map([ + [144, 'bar'], + [500, 'qaz'], + ]) + const args: TestArgs[] = [ + { + value: { + type: 'Enum', + intSize: 8, + values: valuesEnum8, + sourceType: sourceEnum8, + }, + dimensions: 1, + columnType: `Array(${sourceEnum8})`, + }, + { + value: { + type: 'Enum', + intSize: 16, + values: valuesEnum16, + sourceType: sourceEnum16, + }, + dimensions: 1, + columnType: `Array(${sourceEnum16})`, + }, + { + value: { + type: 'Enum', + intSize: 8, + values: valuesEnum8, + sourceType: sourceEnum8, + }, + dimensions: 2, + columnType: `Array(Array(${sourceEnum8}))`, + }, + { + value: { + type: 'Enum', + intSize: 16, + values: valuesEnum16, + sourceType: sourceEnum16, + }, + dimensions: 3, + columnType: `Array(Array(Array(${sourceEnum16})))`, + }, + ] + args.forEach(({ columnType, dimensions, value }) => { + const result = parseArrayType({ columnType, sourceType: columnType }) + expect(result) + .withContext( + `Expected ${columnType} to be parsed as an Array with value type ${value.sourceType} and ${dimensions} dimensions`, + ) + .toEqual({ + type: 'Array', + sourceType: columnType, + dimensions, + value, + }) + }) + }) + + it('should parse Array of DateTime', async () => { + type TestArgs = { + value: ParsedColumnDateTime + dimensions: number + columnType: string + } + const args: TestArgs[] = [ + { + value: { + type: 'DateTime', + timezone: null, + sourceType: 'DateTime', + }, + dimensions: 1, + columnType: 'Array(DateTime)', + }, + { + value: { + type: 'DateTime', + timezone: 'UTC', + sourceType: `DateTime('UTC')`, + }, + dimensions: 1, + columnType: `Array(DateTime('UTC'))`, + }, + { + value: { + type: 'DateTime', + timezone: 'Etc/GMT-5', + sourceType: `DateTime('Etc/GMT-5')`, + }, + dimensions: 2, + columnType: `Array(Array(DateTime('Etc/GMT-5')))`, + }, + ] + args.forEach(({ columnType, dimensions, value }) => { + const result = parseArrayType({ columnType, sourceType: columnType }) + expect(result) + .withContext( + `Expected ${columnType} to be 
parsed as an Array with value type ${value.sourceType} and ${dimensions} dimensions`, + ) + .toEqual({ + type: 'Array', + sourceType: columnType, + dimensions, + value, + }) + }) + }) + + it('should parse Array of DateTime64', async () => { + type TestArgs = { + value: ParsedColumnDateTime64 + dimensions: number + columnType: string + } + const args: TestArgs[] = [ + { + value: { + type: 'DateTime64', + timezone: null, + sourceType: 'DateTime64(0)', + precision: 0, + }, + dimensions: 1, + columnType: 'Array(DateTime64(0))', + }, + { + value: { + type: 'DateTime64', + timezone: 'UTC', + sourceType: `DateTime64(3, 'UTC')`, + precision: 3, + }, + dimensions: 1, + columnType: `Array(DateTime64(3, 'UTC'))`, + }, + { + value: { + type: 'DateTime64', + timezone: 'Etc/GMT-5', + sourceType: `DateTime64(6, 'Etc/GMT-5')`, + precision: 6, + }, + dimensions: 2, + columnType: `Array(Array(DateTime64(6, 'Etc/GMT-5')))`, + }, + { + value: { + type: 'DateTime64', + timezone: 'Europe/Sofia', + sourceType: `DateTime64(9, 'Europe/Sofia')`, + precision: 9, + }, + dimensions: 3, + columnType: `Array(Array(Array(DateTime64(9, 'Europe/Sofia'))))`, + }, + ] + + args.forEach(({ columnType, dimensions, value }) => { + const result = parseArrayType({ columnType, sourceType: columnType }) + expect(result) + .withContext( + `Expected ${columnType} to be parsed as an Array with value type ${value.sourceType} and ${dimensions} dimensions`, + ) + .toEqual({ + type: 'Array', + sourceType: columnType, + dimensions, + value, + }) + }) + }) + + // TODO: Map type test. + + it('should throw on invalid Array type', async () => { + // Array(Int8) is the shortest valid definition + const args = [ + ['Array'], + ['Array('], + ['Array()'], + ['Array(a'], + ['Array(ab'], + ['Array(ab)'], + ['Array(abc)'], + ['String'], + ] + args.forEach(([columnType]) => { + expect(() => parseArrayType({ columnType, sourceType: columnType })) + .withContext(`Expected ${columnType} to throw`) + .toThrowError('Invalid Array type') + }) + }) +}) diff --git a/packages/client-common/__tests__/unit/row_binary_columns_parser_datetime.test.ts b/packages/client-common/__tests__/unit/row_binary_columns_parser_datetime.test.ts new file mode 100644 index 00000000..2ea6eee9 --- /dev/null +++ b/packages/client-common/__tests__/unit/row_binary_columns_parser_datetime.test.ts @@ -0,0 +1,116 @@ +import { + parseDateTime64Type, + parseDateTimeType, +} from '../../src/data_formatter/row_binary/columns_parser' + +describe('RowBinary column types parser - DateTime and DateTime64', () => { + describe('DateTime', () => { + it('should parse DateTime', async () => { + const args: [string, string | null][] = [ + ['DateTime', null], + [`DateTime('GB')`, 'GB'], + [`DateTime('UTC')`, 'UTC'], + [`DateTime('Europe/Amsterdam')`, 'Europe/Amsterdam'], + ] + args.forEach(([columnType, timezone]) => { + const result = parseDateTimeType({ columnType, sourceType: columnType }) + expect(result) + .withContext(`Expected ${columnType} to be parsed as a DateTime`) + .toEqual({ type: 'DateTime', sourceType: columnType, timezone }) + }) + }) + + it('should throw on invalid DateTime', async () => { + // DateTime('GB') has the least amount of chars allowed for a valid DateTime type. 
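+      // For illustration (assumed from parseDateTimeType's length check): DateTime('A') below is one character short of that minimum, so it is rejected along with the malformed variants.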
+ const args: [string][] = [ + ['DateTime()'], + [`DateTime(')`], + [`DateTime('')`], + [`DateTime('A')`], + ['String'], + ] + args.forEach(([columnType]) => { + expect(() => parseDateTimeType({ columnType, sourceType: columnType })) + .withContext(`Expected ${columnType} to throw`) + .toThrowError('Invalid DateTime type') + }) + }) + }) + + describe('DateTime64', () => { + const precisionRange = [...Array(10).keys()] // 0..9 + + it('should parse DateTime64 without timezone', async () => { + const args: [string, number][] = precisionRange.map((precision) => [ + `DateTime64(${precision})`, + precision, + ]) + args.forEach(([columnType, precision]) => { + const result = parseDateTime64Type({ + columnType, + sourceType: columnType, + }) + expect(result) + .withContext( + `Expected ${columnType} to be parsed as a DateTime64 with precision ${precision}`, + ) + .toEqual({ + type: 'DateTime64', + timezone: null, + sourceType: columnType, + precision, + }) + }) + }) + + it('should parse DateTime64 with timezone', async () => { + const allPrecisionArgs: [string, number, string][][] = precisionRange.map( + (precision) => [ + [`DateTime64(${precision}, 'GB')`, precision, 'GB'], + [`DateTime64(${precision}, 'UTC')`, precision, 'UTC'], + [`DateTime64(${precision}, 'Etc/GMT-5')`, precision, 'Etc/GMT-5'], + ], + ) + allPrecisionArgs.forEach((args) => + args.forEach(([columnType, precision, timezone]) => { + const result = parseDateTime64Type({ + columnType, + sourceType: columnType, + }) + expect(result) + .withContext( + `Expected ${columnType} to be parsed as a DateTime64 with precision ${precision} and timezone ${timezone}`, + ) + .toEqual({ + type: 'DateTime64', + sourceType: columnType, + timezone, + precision, + }) + }), + ) + }) + + it('should throw on invalid DateTime64 type', async () => { + const args = [['DateTime64('], ['DateTime64()'], ['String']] + args.forEach(([columnType]) => { + expect(() => + parseDateTime64Type({ columnType, sourceType: columnType }), + ) + .withContext(`Expected ${columnType} to throw`) + .toThrowError('Invalid DateTime64 type') + }) + }) + + it('should throw on invalid DateTime64 precision', async () => { + const args = [[`DateTime64(')`], [`DateTime64(foo)`]] + args.forEach(([columnType]) => { + expect(() => + parseDateTime64Type({ columnType, sourceType: columnType }), + ) + .withContext(`Expected ${columnType} to throw`) + .toThrowError('Invalid DateTime64 precision') + }) + }) + }) +}) diff --git a/packages/client-common/__tests__/unit/row_binary_columns_parser_decimal.test.ts b/packages/client-common/__tests__/unit/row_binary_columns_parser_decimal.test.ts new file mode 100644 index 00000000..6d583721 --- /dev/null +++ b/packages/client-common/__tests__/unit/row_binary_columns_parser_decimal.test.ts @@ -0,0 +1,103 @@ +import { parseDecimalType } from '../../src/data_formatter/row_binary/columns_parser' + +describe('RowBinary column types parser - Decimal', () => { + type TestArgs = { + sourceType: string + precision: number + scale: number + intSize: 32 | 64 | 128 | 256 + } + + it('should parse Decimal', async () => { + const args: TestArgs[] = [ + { + sourceType: 'Decimal(7, 2)', + precision: 7, + scale: 2, + intSize: 32, + }, + { + sourceType: 'Decimal(12, 4)', + precision: 12, + scale: 4, + intSize: 64, + }, + { + sourceType: 'Decimal(27, 6)', + precision: 27, + scale: 6, + intSize: 128, + }, + { + sourceType: 'Decimal(42, 8)', + precision: 42, + scale: 8, + intSize: 256, + }, + ] + args.forEach(({ sourceType, precision, scale, intSize }) => { + const result 
= parseDecimalType({ columnType: sourceType, sourceType }) + expect(result) + .withContext( + `Expected ${sourceType} to be parsed as a Decimal with precision ${precision}, scale ${scale} and intSize ${intSize}`, + ) + .toEqual({ + type: 'Decimal', + params: { precision, scale, intSize }, + sourceType, + }) + }) + }) + + it('should throw on invalid Decimal type', async () => { + const args: [string][] = [ + ['Decimal'], + ['Decimal('], + ['Decimal()'], + ['Decimal(1)'], + ['Decimal(1,)'], + ['Decimal(1, )'], + ['String'], + ] + args.forEach(([columnType]) => { + expect(() => parseDecimalType({ columnType, sourceType: columnType })) + .withContext(`Expected ${columnType} to throw`) + .toThrowError('Invalid Decimal type') + }) + }) + + it('should throw on invalid Decimal precision', async () => { + const args: [string][] = [ + ['Decimal(0, 0)'], + ['Decimal(x, 0)'], + [`Decimal(', ')`], + [`Decimal(77, 1)`], // max is 76 + ] + args.forEach(([columnType]) => { + expect(() => parseDecimalType({ columnType, sourceType: columnType })) + .withContext(`Expected ${columnType} to throw`) + .toThrowError('Invalid Decimal precision') + }) + }) + + it('should throw on invalid Decimal scale', async () => { + const args: [string][] = [ + ['Decimal(1, 2)'], // scale should be less than precision + ['Decimal(1, x)'], + [`Decimal(42, ,)`], + [`Decimal(42, ')`], + ] + args.forEach(([columnType]) => { + expect(() => parseDecimalType({ columnType, sourceType: columnType })) + .withContext(`Expected ${columnType} to throw`) + .toThrowError('Invalid Decimal scale') + }) + }) + + it('should throw when precision or scale cannot be parsed', async () => { + const columnType = 'Decimal(foobar)' + expect(() => + parseDecimalType({ columnType, sourceType: columnType }), + ).toThrowError('Expected Decimal type to have both precision and scale') + }) +}) diff --git a/packages/client-common/__tests__/unit/row_binary_columns_parser_enum.test.ts b/packages/client-common/__tests__/unit/row_binary_columns_parser_enum.test.ts new file mode 100644 index 00000000..0fb4445c --- /dev/null +++ b/packages/client-common/__tests__/unit/row_binary_columns_parser_enum.test.ts @@ -0,0 +1,89 @@ +import { enumTypes, parsedEnumTestArgs } from '@test/utils' +import { parseEnumType } from '../../src/data_formatter/row_binary/columns_parser' + +describe('RowBinary column types parser - Enum', () => { + it('should parse correct values', async () => { + parsedEnumTestArgs.forEach((expected) => { + const result = parseEnumType({ + sourceType: expected.sourceType, + columnType: expected.sourceType, + }) + expect(result) + .withContext( + `Expected ${ + expected.sourceType + } to be parsed as an Enum with intSize ${ + expected.intSize + } and values [${[...expected.values.entries()]}]`, + ) + .toEqual(expected) + }) + }) + + it('should throw when the type is not a valid enum', async () => { + const args: [string][] = [ + ['Enum'], // should be either 8 or 16 + ['Enum32'], + ['Enum64'], + ['String'], + ['Enum(String)'], + ] + args.forEach(([columnType]) => { + expect(() => parseEnumType({ columnType, sourceType: columnType })) + .withContext(`Expected ${columnType} to throw`) + .toThrowError('Expected Enum to be either Enum8 or Enum16') + }) + }) + + it('should throw when the values are not valid', async () => { + const args: [string][] = [["Enum8('a' = x)"], ["Enum16('foo' = 'bar')"]] + args.forEach(([columnType]) => { + expect(() => parseEnumType({ columnType, sourceType: columnType })) + .withContext(`Expected ${columnType} to throw`) + 
.toThrowError('Expected Enum index to be a valid number') + }) + }) + + it('should throw on duplicate indices', async () => { + const args: [string][] = [ + ["Enum8('a' = 0, 'b' = 0)"], + ["Enum8('a' = 0, 'b' = 1, 'c' = 1)"], + ] + args.forEach(([columnType]) => { + expect(() => parseEnumType({ columnType, sourceType: columnType })) + .withContext(`Expected ${columnType} to throw`) + .toThrowError('Duplicate Enum index') + }) + }) + + it('should throw on duplicate names', async () => { + const args: [string][] = [ + ["Enum8('a' = 0, 'a' = 1)"], + ["Enum8('a' = 0, 'b' = 1, 'b' = 2)"], + ] + args.forEach(([columnType]) => { + expect(() => parseEnumType({ columnType, sourceType: columnType })) + .withContext(`Expected ${columnType} to throw`) + .toThrowError('Duplicate Enum name') + }) + }) + + it('should throw when Enum has no values to parse', async () => { + // The minimal allowed Enum definition is Enum8('' = 0), i.e. 6 chars inside. + const allEnumTypeArgs: string[][] = enumTypes.map(([enumType]) => [ + `${enumType}()`, + `${enumType}(')`, + `${enumType}('')`, + `${enumType}('' )`, + `${enumType}('' =)`, + `${enumType}('' = )`, + ]) + allEnumTypeArgs.forEach((args) => + args.forEach((columnType) => { + expect(() => parseEnumType({ columnType, sourceType: columnType })) + .withContext(`Expected ${columnType} to throw`) + .toThrowError('Invalid Enum type values') + }), + ) + }) +}) diff --git a/packages/client-common/__tests__/unit/row_binary_columns_parser_map.test.ts b/packages/client-common/__tests__/unit/row_binary_columns_parser_map.test.ts new file mode 100644 index 00000000..dd269664 --- /dev/null +++ b/packages/client-common/__tests__/unit/row_binary_columns_parser_map.test.ts @@ -0,0 +1,41 @@ +import type { ParsedColumnMap } from '../../src/data_formatter/row_binary/columns_parser' +import { parseMapType } from '../../src/data_formatter/row_binary/columns_parser' + +describe('RowBinary column types parser - Map', () => { + it('should parse Map with simple types', async () => { + const args: [ParsedColumnMap, string][] = [ + [ + { + type: 'Map', + key: { type: 'Simple', columnType: 'String', sourceType: 'String' }, + value: { type: 'Simple', columnType: 'UInt8', sourceType: 'UInt8' }, + sourceType: 'Map(String, UInt8)', + }, + 'Map(String, UInt8)', + ], + [ + { + type: 'Map', + key: { type: 'Simple', columnType: 'Int32', sourceType: 'Int32' }, + value: { + type: 'Simple', + columnType: 'Float32', + sourceType: 'Float32', + }, + sourceType: 'Map(Int32, Float32)', + }, + 'Map(Int32, Float32)', + ], + ] + args.forEach(([expected, sourceType]) => { + const result = parseMapType({ columnType: sourceType, sourceType }) + expect(result) + .withContext( + `Expected ${sourceType} to be parsed as a Map with key type ${expected.key.sourceType} and value type ${expected.value.sourceType}`, + ) + .toEqual(expected) + }) + }) + + // TODO: rest of the allowed types. 
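+ + // One more case as a hedged sketch, following the TODO above and assuming (as the implementation suggests) that parseMapType delegates value parsing to parseColumnType, so composite value types such as Array come back fully parsed: + it('should parse Map with an Array value type (sketch)', async () => { + const sourceType = 'Map(String, Array(Int32))' + const result = parseMapType({ columnType: sourceType, sourceType }) + expect(result) + .withContext( + `Expected ${sourceType} to be parsed as a Map with an Array value type`, + ) + .toEqual({ + type: 'Map', + key: { type: 'Simple', columnType: 'String', sourceType: 'String' }, + value: { + type: 'Array', + value: { type: 'Simple', columnType: 'Int32', sourceType: 'Int32' }, + dimensions: 1, + sourceType: 'Array(Int32)', + }, + sourceType, + }) + })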
+}) diff --git a/packages/client-common/__tests__/unit/row_binary_columns_parser_nullable.test.ts b/packages/client-common/__tests__/unit/row_binary_columns_parser_nullable.test.ts new file mode 100644 index 00000000..b9e29c15 --- /dev/null +++ b/packages/client-common/__tests__/unit/row_binary_columns_parser_nullable.test.ts @@ -0,0 +1,266 @@ +import type { + ParsedColumnDateTime, + ParsedColumnDateTime64, + ParsedColumnDecimal, + ParsedColumnEnum, + ParsedColumnSimple, +} from '../../src/data_formatter/row_binary/columns_parser' +import { asNullableType } from '../../src/data_formatter/row_binary/columns_parser' + +describe('RowBinary column types parser - Nullable', () => { + it('should wrap a simple type', async () => { + const args: [ParsedColumnSimple, string][] = [ + [ + { type: 'Simple', columnType: 'String', sourceType: 'String' }, + 'Nullable(String)', + ], + [ + { type: 'Simple', columnType: 'UInt8', sourceType: 'UInt8' }, + 'Nullable(UInt8)', + ], + [ + { type: 'Simple', columnType: 'Int32', sourceType: 'Int32' }, + 'Nullable(Int32)', + ], + [ + { type: 'Simple', columnType: 'Float32', sourceType: 'Float32' }, + 'Nullable(Float32)', + ], + ] + args.forEach(([value, sourceType]) => { + const result = asNullableType(value, sourceType) + expect(result) + .withContext( + `Expected ${value.columnType} to be wrapped as ${sourceType}`, + ) + .toEqual({ + type: 'Nullable', + sourceType, + value, + }) + }) + }) + + it('should wrap an Enum', async () => { + const sourceEnum8 = `Enum8('foo' = 42)` + const valuesEnum8 = new Map([[42, 'foo']]) + const sourceEnum16 = `Enum16('bar' = 144, 'qaz' = 500)` + const valuesEnum16 = new Map([ + [144, 'bar'], + [500, 'qaz'], + ]) + const args: [ParsedColumnEnum, string][] = [ + [ + { + type: 'Enum', + intSize: 8, + values: valuesEnum8, + sourceType: sourceEnum8, + }, + 'Nullable(Enum8)', + ], + [ + { + type: 'Enum', + intSize: 16, + values: valuesEnum16, + sourceType: sourceEnum16, + }, + 'Nullable(Enum16)', + ], + ] + args.forEach(([value, sourceType]) => { + const result = asNullableType(value, sourceType) + expect(result) + .withContext(`Expected ${value.type} to be wrapped as ${sourceType}`) + .toEqual({ + type: 'Nullable', + sourceType, + value, + }) + }) + }) + + it('should wrap a Decimal', async () => { + const args: [ParsedColumnDecimal, string][] = [ + [ + { + type: 'Decimal', + params: { intSize: 32, precision: 4, scale: 3 }, + sourceType: 'Decimal(4, 3)', + }, + 'Nullable(Decimal(4, 3))', + ], + [ + { + type: 'Decimal', + params: { intSize: 64, precision: 12, scale: 6 }, + sourceType: 'Decimal(12, 6)', + }, + 'Nullable(Decimal(12, 6))', + ], + [ + { + type: 'Decimal', + params: { intSize: 128, precision: 24, scale: 12 }, + sourceType: 'Decimal(24, 12)', + }, + 'Nullable(Decimal(24, 12))', + ], + [ + { + type: 'Decimal', + params: { intSize: 256, precision: 42, scale: 20 }, + sourceType: 'Decimal(42, 20)', + }, + 'Nullable(Decimal(42, 20))', + ], + ] + args.forEach(([value, sourceType]) => { + const result = asNullableType(value, sourceType) + expect(result) + .withContext( + `Expected ${value.sourceType} to be wrapped as ${sourceType}`, + ) + .toEqual({ + type: 'Nullable', + sourceType, + value, + }) + }) + }) + + it('should wrap a DateTime', async () => { + const args: [ParsedColumnDateTime, string][] = [ + [ + { type: 'DateTime', timezone: null, sourceType: 'DateTime' }, + 'Nullable(DateTime)', + ], + [ + { type: 'DateTime', timezone: 'UTC', sourceType: "DateTime('UTC')" }, + `Nullable(DateTime('UTC'))`, + ], + [ + { type: 'DateTime', 
timezone: 'GB', sourceType: "DateTime('GB')" }, + `Nullable(DateTime('GB'))`, + ], + [ + { + type: 'DateTime', + timezone: 'Etc/GMT-5', + sourceType: `DateTime('Etc/GMT-5')`, + }, + `Nullable(DateTime('Etc/GMT-5'))`, + ], + ] + args.forEach(([value, sourceType]) => { + const result = asNullableType(value, sourceType) + expect(result) + .withContext( + `Expected ${value.sourceType} to be wrapped as ${sourceType}`, + ) + .toEqual({ + type: 'Nullable', + sourceType, + value, + }) + }) + }) + + it('should wrap a DateTime64', async () => { + const args: [ParsedColumnDateTime64, string][] = [ + [ + { + type: 'DateTime64', + timezone: null, + sourceType: 'DateTime64(0)', + precision: 0, + }, + 'Nullable(DateTime64(0))', + ], + [ + { + type: 'DateTime64', + timezone: null, + sourceType: 'DateTime64(3)', + precision: 3, + }, + 'Nullable(DateTime64(3))', + ], + [ + { + type: 'DateTime64', + timezone: 'UTC', + sourceType: `DateTime64(3, 'UTC')`, + precision: 3, + }, + `Nullable(DateTime64(3, 'UTC'))`, + ], + [ + { + type: 'DateTime64', + timezone: 'GB', + sourceType: `DateTime64(6, 'GB')`, + precision: 6, + }, + `Nullable(DateTime64(6, 'GB'))`, + ], + [ + { + type: 'DateTime64', + timezone: 'Etc/GMT-5', + sourceType: `DateTime64(9, 'Etc/GMT-5')`, + precision: 9, + }, + `Nullable(DateTime64(9, 'Etc/GMT-5'))`, + ], + ] + args.forEach(([value, sourceType]) => { + const result = asNullableType(value, sourceType) + expect(result) + .withContext( + `Expected ${value.sourceType} to be wrapped as ${sourceType}`, + ) + .toEqual({ + type: 'Nullable', + sourceType, + value, + }) + }) + }) + + it('should throw in case of Array or Map', async () => { + const columnUInt8: ParsedColumnSimple = { + type: 'Simple', + columnType: 'UInt8', + sourceType: 'UInt8', + } + const columnString: ParsedColumnSimple = { + type: 'Simple', + columnType: 'String', + sourceType: 'String', + } + expect(() => + asNullableType( + { + type: 'Map', + key: columnUInt8, + value: columnString, + sourceType: 'Map(UInt8, String)', + }, + '...', + ), + ).toThrowError('Map cannot be Nullable') + expect(() => + asNullableType( + { + type: 'Array', + value: columnUInt8, + dimensions: 1, + sourceType: 'Array(UInt8)', + }, + '...', + ), + ).toThrowError('Array cannot be Nullable') + }) +}) diff --git a/packages/client-common/__tests__/unit/row_binary_columns_parser_tuple.test.ts b/packages/client-common/__tests__/unit/row_binary_columns_parser_tuple.test.ts new file mode 100644 index 00000000..49893a20 --- /dev/null +++ b/packages/client-common/__tests__/unit/row_binary_columns_parser_tuple.test.ts @@ -0,0 +1,164 @@ +import { parsedEnumTestArgs } from '@test/utils' +import type { + ParsedColumnDateTime, + ParsedColumnDateTime64, + ParsedColumnFixedString, + ParsedColumnSimple, + ParsedColumnTuple, +} from '../../src/data_formatter/row_binary/columns_parser' +import { parseTupleType } from '../../src/data_formatter/row_binary/columns_parser' + +describe('RowBinary column types parser - Tuple', () => { + it('should parse Tuple with simple types', async () => { + const args: TestArgs[] = [ + { + sourceType: 'Tuple(String, UInt8)', + expected: { + type: 'Tuple', + elements: [ + { type: 'Simple', columnType: 'String', sourceType: 'String' }, + { type: 'Simple', columnType: 'UInt8', sourceType: 'UInt8' }, + ], + sourceType: 'Tuple(String, UInt8)', + }, + }, + { + sourceType: 'Tuple(Int32, Float32)', + expected: { + type: 'Tuple', + elements: [ + { type: 'Simple', columnType: 'Int32', sourceType: 'Int32' }, + { type: 'Simple', columnType: 'Float32',
sourceType: 'Float32' }, + ], + sourceType: 'Tuple(Int32, Float32)', + }, + }, + ] + args.forEach(({ expected, sourceType }) => { + const result = parseTupleType({ columnType: sourceType, sourceType }) + expect(result) + .withContext( + `Expected ${sourceType} to have ${joinElements(expected)} elements`, + ) + .toEqual(expected) + }) + }) + + it('should parse Tuple with Decimals', async () => { + const args: TestArgs[] = [ + { + sourceType: 'Tuple(Decimal(7, 2), Decimal(18, 4))', + expected: { + type: 'Tuple', + elements: [ + { + type: 'Decimal', + sourceType: 'Decimal(7, 2)', + params: { precision: 7, scale: 2, intSize: 32 }, + }, + { + type: 'Decimal', + sourceType: 'Decimal(18, 4)', + params: { precision: 18, scale: 4, intSize: 64 }, + }, + ], + sourceType: 'Tuple(Decimal(7, 2), Decimal(18, 4))', + }, + }, + ] + args.forEach(({ expected, sourceType }) => { + const result = parseTupleType({ columnType: sourceType, sourceType }) + expect(result) + .withContext( + `Expected ${sourceType} to have ${joinElements(expected)} elements`, + ) + .toEqual(expected) + }) + }) + + it('should parse Tuple with Enums', async () => { + const args: TestArgs[] = parsedEnumTestArgs.map((enumElement) => { + // e.g. Tuple(String, Enum8('a' = 1)) + const sourceType = `Tuple(${stringElement.sourceType}, ${enumElement.sourceType})` + return { + sourceType, + expected: { + type: 'Tuple', + elements: [stringElement, enumElement], + sourceType, + }, + } + }) + args.forEach(({ expected, sourceType }) => { + const result = parseTupleType({ columnType: sourceType, sourceType }) + expect(result) + .withContext( + `Expected ${sourceType} to have ${joinElements(expected)} elements`, + ) + .toEqual(expected) + }) + }) + + it('should parse Tuple with FixedString/DateTime', async () => { + const fixedStringElement: ParsedColumnFixedString = { + type: 'FixedString', + sourceType: 'FixedString(16)', + sizeBytes: 16, + } + const dateTimeElement: ParsedColumnDateTime = { + type: 'DateTime', + timezone: null, + sourceType: 'DateTime', + } + const dateTimeWithTimezoneElement: ParsedColumnDateTime = { + type: 'DateTime', + timezone: 'Europe/Amsterdam', + sourceType: `DateTime('Europe/Amsterdam')`, + } + const dateTime64Element: ParsedColumnDateTime64 = { + type: 'DateTime64', + timezone: null, + precision: 3, + sourceType: 'DateTime64(3)', + } + const dateTime64WithTimezoneElement: ParsedColumnDateTime64 = { + type: 'DateTime64', + timezone: 'Europe/Amsterdam', + precision: 9, + sourceType: `DateTime64(9, 'Europe/Amsterdam')`, + } + const elements = [ + fixedStringElement, + dateTimeElement, + dateTimeWithTimezoneElement, + dateTime64Element, + dateTime64WithTimezoneElement, + ] + const elementsSourceTypes = elements.map((el) => el.sourceType).join(', ') + const sourceType = `Tuple(${elementsSourceTypes})` + const expected: ParsedColumnTuple = { + type: 'Tuple', + elements, + sourceType, + } + const result = parseTupleType({ columnType: sourceType, sourceType }) + expect(result).toEqual(expected) + }) + + // TODO: Simple types permutations, Nullable, Arrays, Maps, Nested Tuples + + const stringElement: ParsedColumnSimple = { + type: 'Simple', + sourceType: 'String', + columnType: 'String', + } +}) + +function joinElements(expected: ParsedColumnTuple) { + return expected.elements.map((el) => el.sourceType).join(', ') +} + +type TestArgs = { + sourceType: string + expected: ParsedColumnTuple +} diff --git a/packages/client-common/__tests__/unit/row_binary_decoders.test.ts 
b/packages/client-common/__tests__/unit/row_binary_decoders.test.ts new file mode 100644 index 00000000..587cbfc2 --- /dev/null +++ b/packages/client-common/__tests__/unit/row_binary_decoders.test.ts @@ -0,0 +1,22 @@ +import { RowBinaryTypesDecoder } from '../../src/data_formatter' + +describe('RowBinary decoders', () => { + it('should decode Date', () => { + const args: [Uint8Array, Date][] = [ + [new Uint8Array([0x00, 0x00]), new Date('1970-01-01T00:00:00.000Z')], + [new Uint8Array([0x01, 0x00]), new Date('1970-01-02T00:00:00.000Z')], + [new Uint8Array([0x02, 0x00]), new Date('1970-01-03T00:00:00.000Z')], + [new Uint8Array([0x10, 0x00]), new Date('1970-01-17T00:00:00.000Z')], + [new Uint8Array([0x4a, 0x4d]), new Date('2024-03-04T00:00:00.000Z')], + [new Uint8Array([0xff, 0xff]), new Date('2149-06-06T00:00:00.000Z')], + ] + args.forEach(([src, expected]) => { + const res = RowBinaryTypesDecoder.date(Buffer.from(src), 0)! + expect(+res[0]) + .withContext( + `Decoded ${src.toString()}. Result ${res[0]} != expected ${expected}`, + ) + .toEqual(+expected) + }) + }) +}) diff --git a/packages/client-common/__tests__/utils/index.ts b/packages/client-common/__tests__/utils/index.ts index 08966f5f..5b32c507 100644 --- a/packages/client-common/__tests__/utils/index.ts +++ b/packages/client-common/__tests__/utils/index.ts @@ -11,4 +11,5 @@ export { TestEnv } from './test_env' export { sleep } from './sleep' export { whenOnEnv } from './jasmine' export { getRandomInt } from './random' +export * from './row_binary' export * from './permutations' diff --git a/packages/client-common/__tests__/utils/row_binary/index.ts b/packages/client-common/__tests__/utils/row_binary/index.ts new file mode 100644 index 00000000..25981934 --- /dev/null +++ b/packages/client-common/__tests__/utils/row_binary/index.ts @@ -0,0 +1 @@ +export * from './row_binary_test_args' diff --git a/packages/client-common/__tests__/utils/row_binary/row_binary_test_args.ts b/packages/client-common/__tests__/utils/row_binary/row_binary_test_args.ts new file mode 100644 index 00000000..53c4f5ef --- /dev/null +++ b/packages/client-common/__tests__/utils/row_binary/row_binary_test_args.ts @@ -0,0 +1,108 @@ +import type { ParsedColumnEnum } from '../../../src/data_formatter/row_binary/columns_parser' + +export const enumTypes: ['Enum8' | 'Enum16', 8 | 16][] = [ + ['Enum8', 8], + ['Enum16', 16], +] + +export const parsedEnumTestArgs: ParsedColumnEnum[] = enumTypes.flatMap( + ([enumType, intSize]) => [ + { + type: 'Enum', + sourceType: `${enumType}('a' = 1)`, + values: new Map([[1, 'a']]), + intSize, + }, + { + type: 'Enum', + sourceType: `${enumType}('a' = 0, 'b' = 2)`, + values: new Map([ + [0, 'a'], + [2, 'b'], + ]), + intSize, + }, + { + type: 'Enum', + sourceType: `${enumType}('a' = 1, 'b' = 2, 'c' = 42)`, + values: new Map([ + [1, 'a'], + [2, 'b'], + [42, 'c'], + ]), + intSize, + }, + { + type: 'Enum', + sourceType: `${enumType}('f\\'' = 1, 'x =' = 2, 'b\\'\\'\\'' = 3, '\\'c=4=' = 42, '4' = 100)`, + values: new Map([ + [1, "f\\'"], + [2, 'x ='], + [3, "b\\'\\'\\'"], + [42, "\\'c=4="], + [100, '4'], + ]), + intSize, + }, + { + type: 'Enum', + sourceType: `${enumType}('f\\'()' = 1)`, + values: new Map([[1, "f\\'()"]]), + intSize, + }, + { + type: 'Enum', + sourceType: `${enumType}('\\'' = 0)`, + values: new Map([[0, `\\'`]]), + intSize, + }, + { + type: 'Enum', + sourceType: `${enumType}('' = 0)`, + values: new Map([[0, '']]), + intSize, + }, + { + type: 'Enum', + sourceType: `${enumType}('' = 42)`, + values: new Map([[42, '']]), + intSize, 
+ }, + { + type: 'Enum', + sourceType: `${enumType}('foo' = 1, '' = 42)`, + values: new Map([ + [1, 'foo'], + [42, ''], + ]), + intSize, + }, + { + type: 'Enum', + sourceType: `${enumType}('' = 0, 'foo' = 42)`, + values: new Map([ + [0, ''], + [42, 'foo'], + ]), + intSize, + }, + { + type: 'Enum', + sourceType: `${enumType}('(' = 1)`, + values: new Map([[1, '(']]), + intSize, + }, + { + type: 'Enum', + sourceType: `${enumType}(')' = 1)`, + values: new Map([[1, ')']]), + intSize, + }, + { + type: 'Enum', + sourceType: `${enumType}('()' = 1)`, + values: new Map([[1, '()']]), + intSize, + }, + ], +) diff --git a/packages/client-common/src/client.ts b/packages/client-common/src/client.ts index 44de1a08..1bf1021b 100644 --- a/packages/client-common/src/client.ts +++ b/packages/client-common/src/client.ts @@ -248,7 +248,11 @@ export class ClickHouseClient { function formatQuery(query: string, format: DataFormat): string { query = query.trim() query = removeTrailingSemi(query) - return query + ' \nFORMAT ' + format + return ( + query + + ' \nFORMAT ' + + (format === 'RowBinary' ? 'RowBinaryWithNamesAndTypes' : format) + ) } function removeTrailingSemi(query: string) { diff --git a/packages/client-common/src/data_formatter/formatter.ts b/packages/client-common/src/data_formatter/formatter.ts index 1261d843..81ce0844 100644 --- a/packages/client-common/src/data_formatter/formatter.ts +++ b/packages/client-common/src/data_formatter/formatter.ts @@ -33,6 +33,9 @@ const supportedRawFormats = [ 'CustomSeparatedWithNames', 'CustomSeparatedWithNamesAndTypes', 'Parquet', + // translates to RowBinaryWithNamesAndTypes under the hood (see client/formatQuery); + // we expose a shorter name to the user for simplicity. + 'RowBinary', ] as const /** CSV, TSV, etc. - can be streamed, but cannot be decoded as JSON. 
*/ diff --git a/packages/client-common/src/data_formatter/index.ts b/packages/client-common/src/data_formatter/index.ts index 8f880b33..a4f5ce41 100644 --- a/packages/client-common/src/data_formatter/index.ts +++ b/packages/client-common/src/data_formatter/index.ts @@ -1,3 +1,4 @@ export * from './formatter' export { formatQueryParams } from './format_query_params' export { formatQuerySettings } from './format_query_settings' +export * from './row_binary' diff --git a/packages/client-common/src/data_formatter/row_binary/columns_header.ts b/packages/client-common/src/data_formatter/row_binary/columns_header.ts new file mode 100644 index 00000000..3a5b6d6c --- /dev/null +++ b/packages/client-common/src/data_formatter/row_binary/columns_header.ts @@ -0,0 +1,143 @@ +import type { + DecimalParams, + ParsedColumnArray, + ParsedColumnNullable, + ParsedColumnType, +} from './columns_parser' +import { parseColumnType } from './columns_parser' +import { ClickHouseRowBinaryError } from './errors' +import type { DecodeResult } from './read_bytes' +import { readBytesAsUnsignedLEB128 } from './read_bytes' +import type { SimpleTypeDecoder } from './types' +import { RowBinarySimpleDecoders, RowBinaryTypesDecoder } from './types' + +export type DecodedColumns = DecodeResult<{ + names: string[] + types: ParsedColumnType[] + decoders: SimpleTypeDecoder[] +}> + +/** @throws ClickHouseRowBinaryError */ +export class RowBinaryColumnsHeader { + static decode(src: Buffer): DecodedColumns { + const res = readBytesAsUnsignedLEB128(src, 0) + if (res === null) { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Not enough data to decode number of columns', + {}, + ) + } + const numColumns = res[0] + if (numColumns === 0) { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Unexpected zero number of columns', + {}, + ) + } + let nextLoc = res[1] + const names = new Array(numColumns) + const types = new Array(numColumns) + const decoders = new Array(numColumns) + for (let i = 0; i < numColumns; i++) { + const res = RowBinaryTypesDecoder.string(src, nextLoc) + if (res === null) { + throw ClickHouseRowBinaryError.headerDecodingError( + `Not enough data to decode column name`, + { i, names, numColumns, nextLoc }, + ) + } + nextLoc = res[1] + names[i] = res[0] + } + for (let i = 0; i < numColumns; i++) { + const res = RowBinaryTypesDecoder.string(src, nextLoc) + if (res === null) { + throw ClickHouseRowBinaryError.headerDecodingError( + `Not enough data to decode column type`, + { i, names, types, numColumns, nextLoc }, + ) + } + nextLoc = res[1] + const col = parseColumnType(res[0]) + types[i] = col + switch (col.type) { + case 'Simple': + decoders[i] = RowBinarySimpleDecoders[col.columnType] + break + case 'Decimal': + decoders[i] = getDecimalDecoder(col.params) + break + case 'Array': + decoders[i] = getArrayDecoder(col) + break + case 'Nullable': + decoders[i] = getNullableDecoder(col) + break + default: + throw ClickHouseRowBinaryError.headerDecodingError( + `Unsupported column type ${col.type}`, + { col }, + ) + } + } + // console.log(`Decoded columns:`, names, types) + return [{ names, types, decoders }, nextLoc] + } +} + +function getDecimalDecoder(decimalParams: DecimalParams): SimpleTypeDecoder { + const intSize = decimalParams.intSize + if (intSize === 32) { + return RowBinaryTypesDecoder.decimal32(decimalParams.scale) + } + if (intSize === 64) { + return RowBinaryTypesDecoder.decimal64(decimalParams.scale) + } + // for tests only (128 and 256 support is there) + throw new Error(`Unsupported Decimal 
size: ${intSize}`) +} + +function getEnumDecoder( + intSize: 8 | 16, + values: Map<number, string>, +): SimpleTypeDecoder { + if (intSize === 8) { + return RowBinaryTypesDecoder.enum8(values) + } + if (intSize === 16) { + return RowBinaryTypesDecoder.enum16(values) + } + throw new Error(`Unsupported Enum size: ${intSize}`) +} + +function getArrayDecoder(col: ParsedColumnArray): SimpleTypeDecoder { + let valueDecoder + if (col.value.type === 'Simple') { + valueDecoder = RowBinarySimpleDecoders[col.value.columnType] + } else if (col.value.type === 'Decimal') { + valueDecoder = getDecimalDecoder(col.value.params) + } else if (col.value.type === 'Enum') { + valueDecoder = getEnumDecoder(col.value.intSize, col.value.values) + } else if (col.value.type === 'Nullable') { + valueDecoder = getNullableDecoder(col.value) + } else { + // FIXME: add other types + throw new Error(`Unsupported Array value type: ${col.value.type}`) + } + return RowBinaryTypesDecoder.array(valueDecoder, col.dimensions) +} + +function getNullableDecoder(col: ParsedColumnNullable) { + let valueDecoder + if (col.value.type === 'Simple') { + valueDecoder = RowBinarySimpleDecoders[col.value.columnType] + } else if (col.value.type === 'Decimal') { + valueDecoder = getDecimalDecoder(col.value.params) + } else if (col.value.type === 'Enum') { + valueDecoder = getEnumDecoder(col.value.intSize, col.value.values) + } else { + // FIXME: add other types + throw new Error(`Unsupported Nullable value type: ${col.value.type}`) + } + return RowBinaryTypesDecoder.nullable(valueDecoder) +} diff --git a/packages/client-common/src/data_formatter/row_binary/columns_parser.ts b/packages/client-common/src/data_formatter/row_binary/columns_parser.ts new file mode 100644 index 00000000..b3b63ce0 --- /dev/null +++ b/packages/client-common/src/data_formatter/row_binary/columns_parser.ts @@ -0,0 +1,683 @@ +import { ClickHouseRowBinaryError } from './errors' +import type { SimpleColumnType } from './types' +import { RowBinarySimpleDecoders } from './types' + +export interface ParsedColumnSimple { + type: 'Simple' + /** Without LowCardinality and Nullable. For example: + * * UInt8 -> UInt8 + * * LowCardinality(Nullable(String)) -> String */ + columnType: SimpleColumnType + /** The original type before parsing. 
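+   * e.g. the Simple value wrapped inside Nullable(UInt8) keeps sourceType 'UInt8', since asNullableType strips the Nullable(...) wrapper from the inner value.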
*/ + sourceType: string +} + +export interface ParsedColumnFixedString { + type: 'FixedString' + sizeBytes: number + sourceType: string +} + +export interface ParsedColumnDateTime { + type: 'DateTime' + timezone: string | null + sourceType: string +} + +export interface ParsedColumnDateTime64 { + type: 'DateTime64' + timezone: string | null + /** Valid range: [0 : 9] */ + precision: number + sourceType: string +} + +export interface ParsedColumnEnum { + type: 'Enum' + /** Index to name */ + values: Map<number, string> + /** UInt8 or UInt16 */ + intSize: 8 | 16 + sourceType: string +} + +/** Int size for Decimal depends on the Precision + * * 32 bits for precision < 10 (JS number) + * * 64 bits for precision < 19 (JS BigInt) + * * 128 bits for precision < 39 (JS BigInt) + * * 256 bits for precision >= 39 (JS BigInt) + */ +export interface DecimalParams { + precision: number + scale: number + intSize: 32 | 64 | 128 | 256 +} +export interface ParsedColumnDecimal { + type: 'Decimal' + params: DecimalParams + sourceType: string +} + +/** Tuple, Array or Map itself cannot be Nullable */ +export interface ParsedColumnNullable { + type: 'Nullable' + value: + | ParsedColumnSimple + | ParsedColumnEnum + | ParsedColumnDecimal + | ParsedColumnFixedString + | ParsedColumnDateTime + | ParsedColumnDateTime64 + sourceType: string +} + +/** Array cannot be Nullable or LowCardinality, but its value type can be. + * Arrays can be multidimensional, e.g. Array(Array(Array(T))). + * Arrays are allowed to have a Map as the value type. + */ +export interface ParsedColumnArray { + type: 'Array' + value: + | ParsedColumnNullable + | ParsedColumnSimple + | ParsedColumnFixedString + | ParsedColumnDecimal + | ParsedColumnEnum + | ParsedColumnMap + | ParsedColumnDateTime + | ParsedColumnDateTime64 + | ParsedColumnTuple + /** Array(T) = 1 dimension, Array(Array(T)) = 2, etc. */ + dimensions: number + sourceType: string +} + +/** @see https://clickhouse.com/docs/en/sql-reference/data-types/map */ +export interface ParsedColumnMap { + type: 'Map' + /** Possible key types: + * - String, Integer, UUID, Date, Date32 ({@link ParsedColumnSimple}) + * - FixedString + * - DateTime + * - Enum + */ + key: + | ParsedColumnSimple + | ParsedColumnFixedString + | ParsedColumnEnum + | ParsedColumnDateTime + /** Value types are arbitrary, including Map, Array, and Tuple. */ + value: ParsedColumnType + sourceType: string +} + +export interface ParsedColumnTuple { + type: 'Tuple' + /** Element types are arbitrary, including Map, Array, and Tuple. 
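+   * e.g. Tuple(UInt8, Map(String, String)) parses into two elements, the second one being a ParsedColumnMap.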
*/ + elements: ParsedColumnType[] + sourceType: string +} + +export type ParsedColumnType = + | ParsedColumnSimple + | ParsedColumnEnum + | ParsedColumnFixedString + | ParsedColumnNullable + | ParsedColumnDecimal + | ParsedColumnDateTime + | ParsedColumnDateTime64 + | ParsedColumnArray + | ParsedColumnTuple + | ParsedColumnMap + +export function parseColumnType(sourceType: string): ParsedColumnType { + let columnType = sourceType + let isNullable = false + if (columnType.startsWith(LowCardinalityPrefix)) { + columnType = columnType.slice(LowCardinalityPrefix.length, -1) + } + if (columnType.startsWith(NullablePrefix)) { + columnType = columnType.slice(NullablePrefix.length, -1) + isNullable = true + } + let result: ParsedColumnType + if (columnType in RowBinarySimpleDecoders) { + result = { + type: 'Simple', + columnType: columnType as SimpleColumnType, + sourceType, + } + } else if (columnType.startsWith(DecimalPrefix)) { + result = parseDecimalType({ + sourceType, + columnType, + }) + } else if (columnType.startsWith(DateTime64Prefix)) { + result = parseDateTime64Type({ sourceType, columnType }) + } else if (columnType.startsWith(DateTimePrefix)) { + result = parseDateTimeType({ sourceType, columnType }) + } else if (columnType.startsWith(FixedStringPrefix)) { + result = parseFixedStringType({ sourceType, columnType }) + } else if ( + columnType.startsWith(Enum8Prefix) || + columnType.startsWith(Enum16Prefix) + ) { + result = parseEnumType({ sourceType, columnType }) + } else if (columnType.startsWith(ArrayPrefix)) { + result = parseArrayType({ sourceType, columnType }) + } else if (columnType.startsWith(MapPrefix)) { + result = parseMapType({ sourceType, columnType }) + } else if (columnType.startsWith(TuplePrefix)) { + result = parseTupleType({ sourceType, columnType }) + } else { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Unsupported column type', + { columnType }, + ) + } + if (isNullable) { + return asNullableType(result, sourceType) + } else { + return result + } +} + +export function parseDecimalType({ + columnType, + sourceType, +}: ParseColumnTypeParams): ParsedColumnDecimal { + if ( + !columnType.startsWith(DecimalPrefix) || + columnType.length < DecimalPrefix.length + 5 // Decimal(1, 0) is the shortest valid definition + ) { + throw ClickHouseRowBinaryError.headerDecodingError('Invalid Decimal type', { + sourceType, + columnType, + }) + } + const split = columnType.slice(DecimalPrefix.length, -1).split(', ') + if (split.length !== 2) { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Expected Decimal type to have both precision and scale', + { + sourceType, + columnType, + split, + }, + ) + } + let intSize: DecimalParams['intSize'] = 32 + const precision = parseInt(split[0], 10) + if (Number.isNaN(precision) || precision < 1 || precision > 76) { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Invalid Decimal precision', + { columnType, sourceType, precision }, + ) + } + const scale = parseInt(split[1], 10) + if (Number.isNaN(scale) || scale < 0 || scale > precision) { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Invalid Decimal scale', + { columnType, sourceType, precision, scale }, + ) + } + if (precision > 38) { + intSize = 256 + } else if (precision > 18) { + intSize = 128 + } else if (precision > 9) { + intSize = 64 + } + return { + type: 'Decimal', + params: { + precision, + scale, + intSize, + }, + sourceType, + } +} + +export function parseEnumType({ + columnType, + sourceType, +}: ParseColumnTypeParams): ParsedColumnEnum { + 
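+  // In short: scan the definition left to right, alternating between name mode (quoted, honoring backslash escapes) and index mode (digits), collecting names[] and indices[] and rejecting duplicates of either along the way.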
let intSize: 8 | 16 + if (columnType.startsWith(Enum8Prefix)) { + columnType = columnType.slice(Enum8Prefix.length, -1) + intSize = 8 + } else if (columnType.startsWith(Enum16Prefix)) { + columnType = columnType.slice(Enum16Prefix.length, -1) + intSize = 16 + } else { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Expected Enum to be either Enum8 or Enum16', + { + columnType, + sourceType, + }, + ) + } + // The minimal allowed Enum definition is Enum8('' = 0), i.e. 6 chars inside. + if (columnType.length < 6) { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Invalid Enum type values', + { + columnType, + sourceType, + }, + ) + } + + const names: string[] = [] + const indices: number[] = [] + let parsingName = true // false when parsing the index + let charEscaped = false // we should ignore escaped ticks + let startIndex = 1 // Skip the first ' + + // Should support the most complicated enums, such as Enum8('f\'' = 1, 'x =' = 2, 'b\'\'\'' = 3, '\'c=4=' = 42, '4' = 100) + for (let i = 1; i < columnType.length; i++) { + if (parsingName) { + if (charEscaped) { + charEscaped = false + } else { + if (columnType.charCodeAt(i) === BackslashASCII) { + charEscaped = true + } else if (columnType.charCodeAt(i) === SingleQuoteASCII) { + // non-escaped closing tick - push the name + const name = columnType.slice(startIndex, i) + if (names.includes(name)) { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Duplicate Enum name', + { columnType, sourceType, name, names, indices }, + ) + } + names.push(name) + i += 4 // skip ` = ` and the first digit, as it will always have at least one. + startIndex = i + parsingName = false + } + } + } + // Parsing the index, skipping next iterations until the first non-digit one + else if ( + columnType.charCodeAt(i) < ZeroASCII || + columnType.charCodeAt(i) > NineASCII + ) { + pushEnumIndex(startIndex, i) + // the char at this index should be comma. 
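+      // Worked example: in Enum8('a' = 0, 'b' = 1), i stops on the comma right after the 0; pushEnumIndex captures "0", i += 2 moves onto the opening quote of 'b', and startIndex = i + 1 points at the b itself.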
+ i += 2 // skip ` '`, but not the first char - ClickHouse allows something like Enum8('foo' = 0, '' = 42) + startIndex = i + 1 + parsingName = true + charEscaped = false + } + } + + // Push the last index + pushEnumIndex(startIndex, columnType.length) + if (names.length !== indices.length) { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Expected Enum to have the same number of names and indices', + { columnType, sourceType, names, indices }, + ) + } + + const values = new Map() + for (let i = 0; i < names.length; i++) { + values.set(indices[i], names[i]) + } + return { + type: 'Enum', + values, + intSize, + sourceType, + } + + function pushEnumIndex(start: number, end: number) { + const index = parseInt(columnType.slice(start, end), 10) + if (Number.isNaN(index) || index < 0) { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Expected Enum index to be a valid number', + { + columnType, + sourceType, + names, + indices, + index, + start, + end, + }, + ) + } + if (indices.includes(index)) { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Duplicate Enum index', + { columnType, sourceType, index, names, indices }, + ) + } + indices.push(index) + } +} + +export function parseMapType({ + columnType, + sourceType, +}: ParseColumnTypeParams): ParsedColumnMap { + if ( + !columnType.startsWith(MapPrefix) || + columnType.length < MapPrefix.length + 11 // the shortest definition seems to be Map(Int8, Int8) + ) { + throw ClickHouseRowBinaryError.headerDecodingError('Invalid Map type', { + columnType, + sourceType, + }) + } + columnType = columnType.slice(MapPrefix.length, -1) + const [keyType, valueType] = getElementsTypes({ columnType, sourceType }, 2) + const key = parseColumnType(keyType) + if ( + key.type === 'DateTime64' || + key.type === 'Nullable' || + key.type === 'Array' || + key.type === 'Map' || + key.type === 'Decimal' || + key.type === 'Tuple' + ) { + throw ClickHouseRowBinaryError.headerDecodingError('Invalid Map key type', { + key, + sourceType, + }) + } + const value = parseColumnType(valueType) + return { + type: 'Map', + key, + value, + sourceType, + } +} + +export function parseTupleType({ + columnType, + sourceType, +}: ParseColumnTypeParams): ParsedColumnTuple { + if ( + !columnType.startsWith(TuplePrefix) || + columnType.length < TuplePrefix.length + 5 // Tuple(Int8) is the shortest valid definition + ) { + throw ClickHouseRowBinaryError.headerDecodingError('Invalid Tuple type', { + columnType, + sourceType, + }) + } + columnType = columnType.slice(TuplePrefix.length, -1) + const elements = getElementsTypes({ columnType, sourceType }, 1).map((type) => + parseColumnType(type), + ) + return { + type: 'Tuple', + elements, + sourceType, + } +} + +export function parseArrayType({ + columnType, + sourceType, +}: ParseColumnTypeParams): ParsedColumnArray { + if ( + !columnType.startsWith(ArrayPrefix) || + columnType.length < ArrayPrefix.length + 5 // Array(Int8) is the shortest valid definition + ) { + throw ClickHouseRowBinaryError.headerDecodingError('Invalid Array type', { + columnType, + sourceType, + }) + } + + let dimensions = 0 + while (columnType.length > 0) { + if (columnType.startsWith(ArrayPrefix)) { + columnType = columnType.slice(ArrayPrefix.length, -1) // Array(T) -> T + dimensions++ + } else { + break + } + } + if (dimensions === 0 || dimensions > 10) { + // TODO: check how many we can handle; max 10 seems more than enough. 
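+    // (NB: the 10-dimension cap is this parser's own safety limit rather than a documented ClickHouse bound.)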
+ throw ClickHouseRowBinaryError.headerDecodingError( + 'Expected Array to have between 1 and 10 dimensions', + { columnType }, + ) + } + const value = parseColumnType(columnType) + if (value.type === 'Array') { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Unexpected Array as value type', + { columnType, sourceType }, + ) + } + return { + type: 'Array', + value, + dimensions, + sourceType, + } +} + +export function parseDateTimeType({ + columnType, + sourceType, +}: ParseColumnTypeParams): ParsedColumnDateTime { + if ( + columnType.startsWith(DateTimeWithTimezonePrefix) && + columnType.length > DateTimeWithTimezonePrefix.length + 4 // DateTime('GB') has the least amount of chars + ) { + const timezone = columnType.slice(DateTimeWithTimezonePrefix.length + 1, -2) + return { + type: 'DateTime', + timezone, + sourceType, + } + } else if ( + columnType.startsWith(DateTimePrefix) && + columnType.length === DateTimePrefix.length + ) { + return { + type: 'DateTime', + timezone: null, + sourceType, + } + } else { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Invalid DateTime type', + { + columnType, + sourceType, + }, + ) + } +} + +export function parseDateTime64Type({ + columnType, + sourceType, +}: ParseColumnTypeParams): ParsedColumnDateTime64 { + if ( + !columnType.startsWith(DateTime64Prefix) || + columnType.length < DateTime64Prefix.length + 2 // should at least have a precision + ) { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Invalid DateTime64 type', + { + columnType, + sourceType, + }, + ) + } + const precision = parseInt(columnType[DateTime64Prefix.length], 10) + if (Number.isNaN(precision) || precision < 0 || precision > 9) { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Invalid DateTime64 precision', + { + columnType, + sourceType, + precision, + }, + ) + } + let timezone = null + if (columnType.length > DateTime64Prefix.length + 2) { + // e.g. DateTime64(3, 'UTC') -> UTC + timezone = columnType.slice(DateTime64Prefix.length + 4, -2) + } + return { + type: 'DateTime64', + timezone, + precision, + sourceType, + } +} + +export function parseFixedStringType({ + columnType, + sourceType, +}: ParseColumnTypeParams): ParsedColumnFixedString { + if ( + !columnType.startsWith(FixedStringPrefix) || + columnType.length < FixedStringPrefix.length + 2 // i.e. at least FixedString(1) + ) { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Invalid FixedString type', + { columnType, sourceType }, + ) + } + const sizeBytes = parseInt(columnType.slice(FixedStringPrefix.length, -1), 10) + if (Number.isNaN(sizeBytes) || sizeBytes < 1) { + throw ClickHouseRowBinaryError.headerDecodingError( + 'Invalid FixedString size in bytes', + { columnType, sourceType, sizeBytes }, + ) + } + return { + type: 'FixedString', + sizeBytes, + sourceType, + } +} + +export function asNullableType( + value: ParsedColumnType, + sourceType: string, +): ParsedColumnNullable { + if ( + value.type === 'Array' || + value.type === 'Map' || + value.type === 'Tuple' || + value.type === 'Nullable' + ) { + throw ClickHouseRowBinaryError.headerDecodingError( + `${value.type} cannot be Nullable`, + { sourceType }, + ) + } + if (value.sourceType.startsWith(NullablePrefix)) { + value.sourceType = value.sourceType.slice(NullablePrefix.length, -1) + } + return { + type: 'Nullable', + sourceType, + value, + } +} + +/** Used for Map key/value types and Tuple elements. + * * `String, UInt8` results in [`String`, `UInt8`]. 
+
+/** Used for Map key/value types and Tuple elements.
+ * * `String, UInt8` results in [`String`, `UInt8`].
+ * * `String, UInt8, Array(String)` results in [`String`, `UInt8`, `Array(String)`].
+ * * Throws if fewer than the required minimum of elements were parsed. */
+export function getElementsTypes(
+  { columnType, sourceType }: ParseColumnTypeParams,
+  minElements: number,
+): string[] {
+  const elements: string[] = []
+  /** Consider the element type parsed once we reach a comma outside of parens AND outside of quotes.
+   * The most complicated cases are value names in self-defined Enum types:
+   * * `Tuple(Enum8('f\'()' = 1))` -> `f\'()`
+   * * `Tuple(Enum8('(' = 1))` -> `(`
+   * See also: {@link parseEnumType}, which works similarly (but has to deal with the indices following the names). */
+  let openParens = 0
+  let quoteOpen = false
+  let charEscaped = false
+  let lastElementIndex = 0
+  for (let i = 0; i < columnType.length; i++) {
+    // prettier-ignore
+    // console.log(i, 'Current char:', columnType[i], 'openParens:', openParens, 'quoteOpen:', quoteOpen, 'charEscaped:', charEscaped)
+    if (charEscaped) {
+      charEscaped = false
+    } else if (columnType.charCodeAt(i) === BackslashASCII) {
+      charEscaped = true
+    } else if (columnType.charCodeAt(i) === SingleQuoteASCII) {
+      quoteOpen = !quoteOpen // unescaped quote
+    } else {
+      if (!quoteOpen) {
+        if (columnType.charCodeAt(i) === LeftParenASCII) {
+          openParens++
+        } else if (columnType.charCodeAt(i) === RightParenASCII) {
+          openParens--
+        } else if (columnType.charCodeAt(i) === CommaASCII) {
+          if (openParens === 0) {
+            elements.push(columnType.slice(lastElementIndex, i))
+            // console.log('Pushed element:', elements[elements.length - 1])
+            lastElementIndex = i + 2 // skip ', '
+            i += 1 // the loop increment then moves i to the first char of the next element
+          }
+        }
+      }
+    }
+  }
+
+  // prettier-ignore
+  // console.log('Final elements:', elements, 'lastElementIndex:', lastElementIndex, 'minElements:', minElements, 'openParens:', openParens)
+
+  // Push the remaining part of the type if it seems to be valid (at least all parentheses are closed)
+  if (openParens === 0 && lastElementIndex < columnType.length) {
+    elements.push(columnType.slice(lastElementIndex))
+  }
+  if (elements.length < minElements) {
+    throw ClickHouseRowBinaryError.headerDecodingError(
+      'Expected more elements in the type',
+      { sourceType, columnType, elements, minElements },
+    )
+  }
+  return elements
+}
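+
+// Illustrative examples (not part of the original change):
+//   getElementsTypes({ columnType: 'String, UInt8', sourceType: 'Map(String, UInt8)' }, 2)
+//     -> ['String', 'UInt8']
+//   getElementsTypes({ columnType: `Enum8('(' = 1), Int32`, sourceType: `Tuple(Enum8('(' = 1), Int32)` }, 2)
+//     -> [`Enum8('(' = 1)`, 'Int32'] // a paren inside quotes does not affect the balance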
+
+interface ParseColumnTypeParams {
+  /** A particular type to parse, such as DateTime. */
+  columnType: string
+  /** Full type definition, such as Map(String, DateTime). */
+  sourceType: string
+}
+
+const NullablePrefix = 'Nullable(' as const
+const LowCardinalityPrefix = 'LowCardinality(' as const
+const DecimalPrefix = 'Decimal(' as const
+const ArrayPrefix = 'Array(' as const
+const MapPrefix = 'Map(' as const
+const Enum8Prefix = 'Enum8(' as const
+const Enum16Prefix = 'Enum16(' as const
+const TuplePrefix = 'Tuple(' as const
+const DateTimePrefix = 'DateTime' as const
+const DateTimeWithTimezonePrefix = 'DateTime(' as const
+const DateTime64Prefix = 'DateTime64(' as const
+const FixedStringPrefix = 'FixedString(' as const
+
+const SingleQuoteASCII = 39 as const
+const LeftParenASCII = 40 as const
+const RightParenASCII = 41 as const
+const CommaASCII = 44 as const
+const ZeroASCII = 48 as const
+const NineASCII = 57 as const
+const BackslashASCII = 92 as const
diff --git a/packages/client-common/src/data_formatter/row_binary/errors.ts b/packages/client-common/src/data_formatter/row_binary/errors.ts
new file mode 100644
index 00000000..44f903c8
--- /dev/null
+++ b/packages/client-common/src/data_formatter/row_binary/errors.ts
@@ -0,0 +1,32 @@
+const HeaderDecodingError = 'HEADER_DECODING_ERROR' as const
+
+export class ClickHouseRowBinaryError extends Error {
+  readonly args: Record<string, unknown>
+  constructor({ message, args }: ClickHouseRowBinaryError) {
+    super(message)
+    this.args = args
+
+    // Set the prototype explicitly, see:
+    // https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work
+    Object.setPrototypeOf(this, ClickHouseRowBinaryError.prototype)
+  }
+  static headerDecodingError(
+    message: string,
+    args?: Record<string, unknown>,
+  ): ClickHouseRowBinaryError {
+    return new ClickHouseRowBinaryError({
+      name: HeaderDecodingError,
+      args: args ?? {},
+      message,
+    })
+  }
+  static decoderNotFoundError(
+    col: Record<string, unknown>,
+  ): ClickHouseRowBinaryError {
+    return new ClickHouseRowBinaryError({
+      name: HeaderDecodingError,
+      message: 'Could not find a suitable decoder for this column',
+      args: { col },
+    })
+  }
+}
diff --git a/packages/client-common/src/data_formatter/row_binary/index.ts b/packages/client-common/src/data_formatter/row_binary/index.ts
new file mode 100644
index 00000000..07594db4
--- /dev/null
+++ b/packages/client-common/src/data_formatter/row_binary/index.ts
@@ -0,0 +1,5 @@
+export * from './columns_header'
+export * from './read_bytes'
+export * from './types'
+export * from './errors'
+export * from './mappers'
diff --git a/packages/client-common/src/data_formatter/row_binary/mappers.ts b/packages/client-common/src/data_formatter/row_binary/mappers.ts
new file mode 100644
index 00000000..014509fd
--- /dev/null
+++ b/packages/client-common/src/data_formatter/row_binary/mappers.ts
@@ -0,0 +1,17 @@
+export interface RowBinaryMappers<T = unknown> {
+  date?: (daysSinceEpoch: number) => T
+  date32?: (daysSinceOrBeforeEpoch: number) => T
+  datetime?: (secondsSinceEpoch: number, timezone?: string) => T
+  datetime64?: (
+    secondsSinceOrBeforeEpoch: bigint,
+    nanosOfSecond: number,
+    timezone?: string,
+  ) => T
+  /** Decimal types with precision more than 9: Decimal64, Decimal128, Decimal256 */
+  decimal?: (whole: bigint, fractional: bigint) => T
+  /** Decimal types with precision 9 and less */
+  decimal32?: (whole: number, fractional: number) => T
+}
+export interface RowBinaryResultSetOptions {
+  mappers?: RowBinaryMappers
+}
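+
+// Hypothetical usage sketch (not part of this diff): a mapper can turn raw
+// wire values into a custom representation, e.g. DateTime seconds into a JS Date:
+//   const options: RowBinaryResultSetOptions = {
+//     mappers: {
+//       datetime: (secondsSinceEpoch) => new Date(secondsSinceEpoch * 1000),
+//     },
+//   }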
diff --git a/packages/client-common/src/data_formatter/row_binary/read_bytes.ts b/packages/client-common/src/data_formatter/row_binary/read_bytes.ts
new file mode 100644
index 00000000..8e45f5ee
--- /dev/null
+++ b/packages/client-common/src/data_formatter/row_binary/read_bytes.ts
@@ -0,0 +1,25 @@
+// Decoded value + the next index to scan from
+export type DecodeResult<T> = [T, number]
+
+// May return null since we cannot determine how many bytes we need to read in advance
+export function readBytesAsUnsignedLEB128(
+  src: Buffer,
+  loc: number,
+): DecodeResult<number> | null {
+  let result = 0
+  let shift = 0
+  let ix = 0
+  let byte: number
+  // eslint-disable-next-line no-constant-condition
+  while (true) {
+    if (src.length < loc + ix + 1) {
+      return null
+    }
+    byte = src[loc + ix++]
+    result |= (byte & 0x7f) << shift
+    if (byte >> 7 === 0) {
+      return [result, loc + ix]
+    }
+    shift += 7
+  }
+}
diff --git a/packages/client-common/src/data_formatter/row_binary/types.ts b/packages/client-common/src/data_formatter/row_binary/types.ts
new file mode 100644
index 00000000..62bd553a
--- /dev/null
+++ b/packages/client-common/src/data_formatter/row_binary/types.ts
@@ -0,0 +1,301 @@
+import type { Buffer } from 'buffer'
+import type { DecodeResult } from './read_bytes'
+import { readBytesAsUnsignedLEB128 } from './read_bytes'
+
+export type SimpleColumnType =
+  /** {@link SimpleTypeDecoder} */
+  | 'Bool'
+  | 'UInt8'
+  | 'Int8'
+  | 'UInt16'
+  | 'Int16'
+  | 'UInt32'
+  | 'Int32'
+  | 'UInt64'
+  | 'Int64'
+  // | 'UInt128'
+  // | 'Int128'
+  // | 'UInt256'
+  // | 'Int256'
+  | 'Float32'
+  | 'Float64'
+  | 'String'
+  | 'Date'
+  | 'Date32'
+
+export type SimpleTypeDecoder<T = unknown> = (
+  src: Buffer,
+  loc: number,
+) => DecodeResult<T> | null
+export type DecimalTypeDecoder = (
+  precision: number,
+  scale: number,
+) => SimpleTypeDecoder<string>
+export type NullableTypeDecoder = (
+  baseTypeDecoder: SimpleTypeDecoder | DecimalTypeDecoder,
+) => SimpleTypeDecoder
+export type ArrayTypeDecoder = (
+  innerDecoder: SimpleTypeDecoder,
+  dimensions: number,
+) => SimpleTypeDecoder<unknown[]>
+export type TypeDecoder =
+  | SimpleTypeDecoder
+  | DecimalTypeDecoder
+  | ArrayTypeDecoder
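+
+// Illustrative examples (not part of the original change): a decoder returns
+// the decoded value together with the next offset to read from, or null when
+// the buffer does not yet contain enough bytes:
+//   readBytesAsUnsignedLEB128(Buffer.from([0xe5, 0x8e, 0x26]), 0) -> [624485, 3]
+//   RowBinaryTypesDecoder.uint16(Buffer.from([0x39, 0x05]), 0)    -> [1337, 2]
+//   RowBinaryTypesDecoder.uint16(Buffer.from([0x39]), 0)          -> null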
+
+// TBD: nested key type safety?
+export type MapTypeDecoder<K = unknown, V = unknown> = (
+  keyDecoder: SimpleTypeDecoder<K>,
+  valueDecoder: SimpleTypeDecoder<V> | ArrayTypeDecoder | MapTypeDecoder,
+) => SimpleTypeDecoder<Map<K, V>>
+
+const DayMillis = 24 * 3600 * 1000
+
+export class RowBinaryTypesDecoder {
+  static bool(src: Buffer, loc: number): DecodeResult<boolean> | null {
+    if (src.length < loc + 1) return null
+    return [src[loc] === 1, loc + 1]
+  }
+  static uint8(src: Buffer, loc: number): DecodeResult<number> | null {
+    if (src.length < loc + 1) return null
+    return [src[loc], loc + 1]
+  }
+  static int8(src: Buffer, loc: number): DecodeResult<number> | null {
+    if (src.length < loc + 1) return null
+    return [src.readInt8(loc), loc + 1]
+  }
+  static uint16(src: Buffer, loc: number): DecodeResult<number> | null {
+    if (src.length < loc + 2) return null
+    return [src.readUint16LE(loc), loc + 2]
+  }
+  static int16(src: Buffer, loc: number): DecodeResult<number> | null {
+    if (src.length < loc + 2) return null
+    return [src.readInt16LE(loc), loc + 2]
+  }
+  static uint32(src: Buffer, loc: number): DecodeResult<number> | null {
+    if (src.length < loc + 4) return null
+    return [src.readUInt32LE(loc), loc + 4]
+  }
+  static int32(src: Buffer, loc: number): DecodeResult<number> | null {
+    if (src.length < loc + 4) return null
+    return [src.readInt32LE(loc), loc + 4]
+  }
+  static uint64(src: Buffer, loc: number): DecodeResult<bigint> | null {
+    if (src.length < loc + 8) return null
+    return [src.readBigUInt64LE(loc), loc + 8]
+  }
+  static int64(src: Buffer, loc: number): DecodeResult<bigint> | null {
+    if (src.length < loc + 8) return null
+    return [src.readBigInt64LE(loc), loc + 8]
+  }
+  // static uint128(src: Buffer, loc: number): DecodeResult<bigint> | null {
+  //   if (src.length < loc + 16) return null
+  //   return [readBytesAsUnsignedBigInt(src, loc, 16), loc + 16]
+  // }
+  // static int128(src: Buffer, loc: number): DecodeResult<bigint> | null {
+  //   if (src.length < loc + 16) return null
+  //   const x = readBytesAsUnsignedBigInt(src, loc, 16)
+  //   return [x < Int128Overflow ? x : x - UInt128Overflow, loc + 16]
+  // }
+  // static uint256(src: Buffer, loc: number): DecodeResult<bigint> | null {
+  //   if (src.length < loc + 32) return null
+  //   return [readBytesAsUnsignedBigInt(src, loc, 32), loc + 32]
+  // }
+  // static int256(src: Buffer, loc: number): DecodeResult<bigint> | null {
+  //   if (src.length < loc + 32) return null
+  //   const x = readBytesAsUnsignedBigInt(src, loc, 32)
+  //   return [x < Int256Overflow ? x : x - UInt256Overflow, loc + 32]
+  // }
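+  // Illustrative example (not part of the original change): 64-bit integers
+  // decode to BigInt; e.g. eight 0xff bytes are the unsigned maximum:
+  //   RowBinaryTypesDecoder.uint64(Buffer.alloc(8, 0xff), 0)
+  //     -> [18446744073709551615n, 8]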
+  static float32(src: Buffer, loc: number): DecodeResult<number> | null {
+    if (src.length < loc + 4) return null
+    return [src.readFloatLE(loc), loc + 4]
+  }
+  static float64(src: Buffer, loc: number): DecodeResult<number> | null {
+    if (src.length < loc + 8) return null
+    return [src.readDoubleLE(loc), loc + 8]
+  }
+  static string(src: Buffer, loc: number): DecodeResult<string> | null {
+    if (src.length < loc + 1) return null
+    const res = readBytesAsUnsignedLEB128(src, loc)
+    if (res === null) {
+      return null
+    }
+    const [length, nextLoc] = res
+    const endLoc = nextLoc + length
+    if (src.length < endLoc) return null
+    return [src.toString('utf8', nextLoc, endLoc), endLoc]
+  }
+  static date(src: Buffer, loc: number): DecodeResult<Date> | null {
+    if (src.length < loc + 2) return null
+    const daysSinceEpoch = src.readUInt16LE(loc)
+    return [new Date(daysSinceEpoch * DayMillis), loc + 2]
+  }
+  static date32(src: Buffer, loc: number): DecodeResult<Date> | null {
+    if (src.length < loc + 4) return null
+    const daysBeforeOrSinceEpoch = src.readInt32LE(loc)
+    return [new Date(daysBeforeOrSinceEpoch * DayMillis), loc + 4]
+  }
+  static nullable<T>(
+    baseTypeDecoder: SimpleTypeDecoder<T>,
+  ): (src: Buffer, loc: number) => DecodeResult<T | null> | null {
+    return (src: Buffer, loc: number) => {
+      if (src.length < loc + 1) return null
+      const isNull = src[loc]
+      if (isNull === 1) {
+        return [null, loc + 1]
+      }
+      return baseTypeDecoder(src, loc + 1)
+    }
+  }
+  static enum8(
+    values: Map<number, string>,
+  ): (src: Buffer, loc: number) => DecodeResult<string> | null {
+    return (src: Buffer, loc: number) => {
+      if (src.length < loc + 1) return null
+      const index = src.readUInt8(loc)
+      const value = values.get(index)! // TODO: handle missing values
+      return [value, loc + 1]
+    }
+  }
+  static enum16(
+    values: Map<number, string>,
+  ): (src: Buffer, loc: number) => DecodeResult<string> | null {
+    return (src: Buffer, loc: number) => {
+      if (src.length < loc + 2) return null
+      const index = src.readUInt16LE(loc)
+      const value = values.get(index)! // TODO: handle missing values
+      return [value, loc + 2]
+    }
+  }
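+  // Illustrative example (not part of the original change): for Enum8('foo' = 42),
+  // the wire format is a single byte holding the index:
+  //   RowBinaryTypesDecoder.enum8(new Map([[42, 'foo']]))(Buffer.from([42]), 0)
+  //     -> ['foo', 1]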
+  // static decimal(
+  //   precision: number,
+  //   scale: number
+  // ): (src: Buffer, loc: number) => DecodeResult<string> | null {
+  //   const intSize = getDecimalIntSize(precision)
+  //   let scaleMultiplier: number | bigint
+  //   if (intSize === 32) {
+  //     scaleMultiplier = 10 ** scale
+  //   } else {
+  //     scaleMultiplier = BigInt(10 ** scale)
+  //   }
+  //   // const scaleMultiplier =
+  //   //   intSize === 32
+  //   //     ? DecimalScaleMultipliersNumber[scale]
+  //   //     : DecimalScaleMultipliersBigInt[scale]
+  //   return (src: Buffer, loc: number) => {
+  //     if (intSize === 32) {
+  //       const res = RowBinaryTypesDecoder.int32(src, loc)
+  //       if (res === null) return null
+  //       const whole = ~~(res[0] / (scaleMultiplier as number))
+  //       const fractional = res[0] % (scaleMultiplier as number)
+  //       return [`${whole.toString(10)}.${fractional.toString(10)}`, res[1]]
+  //     }
+  //     let res: DecodeResult<bigint> | null
+  //     if (intSize === 64) {
+  //       if (src.length < loc + 8) return null
+  //       const x = readBytesAsUnsignedBigInt(src, loc, 8)
+  //       res = [x < Int64Overflow ? x : x - UInt64Overflow, loc + 8]
+  //     } else if (intSize === 128) {
+  //       res = RowBinaryTypesDecoder.int128(src, loc)
+  //     } else if (intSize === 256) {
+  //       res = RowBinaryTypesDecoder.int256(src, loc)
+  //     } else {
+  //       throw new Error(`Unsupported int size: ${intSize}`)
+  //     }
+  //     if (res === null) return null
+  //     const whole = res[0] / (scaleMultiplier as bigint)
+  //     const fractional = res[0] % (scaleMultiplier as bigint)
+  //     return [`${whole.toString(10)}.${fractional.toString(10)}`, res[1]]
+  //   }
+  // }
+  static decimal32<T = string>(
+    scale: number,
+    mapper?: (whole: number, fractional: number) => T,
+  ): (src: Buffer, loc: number) => DecodeResult<string | T> | null {
+    const scaleMultiplier = 10 ** scale
+    return (src: Buffer, loc: number) => {
+      if (src.length < loc + 4) return null
+      const fullDecimal32 = src.readInt32LE(loc)
+      const whole = ~~(fullDecimal32 / scaleMultiplier)
+      const fractional = fullDecimal32 % scaleMultiplier
+      if (mapper !== undefined) {
+        return [mapper(whole, fractional), loc + 4]
+      }
+      return [`${whole.toString(10)}.${fractional.toString(10)}`, loc + 4]
+    }
+  }
+  static decimal64(
+    scale: number,
+  ): (src: Buffer, loc: number) => DecodeResult<string> | null {
+    // const scaleMultiplier = BigInt(10) ** BigInt(scale)
+    return (src: Buffer, loc: number) => {
+      if (src.length < loc + 8) return null
+      const fullDecimal64 = src.readBigInt64LE(loc)
+      // Avoid BigInt math; it's slower than just dealing with a string
+      let str = fullDecimal64.toString(10)
+      let sign = ''
+      if (str.startsWith('-')) {
+        sign = '-'
+        str = str.slice(1)
+      }
+      if (scale === 0) {
+        return [`${sign}${str}`, loc + 8]
+      }
+      if (scale >= str.length) {
+        // e.g. 42 with scale 3 -> 0.042 (pad the fractional part with zeros)
+        return [`${sign}0.${str.padStart(scale, '0')}`, loc + 8]
+      }
+      const dotIndex = str.length - scale
+      return [`${sign}${str.slice(0, dotIndex)}.${str.slice(dotIndex)}`, loc + 8]
+    }
+  }
+  static array<T>(
+    innerDecoder: SimpleTypeDecoder<T>,
+    dimensions = 0,
+  ): (src: Buffer, loc: number) => DecodeResult<unknown[]> | null {
+    return (src: Buffer, loc: number) => {
+      const leb128 = readBytesAsUnsignedLEB128(src, loc)
+      if (leb128 === null) return null
+      const [length, start] = leb128
+      // Peel the dimensions one by one; the innermost one uses the value decoder directly
+      const elementDecoder =
+        dimensions === 0
+          ? innerDecoder
+          : RowBinaryTypesDecoder.array(innerDecoder, dimensions - 1)
+      const result: unknown[] = new Array(length)
+      let nextLoc = start
+      for (let i = 0; i < length; i++) {
+        const res = elementDecoder(src, nextLoc)
+        if (res === null) return null // not enough data to decode the entire array
+        result[i] = res[0]
+        nextLoc = res[1]
+      }
+      return [result, nextLoc]
+    }
+  }
+}
+
+export function getDecimalIntSize(precision: number): 32 | 64 | 128 | 256 {
+  if (precision < 10) return 32
+  if (precision < 19) return 64
+  if (precision < 39) return 128
+  return 256
+}
+
+export const RowBinarySimpleDecoders: {
+  [key in SimpleColumnType]: SimpleTypeDecoder
+} = {
+  Bool: RowBinaryTypesDecoder.bool,
+  UInt8: RowBinaryTypesDecoder.uint8,
+  Int8: RowBinaryTypesDecoder.int8,
+  UInt16: RowBinaryTypesDecoder.uint16,
+  Int16: RowBinaryTypesDecoder.int16,
+  UInt32: RowBinaryTypesDecoder.uint32,
+  Int32: RowBinaryTypesDecoder.int32,
+  UInt64: RowBinaryTypesDecoder.uint64,
+  Int64: RowBinaryTypesDecoder.int64,
+  // UInt128: RowBinaryTypesDecoder.uint128,
+  // Int128: RowBinaryTypesDecoder.int128,
+  // UInt256: RowBinaryTypesDecoder.uint256,
+  // Int256: RowBinaryTypesDecoder.int256,
+  Float32: RowBinaryTypesDecoder.float32,
+  Float64: RowBinaryTypesDecoder.float64,
+  String: RowBinaryTypesDecoder.string,
+  Date: RowBinaryTypesDecoder.date,
+  Date32: RowBinaryTypesDecoder.date32,
+}
diff --git a/packages/client-node/__tests__/integration/node_stream_row_binary.test.ts b/packages/client-node/__tests__/integration/node_stream_row_binary.test.ts
new file mode 100644
index 00000000..d15ebe51
--- /dev/null
+++
b/packages/client-node/__tests__/integration/node_stream_row_binary.test.ts @@ -0,0 +1,125 @@ +import type { ClickHouseClient } from '@clickhouse/client-common' +import { createTestClient, guid } from '@test/utils' +import type Stream from 'stream' + +describe('[Node.js] stream RowBinary', () => { + let client: ClickHouseClient + let tableName: string + + beforeEach(async () => { + client = createTestClient() + }) + afterEach(async () => { + await client.close() + }) + + it('should stream booleans and integers up to 32 bits', async () => { + const columns = [ + ['b', 'Bool'], + ['i8', 'Int8'], + ['i16', 'Int16'], + ['i32', 'Int32'], + ['u8', 'UInt8'], + ['u16', 'UInt16'], + ['u32', 'UInt32'], + ] + const values = [ + [true, 127, 32767, 2147483647, 255, 65535, 4294967295], + [false, -128, -32768, -2147483648, 120, 1234, 51234], + ] + await createTableWithData(columns, values, 'int') + await selectAndAssert(values) + }) + + it('should stream 64/128/256-bit integers', async () => { + const columns = [ + ['i64', 'Int64'], + ['i128', 'Int128'], + ['i256', 'Int256'], + // ['u64', 'UInt64'], + // ['u128', 'UInt128'], + // ['u256', 'UInt256'], + ] + const insertValues = [ + [ + '9223372036854775807', + '170141183460469231731687303715884105727', + '57896044618658097711785492504343953926634992332820282019728792003956564819967', + // '18446744073709551615', + // '340282366920938463463374607431768211455', + // '115792089237316195423570985008687907853269984665640564039457584007913129639935', + ], + [ + '-9223372036854775808', + '-170141183460469231731687303715884105728', + '-57896044618658097711785492504343953926634992332820282019728792003956564819968', + // '120', + // '1234', + // '51234', + ], + ] + const assertValues = [ + [ + BigInt('9223372036854775807'), + BigInt('170141183460469231731687303715884105727'), + BigInt( + '57896044618658097711785492504343953926634992332820282019728792003956564819967', + ), + // BigInt('18446744073709551615'), + // BigInt('340282366920938463463374607431768211455'), + // BigInt( + // '115792089237316195423570985008687907853269984665640564039457584007913129639935' + // ), + ], + [ + BigInt('-9223372036854775808'), + BigInt('-170141183460469231731687303715884105728'), + BigInt( + '-57896044618658097711785492504343953926634992332820282019728792003956564819968', + ), + // BigInt('120'), + // BigInt('1234'), + // BigInt('51234'), + ], + ] + await createTableWithData(columns, insertValues, 'bigint') + await selectAndAssert(assertValues) + }) + + async function selectAndAssert(assertValues: unknown[][]) { + const rs = await client.query({ + query: `SELECT * EXCEPT id FROM ${tableName} ORDER BY id ASC`, + format: 'RowBinary', + }) + const values: unknown[][] = [] + for await (const rows of rs.stream()) { + rows.forEach((row: unknown[]) => { + values.push(row) + }) + } + expect(values).toEqual(assertValues) + } + + async function createTableWithData( + colNameToType: string[][], + insertValues: unknown[][], + testName: string, + ) { + tableName = `insert_stream_row_binary_${testName}_${guid()}` + const cols = colNameToType + .map(([name, type]) => `${name} ${type}`) + .join(', ') + await client.command({ + query: `CREATE TABLE ${tableName} (id UInt32, ${cols}) ENGINE MergeTree ORDER BY (id)`, + clickhouse_settings: { + wait_end_of_query: 1, + }, + }) + let id = 1 + await client.insert({ + table: tableName, + values: insertValues.map((value) => [id++, ...value]), + format: 'JSONCompactEachRow', + }) + } +}) diff --git a/packages/client-node/src/config.ts 
b/packages/client-node/src/config.ts
index 17acd78f..714e69b1 100644
--- a/packages/client-node/src/config.ts
+++ b/packages/client-node/src/config.ts
@@ -10,6 +10,7 @@ import {
 import type Stream from 'stream'
 import { createConnection, type TLSParams } from './connection'
 import { ResultSet } from './result_set'
+import { RowBinaryResultSet } from './row_binary_result_set'
 import { NodeValuesEncoder } from './utils'
 
 export type NodeClickHouseClientConfigOptions =
@@ -102,7 +103,13 @@ export const NodeConfigImpl: Required<
       stream: Stream.Readable,
       format: DataFormat,
       query_id: string,
-    ) => new ResultSet(stream, format, query_id)) as any,
+    ) => {
+      if (format === 'RowBinary') {
+        return new RowBinaryResultSet(stream, format, query_id)
+      } else {
+        return new ResultSet(stream, format, query_id)
+      }
+    }) as any,
     close_stream: async (stream) => {
       stream.destroy()
     },
diff --git a/packages/client-node/src/row_binary_result_set.ts b/packages/client-node/src/row_binary_result_set.ts
new file mode 100644
index 00000000..15b79ab0
--- /dev/null
+++ b/packages/client-node/src/row_binary_result_set.ts
@@ -0,0 +1,221 @@
+import type { BaseResultSet, DataFormat } from '@clickhouse/client-common'
+import type { DecodedColumns } from '@clickhouse/client-common/src/data_formatter/row_binary/columns_header'
+import { RowBinaryColumnsHeader } from '@clickhouse/client-common/src/data_formatter/row_binary/columns_header'
+import { Buffer } from 'buffer'
+import Stream, { Transform, type TransformCallback } from 'stream'
+
+export interface RowBinaryStreamParams {
+  /** Determines whether each row will be returned as an array or an object. Possible options: 'Array', 'Object'.
+   *
+   * NB: Object mode will reduce performance by approximately 25-30%, as there will be processing overhead
+   * (similar to JSONEachRow vs JSONCompactEachRow).
+   *
+   * Default: 'Array'. */
+  mode?: 'Array' | 'Object'
+}
+
+// FIXME: remove BaseResultSet inheritance (after 1.0.0 is merged).
+// FIXME: add logger (after 1.0.0 is merged).
+export class RowBinaryResultSet
+  implements BaseResultSet<Stream.Readable>
+{
+  constructor(
+    private _stream: Stream.Readable,
+    private readonly format: DataFormat,
+    public readonly query_id: string,
+  ) {}
+
+  // FIXME: remove this (after 1.0.0 is merged).
+  async text(): Promise<string> {
+    throw new Error(
+      `Can't call 'text()' on RowBinary result set; please use 'stream' instead`,
+    )
+  }
+
+  // FIXME: remove this (after 1.0.0 is merged).
+  async json<T>(): Promise<T> {
+    throw new Error(
+      `Can't call 'json()' on RowBinary result set; please use 'stream' instead`,
+    )
+  }
+
+  /** Consume the entire stream at once and get all the rows as a single array.
+   * If your result set might be too large, consider using {@link stream} instead.
+   *
+   * @returns {Promise<any[]>} - An array of rows.
+   */
+  async get(params?: RowBinaryStreamParams): Promise<any[]> {
+    if (this.format !== 'RowBinary') {
+      throw new Error(
+        `Can't use RowBinaryResultSet if the format is not RowBinary`,
+      )
+    }
+    const result: any[] = []
+    await new Promise((resolve, reject) => {
+      this.stream(params)
+        .on('data', (rows: unknown[][]) => {
+          for (let i = 0; i < rows.length; i++) {
+            result.push(rows[i] as any)
+          }
+        })
+        .on('end', resolve)
+        .on('error', reject)
+    })
+    return result
+  }
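+
+  // Illustrative usage (not part of the original change), mirroring the
+  // integration tests: query with format: 'RowBinary', then either collect
+  // everything via get(), or iterate the stream for large result sets:
+  //   const rs = await client.query({ query, format: 'RowBinary' })
+  //   for await (const rows of rs.stream()) { ... }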
+
+  // FIXME: return StreamReadable after 1.0.0.
+  stream(params?: RowBinaryStreamParams): Stream.Readable {
+    // If the underlying stream has already ended,
+    // Stream.pipeline will create a new empty stream,
+    // but without the "readableEnded" flag set to true
+    if (this._stream.readableEnded) {
+      throw Error('Stream has already been consumed')
+    }
+    if (this.format !== 'RowBinary') {
+      throw Error(`Format ${this.format} is not RowBinary`)
+    }
+
+    // ClickHouse columns with their types; decoded from the header in the first chunk(s)
+    let columns: DecodedColumns[0] | undefined
+    // Current column index in the row being processed
+    let columnIndex = 0
+    // Fully decoded rows, pending to be pushed downstream
+    let decodedRows: any[] = []
+    // Whether to return each row as an object or an array
+    const asObject = params?.mode === 'Object'
+    // Used as a prototype if it's Object mode
+    let protoObject: any
+
+    let src: Buffer
+    let incompleteChunk: Buffer | undefined
+
+    // const measures: Record<string, number> = {}
+    // let iterations = 0
+    // let incompleteChunksTotal = 0
+    // const NS_PER_SEC = 1e9
+
+    const toRows = new Transform({
+      transform(
+        chunk: Buffer,
+        _encoding: BufferEncoding,
+        callback: TransformCallback,
+      ) {
+        if (chunk.length === 0) {
+          return callback()
+        }
+
+        if (incompleteChunk !== undefined) {
+          src = Buffer.concat(
+            [incompleteChunk, chunk],
+            incompleteChunk.length + chunk.length,
+          )
+          incompleteChunk = undefined
+        } else {
+          src = chunk
+        }
+
+        let loc = 0
+        if (columns === undefined) {
+          try {
+            const res = RowBinaryColumnsHeader.decode(src)
+            columns = res[0]
+            loc = res[1]
+            if (asObject) {
+              protoObject = Object.create(null)
+              for (let i = 0; i < columns.names.length; i++) {
+                protoObject[columns.names[i]] = undefined
+              }
+            }
+          } catch (err) {
+            return callback(err as Error)
+          }
+        }
+
+        // function logIterationExecutionTime(end: [number, number]) {
+        //   const col = columns!.types[columnIndex]
+        //   const name = columns!.names[columnIndex]
+        //   const execTime = end[0] * NS_PER_SEC + end[1]
+        //   iterations++
+        //   const key = `${col.dbType} - ${name}`
+        //   measures[key] = (measures[key] || 0) + execTime
+        // }
+
+        while (loc < src.length) {
+          const row = asObject
+            ? Object.create(protoObject)
+            : new Array(columns.names.length)
+          while (columnIndex < columns.names.length) {
+            // const start = process.hrtime()
+            const decodeResult = columns.decoders[columnIndex](src, loc)
+
+            // const end = process.hrtime(start)
+            // logIterationExecutionTime(end)
+
+            // not enough data to finish the row - null indicates that
+            if (decodeResult === null) {
+              // incompleteChunksTotal++
+              // will be added to the beginning of the next received chunk
+              incompleteChunk = src.subarray(loc)
+              if (decodedRows.length > 0) {
+                // console.log(`pushing ${decodedRows.length} rows`)
+                this.push(decodedRows)
+                decodedRows = []
+              }
+              return callback()
+            } else {
+              // successfully decoded a value for the column
+              if (asObject) {
+                ;(row as any)[columns.names[columnIndex]] = decodeResult[0]
+              } else {
+                ;(row as any[])[columnIndex] = decodeResult[0]
+              }
+              loc = decodeResult[1]
+              columnIndex++
+            }
+          }
+          decodedRows.push(row)
+          columnIndex = 0
+        }
+
+        // if (loc > src.length) {
+        //   console.log(`loc > src.length, ${loc} > ${src.length}`)
+        // }
+
+        if (decodedRows.length > 0) {
+          // console.log(`pushing ${decodedRows.length} rows`)
+          this.push(decodedRows)
+          decodedRows = []
+        }
+
+        return callback()
+      },
+      final(callback: TransformCallback) {
+        if (decodedRows.length > 0) {
+          this.push(decodedRows)
+          decodedRows = []
+        }
+        // console.log(`Measures (${iterations})`, measures)
+        // for (const key in measures) {
+        //   console.log(`Avg ns for ${key}:`, measures[key] / iterations)
+        // }
+        // console.log(`Incomplete chunks total:`, incompleteChunksTotal)
+        return callback()
+      },
+      autoDestroy: true,
+      objectMode: true,
+    })
+
+    return Stream.pipeline(this._stream, toRows, function pipelineCb(err) {
+      if (err) {
+        // FIXME: use logger instead
+        // eslint-disable-next-line no-console
+        console.error(err)
+      }
+    })
+  }
+
+  close() {
+    this._stream.destroy()
+  }
+}