diff --git a/docs/schema-avro.md b/docs/schema-avro.md new file mode 100755 index 0000000..cac2fd9 --- /dev/null +++ b/docs/schema-avro.md @@ -0,0 +1,87 @@ +--- +id: schema-avro +title: Example Avro Schemas +sidebar_label: Example Avro Schemas +--- + +## Schema with references to other schemas + +You might want to split the Avro definition into several schemas, one for each type. + +```json +{ + "type" : "record", + "namespace" : "test", + "name" : "A", + "fields" : [ + { "name" : "id" , "type" : "int" }, + { "name" : "b" , "type" : "test.B" } + ] +} +``` + +```json +{ + "type" : "record", + "namespace" : "test", + "name" : "B", + "fields" : [ + { "name" : "id" , "type" : "int" } + ] +} +``` + +To register schemas with references, the schemas have to be registered in reverse order. The schema that references another schema has to be registered after the schema it references. In this example B has to be registered before A. Furthermore, when registering A, a list of references has to be provided. A reference consists of: + + * `name` - the fully qualified name of the referenced schema. Example: `test.B` + * `subject` - the subject the schema is registered under in the registry + * `version` - the version of the schema you want to use + +The library will handle an arbitrary number of nested levels. 
+ +```js +const schemaA = { + type: 'record', + namespace: 'test', + name: 'A', + fields: [ + { name: 'id', type: 'int' }, + { name: 'b', type: 'test.B' }, + ], +} + +const schemaB = { + type: 'record', + namespace: 'test', + name: 'B', + fields: [{ name: 'id', type: 'int' }], +} + +await schemaRegistry.register( + { type: SchemaType.AVRO, schema: JSON.stringify(schemaB) }, + { subject: 'Avro:B' }, +) + +const response = await schemaRegistry.api.Subject.latestVersion({ subject: 'Avro:B' }) +const { version } = JSON.parse(response.responseData) + +const { id } = await schemaRegistry.register( +{ + type: SchemaType.AVRO, + schema: JSON.stringify(schemaA), + references: [ + { + name: 'test.B', + subject: 'Avro:B', + version, + }, + ], +}, +{ subject: 'Avro:A' }, +) + +const obj = { id: 1, b: { id: 2 } } + +const buffer = await schemaRegistry.encode(id, obj) +const decodedObj = await schemaRegistry.decode(buffer) +``` diff --git a/docs/schema-json.md b/docs/schema-json.md new file mode 100755 index 0000000..82d3915 --- /dev/null +++ b/docs/schema-json.md @@ -0,0 +1,85 @@ +--- +id: schema-json +title: Example JSON Schemas +sidebar_label: Example JSON Schemas +--- + +## Schema with references to other schemas + +You might want to split the JSON definition into several schemas, one for each type. + +```JSON +{ + "$id": "https://example.com/schemas/A", + "type": "object", + "properties": { + "id": { "type": "number" }, + "b": { "$ref": "https://example.com/schemas/B" } + } +} +``` + +```JSON +{ + "$id": "https://example.com/schemas/B", + "type": "object", + "properties": { + "id": { "type": "number" } + } +} +``` + +To register schemas with references, the schemas have to be registered in reverse order. The schema that references another schema has to be registered after the schema it references. In this example B has to be registered before A. Furthermore, when registering A, a list of references has to be provided. 
A reference consists of: + + * `name` - A URL matching the `$ref` from the schema + * `subject` - the subject the schema is registered under in the registry + * `version` - the version of the schema you want to use + +The library will handle an arbitrary number of nested levels. + +```js +const schemaA = { + $id: 'https://example.com/schemas/A', + type: 'object', + properties: { + id: { type: 'number' }, + b: { $ref: 'https://example.com/schemas/B' }, + }, +} + +const schemaB = { + $id: 'https://example.com/schemas/B', + type: 'object', + properties: { + id: { type: 'number' }, + }, +} + +await schemaRegistry.register( + { type: SchemaType.JSON, schema: JSON.stringify(schemaB) }, + { subject: 'JSON:B' }, +) + +const response = await schemaRegistry.api.Subject.latestVersion({ subject: 'JSON:B' }) +const { version } = JSON.parse(response.responseData) + +const { id } = await schemaRegistry.register( +{ + type: SchemaType.JSON, + schema: JSON.stringify(schemaA), + references: [ + { + name: 'https://example.com/schemas/B', + subject: 'JSON:B', + version, + }, + ], +}, +{ subject: 'JSON:A' }, +) + +const obj = { id: 1, b: { id: 2 } } + +const buffer = await schemaRegistry.encode(id, obj) +const decodedObj = await schemaRegistry.decode(buffer) +``` diff --git a/docs/schema-protobuf.md b/docs/schema-protobuf.md new file mode 100755 index 0000000..c2421ab --- /dev/null +++ b/docs/schema-protobuf.md @@ -0,0 +1,85 @@ +--- +id: schema-protobuf +title: Example Protobuf Schemas +sidebar_label: Example Protobuf Schemas +--- + +## Schema with references to other schemas + +You might want to split the Protobuf definition into several schemas, one for each type. + +```protobuf +syntax = "proto3"; +package test; +import "test/B.proto"; + +message A { + int32 id = 1; + B b = 2; +} +``` + +```protobuf +syntax = "proto3"; +package test; + +message B { + int32 id = 1; +} +``` + +To register schemas with references, the schemas have to be registered in reverse order. 
The schema that references another schema has to be registered after the schema it references. In this example B has to be registered before A. Furthermore, when registering A, a list of references has to be provided. A reference consists of: + + * `name` - String matching the import statement. For example: `test/B.proto` + * `subject` - the subject the schema is registered under in the registry + * `version` - the version of the schema you want to use + +The library will handle an arbitrary number of nested levels. + +```js +const schemaA = ` + syntax = "proto3"; + package test; + import "test/B.proto"; + + message A { + int32 id = 1; + B b = 2; + }` + +const schemaB = ` + syntax = "proto3"; + package test; + + message B { + int32 id = 1; + }` + +await schemaRegistry.register( + { type: SchemaType.PROTOBUF, schema: schemaB }, + { subject: 'Proto:B' }, +) + +const response = await schemaRegistry.api.Subject.latestVersion({ subject: 'Proto:B' }) +const { version } = JSON.parse(response.responseData) + +const { id } = await schemaRegistry.register( +{ + type: SchemaType.PROTOBUF, + schema: schemaA, + references: [ + { + name: 'test/B.proto', + subject: 'Proto:B', + version, + }, + ], +}, +{ subject: 'Proto:A' }, +) + +const obj = { id: 1, b: { id: 2 } } + +const buffer = await schemaRegistry.encode(id, obj) +const decodedObj = await schemaRegistry.decode(buffer) +``` diff --git a/src/@types.ts b/src/@types.ts index dc6c8bc..ad683a0 100644 --- a/src/@types.ts +++ b/src/@types.ts @@ -8,19 +8,28 @@ export enum SchemaType { PROTOBUF = 'PROTOBUF', UNKNOWN = 'UNKNOWN', } - export interface SchemaHelper { validate(schema: Schema): void getSubject(confluentSchema: ConfluentSchema, schema: Schema, separator: string): ConfluentSubject + toConfluentSchema(data: SchemaResponse): ConfluentSchema + updateOptionsFromSchemaReferences( + referencedSchemas: ConfluentSchema[], + options?: ProtocolOptions, + ): ProtocolOptions +} + +export type AvroOptions = Partial & { + 
referencedSchemas?: AvroConfluentSchema[] } -export type AvroOptions = Partial export type JsonOptions = ConstructorParameters[0] & { ajvInstance?: { + addSchema: Ajv['addSchema'] compile: (schema: any) => ValidateFunction } + referencedSchemas?: JsonConfluentSchema[] } -export type ProtoOptions = { messageName: string } +export type ProtoOptions = { messageName?: string; referencedSchemas?: ProtoConfluentSchema[] } export interface LegacyOptions { forSchemaOptions?: AvroOptions @@ -60,16 +69,28 @@ export interface ConfluentSubject { export interface AvroConfluentSchema { type: SchemaType.AVRO schema: string | RawAvroSchema + references?: SchemaReference[] } +export type SchemaReference = { + name: string + subject: string + version: number +} export interface ProtoConfluentSchema { type: SchemaType.PROTOBUF schema: string + references?: SchemaReference[] } - export interface JsonConfluentSchema { type: SchemaType.JSON schema: string + references?: SchemaReference[] +} +export interface SchemaResponse { + schema: string + schemaType: string + references?: SchemaReference[] } export type ConfluentSchema = AvroConfluentSchema | ProtoConfluentSchema | JsonConfluentSchema diff --git a/src/AvroHelper.ts b/src/AvroHelper.ts index af35ba5..9f0001b 100644 --- a/src/AvroHelper.ts +++ b/src/AvroHelper.ts @@ -5,10 +5,14 @@ import { ConfluentSchema, SchemaHelper, ConfluentSubject, + ProtocolOptions, + AvroConfluentSchema, } from './@types' import { ConfluentSchemaRegistryArgumentError } from './errors' -import avro from 'avsc' +import avro, { ForSchemaOptions, Schema, Type } from 'avsc' +import { SchemaResponse, SchemaType } from './@types' +type TypeHook = (schema: Schema, opts: ForSchemaOptions) => Type export default class AvroHelper implements SchemaHelper { private getRawAvroSchema(schema: ConfluentSchema): RawAvroSchema { return (typeof schema.schema === 'string' @@ -21,7 +25,27 @@ export default class AvroHelper implements SchemaHelper { ? 
schema : this.getRawAvroSchema(schema) // @ts-ignore TODO: Fix typings for Schema... - const avroSchema: AvroSchema = avro.Type.forSchema(rawSchema, opts) + + const addReferencedSchemas = (userHook?: TypeHook): TypeHook => ( + schema: avro.Schema, + opts: ForSchemaOptions, + ) => { + const avroOpts = opts as AvroOptions + avroOpts?.referencedSchemas?.forEach(subSchema => { + const rawSubSchema = this.getRawAvroSchema(subSchema) + avroOpts.typeHook = userHook + avro.Type.forSchema(rawSubSchema, avroOpts) + }) + if (userHook) { + return userHook(schema, opts) + } + } + + const avroSchema = avro.Type.forSchema(rawSchema, { + ...opts, + typeHook: addReferencedSchemas(opts?.typeHook), + }) + return avroSchema } @@ -32,7 +56,7 @@ export default class AvroHelper implements SchemaHelper { } public getSubject( - schema: ConfluentSchema, + schema: AvroConfluentSchema, // @ts-ignore avroSchema: AvroSchema, separator: string, @@ -53,4 +77,15 @@ export default class AvroHelper implements SchemaHelper { const asRawAvroSchema = schema as RawAvroSchema return asRawAvroSchema.name != null && asRawAvroSchema.type != null } + + public toConfluentSchema(data: SchemaResponse): AvroConfluentSchema { + return { type: SchemaType.AVRO, schema: data.schema, references: data.references } + } + + updateOptionsFromSchemaReferences( + referencedSchemas: AvroConfluentSchema[], + options: ProtocolOptions = {}, + ): ProtocolOptions { + return { ...options, [SchemaType.AVRO]: { ...options[SchemaType.AVRO], referencedSchemas } } + } } diff --git a/src/JsonHelper.ts b/src/JsonHelper.ts index 4a4b1cd..06b926b 100644 --- a/src/JsonHelper.ts +++ b/src/JsonHelper.ts @@ -1,17 +1,35 @@ -// @ts-nocheck -import { Schema, SchemaHelper, ConfluentSubject, ConfluentSchema } from './@types' +import { + Schema, + SchemaHelper, + ConfluentSubject, + SchemaResponse, + SchemaType, + ProtocolOptions, + JsonConfluentSchema, +} from './@types' import { ConfluentSchemaRegistryError } from './errors' export default class 
JsonHelper implements SchemaHelper { - public validate(schema: Schema): void { + public validate(_schema: Schema): void { return } public getSubject( - confluentSchema: ConfluentSchema, - schema: Schema, - separator: string, + _confluentSchema: JsonConfluentSchema, + _schema: Schema, + _separator: string, ): ConfluentSubject { throw new ConfluentSchemaRegistryError('not implemented yet') } + + public toConfluentSchema(data: SchemaResponse): JsonConfluentSchema { + return { type: SchemaType.JSON, schema: data.schema, references: data.references } + } + + updateOptionsFromSchemaReferences( + referencedSchemas: JsonConfluentSchema[], + options: ProtocolOptions = {}, + ): ProtocolOptions { + return { ...options, [SchemaType.JSON]: { ...options[SchemaType.JSON], referencedSchemas } } + } } diff --git a/src/JsonSchema.ts b/src/JsonSchema.ts index a2acc35..7b0d7cd 100644 --- a/src/JsonSchema.ts +++ b/src/JsonSchema.ts @@ -6,7 +6,6 @@ interface BaseAjvValidationError { data?: unknown schema?: unknown } - interface OldAjvValidationError extends BaseAjvValidationError { dataPath: string instancePath?: string @@ -30,6 +29,13 @@ export default class JsonSchema implements Schema { private getJsonSchema(schema: JsonConfluentSchema, opts?: JsonOptions) { const ajv = opts?.ajvInstance ?? 
new Ajv(opts) + const referencedSchemas = opts?.referencedSchemas + if (referencedSchemas) { + referencedSchemas.forEach(rawSchema => { + const $schema = JSON.parse(rawSchema.schema) + ajv.addSchema($schema, $schema['$id']) + }) + } const validate = ajv.compile(JSON.parse(schema.schema)) return validate } diff --git a/src/ProtoHelper.ts b/src/ProtoHelper.ts index d71124f..ba46aff 100644 --- a/src/ProtoHelper.ts +++ b/src/ProtoHelper.ts @@ -1,17 +1,38 @@ -// @ts-nocheck -import { Schema, SchemaHelper, ConfluentSubject, ConfluentSchema } from './@types' +import { + Schema, + SchemaHelper, + ConfluentSubject, + SchemaResponse, + SchemaType, + ProtocolOptions, + ProtoConfluentSchema, +} from './@types' import { ConfluentSchemaRegistryError } from './errors' export default class ProtoHelper implements SchemaHelper { - public validate(schema: Schema): void { + public validate(_schema: Schema): void { return } public getSubject( - confluentSchema: ConfluentSchema, - schema: Schema, - separator: string, + _confluentSchema: ProtoConfluentSchema, + _schema: Schema, + _separator: string, ): ConfluentSubject { throw new ConfluentSchemaRegistryError('not implemented yet') } + + public toConfluentSchema(data: SchemaResponse): ProtoConfluentSchema { + return { type: SchemaType.PROTOBUF, schema: data.schema, references: data.references } + } + + updateOptionsFromSchemaReferences( + referencedSchemas: ProtoConfluentSchema[], + options: ProtocolOptions = {}, + ): ProtocolOptions { + return { + ...options, + [SchemaType.PROTOBUF]: { ...options[SchemaType.PROTOBUF], referencedSchemas }, + } + } } diff --git a/src/ProtoSchema.ts b/src/ProtoSchema.ts index d69c2d7..bd358fe 100644 --- a/src/ProtoSchema.ts +++ b/src/ProtoSchema.ts @@ -12,6 +12,13 @@ export default class ProtoSchema implements Schema { constructor(schema: ProtoConfluentSchema, opts?: ProtoOptions) { const parsedMessage = protobuf.parse(schema.schema) const root = parsedMessage.root + const referencedSchemas = 
opts?.referencedSchemas + + // handle all schema references independent on nested references + if (referencedSchemas) { + referencedSchemas.forEach(rawSchema => protobuf.parse(rawSchema.schema as string, root)) + } + this.message = root.lookupType(this.getTypeName(parsedMessage, opts)) } diff --git a/src/SchemaRegistry.avro.spec.ts b/src/SchemaRegistry.avro.spec.ts new file mode 100644 index 0000000..7bf92da --- /dev/null +++ b/src/SchemaRegistry.avro.spec.ts @@ -0,0 +1,558 @@ +import SchemaRegistry, { RegisteredSchema } from './SchemaRegistry' +import API from './api' +import { AvroConfluentSchema, SchemaType } from './@types' +import avro from 'avsc' + +const REGISTRY_HOST = 'http://localhost:8982' +const schemaRegistryAPIClientArgs = { host: REGISTRY_HOST } +const schemaRegistryArgs = { host: REGISTRY_HOST } + +enum Color { + RED = 1, + GREEN = 2, + BLUE = 3, +} + +enum Direction { + UP = 1, + DOWN = 2, +} + +const TestSchemas = { + FirstLevelSchema: { + type: SchemaType.AVRO, + schema: ` + { + "type" : "record", + "namespace" : "test", + "name" : "FirstLevel", + "fields" : [ + { "name" : "id1" , "type" : "int" }, + { "name" : "level1a" , "type" : "test.SecondLevelA" }, + { "name" : "level1b" , "type" : "test.SecondLevelB" } + ] + }`, + references: [ + { + name: 'test.SecondLevelA', + subject: 'Avro:SecondLevelA', + version: undefined, + }, + { + name: 'test.SecondLevelB', + subject: 'Avro:SecondLevelB', + version: undefined, + }, + ], + } as AvroConfluentSchema, + + SecondLevelASchema: { + type: SchemaType.AVRO, + schema: ` + { + "type" : "record", + "namespace" : "test", + "name" : "SecondLevelA", + "fields" : [ + { "name" : "id2a" , "type" : "int" }, + { "name" : "level2a" , "type" : "test.ThirdLevel" } + ] + }`, + references: [ + { + name: 'test.ThirdLevel', + subject: 'Avro:ThirdLevel', + version: undefined, + }, + ], + } as AvroConfluentSchema, + + SecondLevelBSchema: { + type: SchemaType.AVRO, + schema: ` + { + "type" : "record", + "namespace" : "test", + 
"name" : "SecondLevelB", + "fields" : [ + { "name" : "id2b" , "type" : "int" }, + { "name" : "level2b" , "type" : "test.ThirdLevel" } + ] + }`, + references: [ + { + name: 'test.ThirdLevel', + subject: 'Avro:ThirdLevel', + version: undefined, + }, + ], + } as AvroConfluentSchema, + + ThirdLevelSchema: { + type: SchemaType.AVRO, + schema: ` + { + "type" : "record", + "namespace" : "test", + "name" : "ThirdLevel", + "fields" : [ + { "name" : "id3" , "type" : "int" } + ] + }`, + } as AvroConfluentSchema, + + EnumSchema: { + type: SchemaType.AVRO, + schema: ` + { + "type" : "record", + "namespace" : "test", + "name" : "EnumSchema", + "fields" : [ + { + "name": "color", + "type": ["null", { + "type": "enum", + "name": "Color", + "symbols": ["RED", "GREEN", "BLUE"] + } + ] + } + ] + }`, + } as AvroConfluentSchema, + + EnumWithReferencesSchema: { + type: SchemaType.AVRO, + schema: ` + { + "type" : "record", + "namespace" : "test", + "name" : "EnumWithReferences", + "fields" : [ + { + "name": "direction", + "type": ["null", { + "type": "enum", + "name": "Direction", + "symbols": ["UP", "DOWN"] + } + ] + }, + { "name" : "attributes" , "type" : "test.EnumSchema" } + ] + }`, + references: [ + { + name: 'test.EnumSchema', + subject: 'Avro:EnumSchema', + version: undefined, + }, + ], + } as AvroConfluentSchema, +} + +function apiResponse(result) { + return JSON.parse(result.responseData) +} + +describe('SchemaRegistry', () => { + let schemaRegistry: SchemaRegistry + let registeredSchema: RegisteredSchema + let api + + beforeEach(async () => { + api = API(schemaRegistryAPIClientArgs) + schemaRegistry = new SchemaRegistry(schemaRegistryArgs) + }) + + describe('when register', () => { + describe('when no reference', () => { + beforeEach(async () => { + registeredSchema = await schemaRegistry.register(TestSchemas.ThirdLevelSchema, { + subject: 'Avro:ThirdLevel', + }) + }) + + it('should return schema id', async () => { + expect(registeredSchema.id).toEqual(expect.any(Number)) + }) 
+ + it('should be able to encode/decode', async () => { + const obj = { id3: 3 } + + const buffer = await schemaRegistry.encode(registeredSchema.id, obj) + const resultObj = await schemaRegistry.decode(buffer) + + expect(resultObj).toEqual(obj) + }) + }) + + describe('with reference', () => { + let schemaId + let referenceSchema + + beforeEach(async () => { + await schemaRegistry.register(TestSchemas.ThirdLevelSchema, { + subject: 'Avro:ThirdLevel', + }) + + const latest = apiResponse(await api.Subject.latestVersion({ subject: 'Avro:ThirdLevel' })) + TestSchemas.SecondLevelASchema.references[0].version = latest.version + registeredSchema = await schemaRegistry.register(TestSchemas.SecondLevelASchema, { + subject: 'Avro:SecondLevelA', + }) + schemaId = registeredSchema.id + + const schemaRaw = apiResponse(await api.Schema.find({ id: schemaId })) + referenceSchema = schemaRaw.references[0].subject + }) + + it('should return schema id', async () => { + expect(schemaId).toEqual(expect.any(Number)) + }) + + it('should create a schema with reference', async () => { + expect(referenceSchema).toEqual('Avro:ThirdLevel') + }) + + it('should be able to encode/decode', async () => { + const obj = { id2a: 2, level2a: { id3: 3 } } + + const buffer = await schemaRegistry.encode(registeredSchema.id, obj) + const resultObj = await schemaRegistry.decode(buffer) + + expect(resultObj).toEqual(obj) + }) + }) + + describe('with multiple reference', () => { + beforeEach(async () => { + let latest + + registeredSchema = await schemaRegistry.register(TestSchemas.ThirdLevelSchema, { + subject: 'Avro:ThirdLevel', + }) + + latest = apiResponse(await api.Subject.latestVersion({ subject: 'Avro:ThirdLevel' })) + TestSchemas.SecondLevelASchema.references[0].version = latest.version + registeredSchema = await schemaRegistry.register(TestSchemas.SecondLevelASchema, { + subject: 'Avro:SecondLevelA', + }) + + latest = apiResponse(await api.Subject.latestVersion({ subject: 'Avro:ThirdLevel' })) + 
TestSchemas.SecondLevelBSchema.references[0].version = latest.version + registeredSchema = await schemaRegistry.register(TestSchemas.SecondLevelBSchema, { + subject: 'Avro:SecondLevelB', + }) + + latest = apiResponse(await api.Subject.latestVersion({ subject: 'Avro:SecondLevelA' })) + TestSchemas.FirstLevelSchema.references[0].version = latest.version + latest = apiResponse(await api.Subject.latestVersion({ subject: 'Avro:SecondLevelB' })) + TestSchemas.FirstLevelSchema.references[1].version = latest.version + registeredSchema = await schemaRegistry.register(TestSchemas.FirstLevelSchema, { + subject: 'Avro:FirstLevel', + }) + }) + + it('should be able to encode/decode', async () => { + const obj = { + id1: 1, + level1a: { id2a: 2, level2a: { id3: 3 } }, + level1b: { id2b: 4, level2b: { id3: 5 } }, + } + + const buffer = await schemaRegistry.encode(registeredSchema.id, obj) + const resultObj = await schemaRegistry.decode(buffer) + + expect(resultObj).toEqual(obj) + }) + + it('should be able to encode/decode independent', async () => { + const obj = { + id1: 1, + level1a: { id2a: 2, level2a: { id3: 3 } }, + level1b: { id2b: 4, level2b: { id3: 5 } }, + } + + schemaRegistry = new SchemaRegistry(schemaRegistryArgs) + const buffer = await schemaRegistry.encode(registeredSchema.id, obj) + + schemaRegistry = new SchemaRegistry(schemaRegistryArgs) + const resultObj = await schemaRegistry.decode(buffer) + + expect(resultObj).toEqual(obj) + }) + }) + }) + + describe('_getSchema', () => { + let schema + + describe('no references', () => { + beforeEach(async () => { + registeredSchema = await schemaRegistry.register(TestSchemas.ThirdLevelSchema, { + subject: 'Avro:ThirdLevel', + }) + ;({ schema } = await schemaRegistry['_getSchema'](registeredSchema.id)) + }) + + it('should return schema that match name', async () => { + expect(schema.name).toEqual('test.ThirdLevel') + }) + + it('should be able to encode/decode', async () => { + const obj = { id3: 3 } + + const buffer = await 
schema.toBuffer(obj) + const resultObj = await schema.fromBuffer(buffer) + + expect(resultObj).toEqual(obj) + }) + }) + + describe('with references', () => { + beforeEach(async () => { + await schemaRegistry.register(TestSchemas.ThirdLevelSchema, { subject: 'Avro:ThirdLevel' }) + + const latest = apiResponse(await api.Subject.latestVersion({ subject: 'Avro:ThirdLevel' })) + TestSchemas.SecondLevelASchema.references[0].version = latest.version + registeredSchema = await schemaRegistry.register(TestSchemas.SecondLevelASchema, { + subject: 'Avro:SecondLevelA', + }) + ;({ schema } = await schemaRegistry['_getSchema'](registeredSchema.id)) + }) + + it('should return schema that match name', async () => { + expect(schema.name).toEqual('test.SecondLevelA') + }) + + it('should be able to encode/decode', async () => { + const obj = { id2a: 2, level2a: { id3: 3 } } + + const buffer = await schema.toBuffer(obj) + const resultObj = await schema.fromBuffer(buffer) + + expect(resultObj).toEqual(obj) + }) + }) + + describe('with multi references', () => { + beforeEach(async () => { + let latest + + await schemaRegistry.register(TestSchemas.ThirdLevelSchema, { + subject: 'Avro:ThirdLevel', + }) + + latest = apiResponse(await api.Subject.latestVersion({ subject: 'Avro:ThirdLevel' })) + TestSchemas.SecondLevelASchema.references[0].version = latest.version + registeredSchema = await schemaRegistry.register(TestSchemas.SecondLevelASchema, { + subject: 'Avro:SecondLevelA', + }) + + latest = apiResponse(await api.Subject.latestVersion({ subject: 'Avro:ThirdLevel' })) + TestSchemas.SecondLevelBSchema.references[0].version = latest.version + registeredSchema = await schemaRegistry.register(TestSchemas.SecondLevelBSchema, { + subject: 'Avro:SecondLevelB', + }) + + latest = apiResponse(await api.Subject.latestVersion({ subject: 'Avro:SecondLevelA' })) + TestSchemas.FirstLevelSchema.references[0].version = latest.version + latest = apiResponse(await api.Subject.latestVersion({ subject: 
'Avro:SecondLevelB' })) + TestSchemas.FirstLevelSchema.references[1].version = latest.version + registeredSchema = await schemaRegistry.register(TestSchemas.FirstLevelSchema, { + subject: 'Avro:FirstLevel', + }) + ;({ schema } = await schemaRegistry['_getSchema'](registeredSchema.id)) + }) + + it('should return schema that match name', async () => { + expect(schema.name).toEqual('test.FirstLevel') + }) + + it('should be able to encode/decode', async () => { + const obj = { + id1: 1, + level1a: { id2a: 2, level2a: { id3: 3 } }, + level1b: { id2b: 4, level2b: { id3: 5 } }, + } + + const buffer = await schema.toBuffer(obj) + const resultObj = await schema.fromBuffer(buffer) + + expect(resultObj).toEqual(obj) + }) + }) + }) + + describe('when document example', () => { + it('should encode/decode', async () => { + const schemaA = { + type: 'record', + namespace: 'test', + name: 'A', + fields: [ + { name: 'id', type: 'int' }, + { name: 'b', type: 'test.B' }, + ], + } + + const schemaB = { + type: 'record', + namespace: 'test', + name: 'B', + fields: [{ name: 'id', type: 'int' }], + } + + await schemaRegistry.register( + { type: SchemaType.AVRO, schema: JSON.stringify(schemaB) }, + { subject: 'Avro:B' }, + ) + + const response = await schemaRegistry.api.Subject.latestVersion({ subject: 'Avro:B' }) + const { version } = JSON.parse(response.responseData) + + const { id } = await schemaRegistry.register( + { + type: SchemaType.AVRO, + schema: JSON.stringify(schemaA), + references: [ + { + name: 'test.B', + subject: 'Avro:B', + version, + }, + ], + }, + { subject: 'Avro:A' }, + ) + + const obj = { id: 1, b: { id: 2 } } + + const buffer = await schemaRegistry.encode(id, obj) + const decodedObj = await schemaRegistry.decode(buffer) + + expect(decodedObj).toEqual(obj) + }) + }) + + describe('with EnumType types and nested schemas', () => { + /** + * Hook which will decode/encode enums to/from integers. 
+ * + * The default `EnumType` implementation represents enum values as strings + * (consistent with the JSON representation). This hook can be used to provide + * an alternate representation (which is for example compatible with TypeScript + * enums). + * + * For simplicity, we don't do any bound checking here but we could by + * implementing a "bounded long" logical type and returning that instead. + * + * https://gist.github.com/mtth/c0088c745de048c4e466#file-long-enum-js + */ + function typeHook(attrs, opts) { + if (attrs.type === 'enum') { + return avro.parse('long', opts) + } + } + + let schema + + describe('with no enum typeHook defined', () => { + beforeEach(async () => { + const schemaRegistry = new SchemaRegistry(schemaRegistryArgs) + + await schemaRegistry.register(TestSchemas.EnumSchema, { + subject: 'Avro:EnumSchema', + }) + + const latest = apiResponse(await api.Subject.latestVersion({ subject: 'Avro:EnumSchema' })) + TestSchemas.EnumWithReferencesSchema.references[0].version = latest.version + const registeredSchema = await schemaRegistry.register( + TestSchemas.EnumWithReferencesSchema, + { + subject: 'Avro:EnumWithReferences', + }, + ) + ;({ schema } = await schemaRegistry['_getSchema'](registeredSchema.id)) + }) + + it('should not be able to encode/decode enums schemas', async () => { + const obj = { + direction: Direction.UP, + attributes: { color: Color.BLUE }, + } + + expect(() => schema.toBuffer(obj)).toThrow(Error) + }) + }) + + describe('with enum typeHook defined', () => { + beforeEach(async () => { + const schemaRegistry = new SchemaRegistry(schemaRegistryArgs, { + [SchemaType.AVRO]: { typeHook }, + }) + + await schemaRegistry.register(TestSchemas.EnumSchema, { + subject: 'Avro:EnumSchema', + }) + + const latest = apiResponse(await api.Subject.latestVersion({ subject: 'Avro:EnumSchema' })) + TestSchemas.EnumWithReferencesSchema.references[0].version = latest.version + const registeredSchema = await schemaRegistry.register( + 
TestSchemas.EnumWithReferencesSchema, + { + subject: 'Avro:EnumWithReferences', + }, + ) + ;({ schema } = await schemaRegistry['_getSchema'](registeredSchema.id)) + }) + + it('should be able to encode/decode enums schemas', async () => { + const obj = { + direction: Direction.UP, + attributes: { color: Color.BLUE }, + } + + const buffer = await schema.toBuffer(obj) + const resultObj = await schema.fromBuffer(buffer) + + expect(resultObj).toEqual(obj) + }) + }) + describe('with enum typeHook defined as LegacyOptions', () => { + beforeEach(async () => { + const schemaRegistry = new SchemaRegistry(schemaRegistryArgs, { + forSchemaOptions: { typeHook }, + }) + + await schemaRegistry.register(TestSchemas.EnumSchema, { + subject: 'Avro:EnumSchema', + }) + + const latest = apiResponse(await api.Subject.latestVersion({ subject: 'Avro:EnumSchema' })) + TestSchemas.EnumWithReferencesSchema.references[0].version = latest.version + const registeredSchema = await schemaRegistry.register( + TestSchemas.EnumWithReferencesSchema, + { + subject: 'Avro:EnumWithReferences', + }, + ) + ;({ schema } = await schemaRegistry['_getSchema'](registeredSchema.id)) + }) + + it('should be able to encode/decode enums schemas', async () => { + const obj = { + direction: Direction.UP, + attributes: { color: Color.BLUE }, + } + + const buffer = await schema.toBuffer(obj) + const resultObj = await schema.fromBuffer(buffer) + + expect(resultObj).toEqual(obj) + }) + }) + }) +}) diff --git a/src/SchemaRegistry.json.spec.ts b/src/SchemaRegistry.json.spec.ts new file mode 100644 index 0000000..fbade20 --- /dev/null +++ b/src/SchemaRegistry.json.spec.ts @@ -0,0 +1,364 @@ +import SchemaRegistry, { RegisteredSchema } from './SchemaRegistry' +import API from './api' +import { JsonConfluentSchema, SchemaType } from './@types' + +const REGISTRY_HOST = 'http://localhost:8982' +const schemaRegistryAPIClientArgs = { host: REGISTRY_HOST } +const schemaRegistryArgs = { host: REGISTRY_HOST } + +const TestSchemas = { 
+ ThirdLevelSchema: { + type: SchemaType.JSON, + schema: ` + { + "$id": "https://example.com/schemas/ThirdLevel", + "type": "object", + "properties": { + "id3": { "type": "number" } + } + } + `, + } as JsonConfluentSchema, + + SecondLevelASchema: { + type: SchemaType.JSON, + schema: ` + { + "$id": "https://example.com/schemas/SecondLevelA", + "type": "object", + "properties": { + "id2a": { "type": "number" }, + "level2a": { "$ref": "https://example.com/schemas/ThirdLevel" } + } + } + `, + references: [ + { + name: 'https://example.com/schemas/ThirdLevel', + subject: 'JSON:ThirdLevel', + version: undefined, + }, + ], + } as JsonConfluentSchema, + + SecondLevelBSchema: { + type: SchemaType.JSON, + schema: ` + { + "$id": "https://example.com/schemas/SecondLevelB", + "type": "object", + "properties": { + "id2b": { "type": "number" }, + "level2b": { "$ref": "https://example.com/schemas/ThirdLevel" } + } + } + `, + references: [ + { + name: 'https://example.com/schemas/ThirdLevel', + subject: 'JSON:ThirdLevel', + version: undefined, + }, + ], + } as JsonConfluentSchema, + + FirstLevelSchema: { + type: SchemaType.JSON, + schema: ` + { + "$id": "https://example.com/schemas/FirstLevel", + "type": "object", + "properties": { + "id1": { "type": "number" }, + "level1a": { "$ref": "https://example.com/schemas/SecondLevelA" }, + "level1b": { "$ref": "https://example.com/schemas/SecondLevelB" } + } + } + `, + references: [ + { + name: 'https://example.com/schemas/SecondLevelA', + subject: 'JSON:SecondLevelA', + version: undefined, + }, + { + name: 'https://example.com/schemas/SecondLevelB', + subject: 'JSON:SecondLevelB', + version: undefined, + }, + ], + } as JsonConfluentSchema, +} + +function apiResponse(result) { + return JSON.parse(result.responseData) +} + +describe('SchemaRegistry', () => { + let schemaRegistry: SchemaRegistry + let registeredSchema: RegisteredSchema + let api + + beforeEach(async () => { + api = API(schemaRegistryAPIClientArgs) + schemaRegistry = new 
// Integration specs for JSON schemas with (nested) references. They talk to the
// registry configured through `schemaRegistryArgs`.
describe('SchemaRegistry', () => {
  let schemaRegistry: SchemaRegistry
  let registeredSchema: RegisteredSchema
  let api: ReturnType<typeof API>

  // Payload factories: every test gets its own object.
  const thirdLevelPayload = () => ({ id3: 3 })
  const secondLevelPayload = () => ({ id2a: 2, level2a: { id3: 3 } })
  const firstLevelPayload = () => ({
    id1: 1,
    level1a: { id2a: 2, level2a: { id3: 3 } },
    level1b: { id2b: 4, level2b: { id3: 5 } },
  })

  /**
   * Registers `schema` under `subject` with each reference pinned to the latest
   * version of the subject it points at. Works on a copy, so the module-level
   * `TestSchemas` fixtures are never mutated and tests cannot leak state.
   */
  const registerPinned = async (
    schema: JsonConfluentSchema,
    subject: string,
  ): Promise<RegisteredSchema> => {
    const references: NonNullable<JsonConfluentSchema['references']> = []
    // Sequential on purpose: keeps the registry calls in declaration order.
    for (const reference of schema.references ?? []) {
      const { version } = apiResponse(
        await api.Subject.latestVersion({ subject: reference.subject }),
      )
      references.push({ ...reference, version })
    }

    const pinned = references.length > 0 ? { ...schema, references } : schema
    return schemaRegistry.register(pinned, { subject })
  }

  /** Registers the leaf schema that has no references. */
  const registerThirdLevel = (): Promise<RegisteredSchema> =>
    schemaRegistry.register(TestSchemas.ThirdLevelSchema, { subject: 'JSON:ThirdLevel' })

  /** Registers ThirdLevel, then SecondLevelA referencing it. */
  const registerSecondLevelA = async (): Promise<RegisteredSchema> => {
    await registerThirdLevel()
    return registerPinned(TestSchemas.SecondLevelASchema, 'JSON:SecondLevelA')
  }

  /** Registers the whole tree bottom-up and returns the FirstLevel registration. */
  const registerFirstLevel = async (): Promise<RegisteredSchema> => {
    await registerSecondLevelA()
    await registerPinned(TestSchemas.SecondLevelBSchema, 'JSON:SecondLevelB')
    return registerPinned(TestSchemas.FirstLevelSchema, 'JSON:FirstLevel')
  }

  /** Encodes and decodes `payload` through the registry and expects it back unchanged. */
  const expectRegistryRoundTrip = async (id: number, payload: object): Promise<void> => {
    const buffer = await schemaRegistry.encode(id, payload)
    const resultObj = await schemaRegistry.decode(buffer)

    expect(resultObj).toEqual(payload)
  }

  beforeEach(() => {
    api = API(schemaRegistryAPIClientArgs)
    schemaRegistry = new SchemaRegistry(schemaRegistryArgs)
  })

  describe('when register', () => {
    describe('when no reference', () => {
      beforeEach(async () => {
        registeredSchema = await registerThirdLevel()
      })

      it('should return schema id', async () => {
        expect(registeredSchema.id).toEqual(expect.any(Number))
      })

      it('should be able to encode/decode', async () => {
        await expectRegistryRoundTrip(registeredSchema.id, thirdLevelPayload())
      })
    })

    describe('with reference', () => {
      let schemaId: number
      let referenceSchema: string

      beforeEach(async () => {
        registeredSchema = await registerSecondLevelA()
        schemaId = registeredSchema.id

        // Read the stored schema back to check what the registry recorded.
        const schemaRaw = apiResponse(await api.Schema.find({ id: schemaId }))
        referenceSchema = schemaRaw.references[0].subject
      })

      it('should return schema id', async () => {
        expect(schemaId).toEqual(expect.any(Number))
      })

      it('should create a schema with reference', async () => {
        expect(referenceSchema).toEqual('JSON:ThirdLevel')
      })

      it('should be able to encode/decode', async () => {
        await expectRegistryRoundTrip(registeredSchema.id, secondLevelPayload())
      })
    })

    describe('with multiple reference', () => {
      beforeEach(async () => {
        registeredSchema = await registerFirstLevel()
      })

      it('should be able to encode/decode', async () => {
        await expectRegistryRoundTrip(registeredSchema.id, firstLevelPayload())
      })

      it('should be able to encode/decode independent', async () => {
        const obj = firstLevelPayload()

        // Separate instances: neither side can rely on the other's cache.
        const encoder = new SchemaRegistry(schemaRegistryArgs)
        const buffer = await encoder.encode(registeredSchema.id, obj)

        const decoder = new SchemaRegistry(schemaRegistryArgs)
        const resultObj = await decoder.decode(buffer)

        expect(resultObj).toEqual(obj)
      })
    })
  })

  describe('_getSchema', () => {
    let schema

    /** Loads the schema instance through the private `_getSchema` (bracket access). */
    const loadSchema = async (id: number): Promise<void> => {
      ;({ schema } = await schemaRegistry['_getSchema'](id))
    }

    /** Round-trips `payload` through the schema instance itself, bypassing the registry. */
    const expectSchemaRoundTrip = async (payload: object): Promise<void> => {
      const buffer = await schema.toBuffer(payload)
      const resultObj = await schema.fromBuffer(buffer)

      expect(resultObj).toEqual(payload)
    }

    describe('no references', () => {
      beforeEach(async () => {
        registeredSchema = await registerThirdLevel()
        await loadSchema(registeredSchema.id)
      })

      it('should be able to encode/decode', async () => {
        await expectSchemaRoundTrip(thirdLevelPayload())
      })
    })

    describe('with references', () => {
      beforeEach(async () => {
        registeredSchema = await registerSecondLevelA()
        await loadSchema(registeredSchema.id)
      })

      it('should be able to encode/decode', async () => {
        await expectSchemaRoundTrip(secondLevelPayload())
      })
    })

    describe('with multi references', () => {
      beforeEach(async () => {
        registeredSchema = await registerFirstLevel()
        await loadSchema(registeredSchema.id)
      })

      it('should be able to encode/decode', async () => {
        await expectSchemaRoundTrip(firstLevelPayload())
      })
    })
  })

  // Mirrors the walkthrough in docs/schema-json.md.
  describe('when document example', () => {
    it('should encode/decode', async () => {
      const schemaA = {
        $id: 'https://example.com/schemas/A',
        type: 'object',
        properties: {
          id: { type: 'number' },
          b: { $ref: 'https://example.com/schemas/B' },
        },
      }

      const schemaB = {
        $id: 'https://example.com/schemas/B',
        type: 'object',
        properties: {
          id: { type: 'number' },
        },
      }

      await schemaRegistry.register(
        { type: SchemaType.JSON, schema: JSON.stringify(schemaB) },
        { subject: 'JSON:B' },
      )

      // `api` is a private member of SchemaRegistry; bracket access matches `['_getSchema']`.
      const response = await schemaRegistry['api'].Subject.latestVersion({ subject: 'JSON:B' })
      const { version } = JSON.parse(response.responseData)

      const { id } = await schemaRegistry.register(
        {
          type: SchemaType.JSON,
          schema: JSON.stringify(schemaA),
          references: [
            {
              name: 'https://example.com/schemas/B',
              subject: 'JSON:B',
              version,
            },
          ],
        },
        { subject: 'JSON:A' },
      )

      const obj = { id: 1, b: { id: 2 } }

      const buffer = await schemaRegistry.encode(id, obj)
      const decodedObj = await schemaRegistry.decode(buffer)

      expect(decodedObj).toEqual(obj)
    })
  })
})
/**
 * Parses the raw JSON body of a registry API response.
 *
 * @param result - any object exposing the response body as a JSON string in `responseData`
 * @returns the parsed body; typed `any` because each endpoint returns a different shape
 *   and the specs read fields such as `version` or `references` directly
 * @throws SyntaxError when `responseData` is not valid JSON
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- shape depends on the endpoint
function apiResponse(result: { responseData: string }): any {
  return JSON.parse(result.responseData)
}
// Integration specs for Protobuf schemas with (nested) imports. They talk to the
// registry configured through `schemaRegistryArgs`.
describe('SchemaRegistry', () => {
  let schemaRegistry: SchemaRegistry
  let registeredSchema: RegisteredSchema
  let api: ReturnType<typeof API>

  // Payload factories: every test gets its own object.
  const thirdLevelPayload = () => ({ id3: 3 })
  const secondLevelPayload = () => ({ id2a: 2, level2a: { id3: 3 } })
  const firstLevelPayload = () => ({
    id1: 1,
    level1a: { id2a: 2, level2a: { id3: 3 } },
    level1b: { id2b: 4, level2b: { id3: 5 } },
  })

  /**
   * Registers `schema` under `subject` with each reference pinned to the latest
   * version of the subject it points at. Works on a copy, so the module-level
   * `TestSchemas` fixtures are never mutated and tests cannot leak state.
   */
  const registerPinned = async (
    schema: ProtoConfluentSchema,
    subject: string,
  ): Promise<RegisteredSchema> => {
    const references: NonNullable<ProtoConfluentSchema['references']> = []
    // Sequential on purpose: keeps the registry calls in declaration order.
    for (const reference of schema.references ?? []) {
      const { version } = apiResponse(
        await api.Subject.latestVersion({ subject: reference.subject }),
      )
      references.push({ ...reference, version })
    }

    const pinned = references.length > 0 ? { ...schema, references } : schema
    return schemaRegistry.register(pinned, { subject })
  }

  /** Registers the leaf message that imports nothing. */
  const registerThirdLevel = (): Promise<RegisteredSchema> =>
    schemaRegistry.register(TestSchemas.ThirdLevelSchema, { subject: 'Proto:ThirdLevel' })

  /** Registers ThirdLevel, then SecondLevelA importing it. */
  const registerSecondLevelA = async (): Promise<RegisteredSchema> => {
    await registerThirdLevel()
    return registerPinned(TestSchemas.SecondLevelASchema, 'Proto:SecondLevelA')
  }

  /** Registers the whole tree bottom-up and returns the FirstLevel registration. */
  const registerFirstLevel = async (): Promise<RegisteredSchema> => {
    await registerSecondLevelA()
    await registerPinned(TestSchemas.SecondLevelBSchema, 'Proto:SecondLevelB')
    return registerPinned(TestSchemas.FirstLevelSchema, 'Proto:FirstLevel')
  }

  /** Encodes and decodes `payload` through the registry and expects it back unchanged. */
  const expectRegistryRoundTrip = async (id: number, payload: object): Promise<void> => {
    const buffer = await schemaRegistry.encode(id, payload)
    const resultObj = await schemaRegistry.decode(buffer)

    expect(resultObj).toEqual(payload)
  }

  beforeEach(() => {
    api = API(schemaRegistryAPIClientArgs)
    schemaRegistry = new SchemaRegistry(schemaRegistryArgs)
  })

  describe('when register', () => {
    describe('when no reference', () => {
      beforeEach(async () => {
        registeredSchema = await registerThirdLevel()
      })

      it('should return schema id', async () => {
        expect(registeredSchema.id).toEqual(expect.any(Number))
      })

      it('should be able to encode/decode', async () => {
        await expectRegistryRoundTrip(registeredSchema.id, thirdLevelPayload())
      })
    })

    describe('with reference', () => {
      let schemaId: number
      let referenceSchema: string

      beforeEach(async () => {
        registeredSchema = await registerSecondLevelA()
        schemaId = registeredSchema.id

        // Read the stored schema back to check what the registry recorded.
        const schemaRaw = apiResponse(await api.Schema.find({ id: schemaId }))
        referenceSchema = schemaRaw.references[0].subject
      })

      it('should return schema id', async () => {
        expect(schemaId).toEqual(expect.any(Number))
      })

      it('should create a schema with reference', async () => {
        expect(referenceSchema).toEqual('Proto:ThirdLevel')
      })

      it('should be able to encode/decode', async () => {
        await expectRegistryRoundTrip(registeredSchema.id, secondLevelPayload())
      })
    })

    describe('with multiple reference', () => {
      beforeEach(async () => {
        registeredSchema = await registerFirstLevel()
      })

      it('should be able to encode/decode', async () => {
        await expectRegistryRoundTrip(registeredSchema.id, firstLevelPayload())
      })

      it('should be able to encode/decode independent', async () => {
        const obj = firstLevelPayload()

        // Separate instances: neither side can rely on the other's cache.
        const encoder = new SchemaRegistry(schemaRegistryArgs)
        const buffer = await encoder.encode(registeredSchema.id, obj)

        const decoder = new SchemaRegistry(schemaRegistryArgs)
        const resultObj = await decoder.decode(buffer)

        expect(resultObj).toEqual(obj)
      })
    })
  })

  describe('_getSchema', () => {
    let schema

    /** Loads the schema instance through the private `_getSchema` (bracket access). */
    const loadSchema = async (id: number): Promise<void> => {
      ;({ schema } = await schemaRegistry['_getSchema'](id))
    }

    /** Round-trips `payload` through the schema instance itself, bypassing the registry. */
    const expectSchemaRoundTrip = async (payload: object): Promise<void> => {
      const buffer = await schema.toBuffer(payload)
      const resultObj = await schema.fromBuffer(buffer)

      expect(resultObj).toEqual(payload)
    }

    describe('no references', () => {
      beforeEach(async () => {
        registeredSchema = await registerThirdLevel()
        await loadSchema(registeredSchema.id)
      })

      it('should return schema that match message', async () => {
        expect(schema.message.name).toEqual('ThirdLevel')
      })

      it('should be able to encode/decode', async () => {
        await expectSchemaRoundTrip(thirdLevelPayload())
      })
    })

    describe('with references', () => {
      beforeEach(async () => {
        registeredSchema = await registerSecondLevelA()
        await loadSchema(registeredSchema.id)
      })

      it('should return schema that match message', async () => {
        expect(schema.message.name).toEqual('SecondLevelA')
      })

      it('should be able to encode/decode', async () => {
        await expectSchemaRoundTrip(secondLevelPayload())
      })
    })

    describe('with multi references', () => {
      beforeEach(async () => {
        registeredSchema = await registerFirstLevel()
        await loadSchema(registeredSchema.id)
      })

      it('should return schema that match message', async () => {
        expect(schema.message.name).toEqual('FirstLevel')
      })

      it('should be able to encode/decode', async () => {
        await expectSchemaRoundTrip(firstLevelPayload())
      })
    })
  })

  // Same two-schema walkthrough as the Avro and JSON documentation examples.
  describe('when document example', () => {
    it('should encode/decode', async () => {
      const schemaA = `
        syntax = "proto3";
        package test;
        import "test/B.proto";

        message A {
          int32 id = 1;
          B b = 2;
        }`

      const schemaB = `
        syntax = "proto3";
        package test;

        message B {
          int32 id = 1;
        }`

      await schemaRegistry.register(
        { type: SchemaType.PROTOBUF, schema: schemaB },
        { subject: 'Proto:B' },
      )

      // `api` is a private member of SchemaRegistry; bracket access matches `['_getSchema']`.
      const response = await schemaRegistry['api'].Subject.latestVersion({ subject: 'Proto:B' })
      const { version } = JSON.parse(response.responseData)

      const { id } = await schemaRegistry.register(
        {
          type: SchemaType.PROTOBUF,
          schema: schemaA,
          references: [
            {
              name: 'test/B.proto',
              subject: 'Proto:B',
              version,
            },
          ],
        },
        { subject: 'Proto:A' },
      )

      const obj = { id: 1, b: { id: 2 } }

      const buffer = await schemaRegistry.encode(id, obj)
      const decodedObj = await schemaRegistry.decode(buffer)

      expect(decodedObj).toEqual(obj)
    })
  })
})
/**
 * Collects every schema reachable through `schema.references` and lets the
 * helper for the schema's type merge them into the protocol options.
 *
 * @param schema - schema whose references should be resolved
 * @param options - client options, in either the legacy or the per-protocol shape
 * @returns whatever `helper.updateOptionsFromSchemaReferences` produces
 */
private async updateOptionsWithSchemaReferences(
  schema: ConfluentSchema,
  options?: SchemaRegistryAPIClientOptions,
) {
  const helper = helperTypeFromSchemaType(schema.type)
  const referencedSchemas = await this.getreferencedSchemas(schema, helper)

  const protocolOptions = this.asProtocolOptions(options)
  return helper.updateOptionsFromSchemaReferences(referencedSchemas, protocolOptions)
}

/**
 * Normalises client options to the per-protocol shape.
 *
 * Options carrying a truthy `forSchemaOptions` are treated as legacy options and
 * re-keyed under `SchemaType.AVRO`; anything else is passed through unchanged.
 */
private asProtocolOptions(options?: SchemaRegistryAPIClientOptions): ProtocolOptions | undefined {
  const legacySchemaOptions = (options as LegacyOptions | undefined)?.forSchemaOptions
  if (!legacySchemaOptions) {
    return options as ProtocolOptions | undefined
  }
  return { [SchemaType.AVRO]: legacySchemaOptions }
}

/**
 * Resolves all direct and transitive references of `schema`.
 *
 * @returns the referenced schemas, each one listed after the schemas it references itself
 */
private async getreferencedSchemas(
  schema: ConfluentSchema,
  helper: SchemaHelper,
): Promise<ConfluentSchema[]> {
  return this.getreferencedSchemasRecursive(schema, helper, new Set<string>())
}

/**
 * Resolves the references of `schema` one after another.
 *
 * @param referencesSet - keys of references already resolved during this walk; shared
 *   across the recursion so each reference is fetched only once
 */
private async getreferencedSchemasRecursive(
  schema: ConfluentSchema,
  helper: SchemaHelper,
  referencesSet: Set<string>,
): Promise<ConfluentSchema[]> {
  const referencedSchemas: ConfluentSchema[] = []
  // Sequential on purpose: each step must see the keys recorded by the previous one.
  for (const reference of schema.references ?? []) {
    const schemas = await this.getreferencedSchemasFromReference(reference, helper, referencesSet)
    referencedSchemas.push(...schemas)
  }
  return referencedSchemas
}

/**
 * Fetches the schema a single reference points at, together with everything that
 * schema references in turn.
 *
 * @returns an empty list when the reference was already resolved during this walk;
 *   otherwise the transitive dependencies followed by the referenced schema itself
 */
async getreferencedSchemasFromReference(
  reference: SchemaReference,
  helper: SchemaHelper,
  referencesSet: Set<string>,
): Promise<ConfluentSchema[]> {
  const { name, subject, version } = reference
  // Serialised tuple: unlike a '-'-joined string it cannot collide when a name or
  // subject itself contains the separator.
  const key = JSON.stringify([name, subject, version])

  // avoid duplicates
  if (referencesSet.has(key)) {
    return []
  }
  referencesSet.add(key)

  const versionResponse = await this.api.Subject.version(reference)
  const foundSchema = versionResponse.data() as SchemaResponse

  const schema = helper.toConfluentSchema(foundSchema)
  const referencedSchemas = await this.getreferencedSchemasRecursive(
    schema,
    helper,
    referencesSet,
  )

  // Dependencies first, then the schema that needs them.
  referencedSchemas.push(schema)
  return referencedSchemas
}