Skip to content

Commit

Permalink
Merge pull request #197 from debitoor/master
Browse files Browse the repository at this point in the history
Support schema references for Avro, Protocol Buffer, and JSON schema
  • Loading branch information
Nevon authored Oct 4, 2022
2 parents 05a3613 + 0b2d05c commit 3f4dea4
Show file tree
Hide file tree
Showing 13 changed files with 1,766 additions and 33 deletions.
87 changes: 87 additions & 0 deletions docs/schema-avro.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
---
id: schema-avro
title: Example Avro Schemas
sidebar_label: Example Avro Schemas
---

## Schema with references to other schemas

You might want to split the Avro definition into several schemas one for each type.

```json
{
"type" : "record",
"namespace" : "test",
"name" : "A",
"fields" : [
{ "name" : "id" , "type" : "int" },
{ "name" : "b" , "type" : "test.B" }
]
}
```

```json
{
"type" : "record",
"namespace" : "test",
"name" : "B",
"fields" : [
{ "name" : "id" , "type" : "int" }
]
}
```

To register schemas with references, the schemas have to be registered in reverse order. The schema that references another schema has to be registered after the schema it references. In this example B has to be registered before A. Furthermore, when registering A, a list of references have to be provided. A reference consist of:

* `name` - the fully qualified name of the referenced schema. Example: `test.B`
* `subject` - the subject the schema is registered under in the registry
* `version` - the version of the schema you want to use

The library will handle an arbitrary number of nested levels.

```js
const schemaA = {
type: 'record',
namespace: 'test',
name: 'A',
fields: [
{ name: 'id', type: 'int' },
{ name: 'b', type: 'test.B' },
],
}

const schemaB = {
type: 'record',
namespace: 'test',
name: 'B',
fields: [{ name: 'id', type: 'int' }],
}

await schemaRegistry.register(
{ type: SchemaType.AVRO, schema: JSON.stringify(schemaB) },
{ subject: 'Avro:B' },
)

const response = await schemaRegistry.api.Subject.latestVersion({ subject: 'Avro:B' })
const { version } = JSON.parse(response.responseData)

const { id } = await schemaRegistry.register(
{
type: SchemaType.AVRO,
schema: JSON.stringify(schemaA),
references: [
{
name: 'test.B',
subject: 'Avro:B',
version,
},
],
},
{ subject: 'Avro:A' },
)

const obj = { id: 1, b: { id: 2 } }

const buffer = await schemaRegistry.encode(id, obj)
const decodedObj = await schemaRegistry.decode(buffer)
```
85 changes: 85 additions & 0 deletions docs/schema-json.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
---
id: schema-json
title: Example JSON Schemas
sidebar_label: Example JSON Schemas
---

## Schema with references to other schemas

You might want to split the JSON definition into several schemas one for each type.

```JSON
{
"$id": "https://example.com/schemas/A",
"type": "object",
"properties": {
"id": { "type": "number" },
"b": { "$ref": "https://example.com/schemas/B" }
}
}
```

```JSON
{
"$id": "https://example.com/schemas/B",
"type": "object",
"properties": {
"id": { "type": "number" }
}
}
```

To register schemas with references, the schemas have to be registered in reverse order. The schema that references another schema has to be registered after the schema it references. In this example B has to be registered before A. Furthermore, when registering A, a list of references have to be provided. A reference consist of:

* `name` - A URL matching the `$ref` from the schema
* `subject` - the subject the schema is registered under in the registry
* `version` - the version of the schema you want to use

The library will handle an arbitrary number of nested levels.

```js
const schemaA = {
$id: 'https://example.com/schemas/A',
type: 'object',
properties: {
id: { type: 'number' },
b: { $ref: 'https://example.com/schemas/B' },
},
}

const schemaB = {
$id: 'https://example.com/schemas/B',
type: 'object',
properties: {
id: { type: 'number' },
},
}

await schemaRegistry.register(
{ type: SchemaType.JSON, schema: JSON.stringify(schemaB) },
{ subject: 'JSON:B' },
)

const response = await schemaRegistry.api.Subject.latestVersion({ subject: 'JSON:B' })
const { version } = JSON.parse(response.responseData)

const { id } = await schemaRegistry.register(
{
type: SchemaType.JSON,
schema: JSON.stringify(schemaA),
references: [
{
name: 'https://example.com/schemas/B',
subject: 'JSON:B',
version,
},
],
},
{ subject: 'JSON:A' },
)

const obj = { id: 1, b: { id: 2 } }

const buffer = await schemaRegistry.encode(id, obj)
const decodedObj = await schemaRegistry.decode(buffer)
```
85 changes: 85 additions & 0 deletions docs/schema-protobuf.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
---
id: schema-protobuf
title: Example Protobuf Schemas
sidebar_label: Example Protobuf Schemas
---

## Schema with references to other schemas

You might want to split the Protobuf definition into several schemas, one for each type.

```protobuf
syntax = "proto3";
package test;
import "test/B.proto";
message A {
int32 id = 1;
B b = 2;
}
```

```protobuf
syntax = "proto3";
package test;
message B {
int32 id = 1;
}
```

To register schemas with references, the schemas have to be registered in reverse order. The schema that references another schema has to be registered after the schema it references. In this example B has to be registered before A. Furthermore, when registering A, a list of references have to be provided. A reference consist of:

* `name` - String matching the import statement. For example: `test/B.proto`
* `subject` - the subject the schema is registered under in the registry
* `version` - the version of the schema you want to use

The library will handle an arbitrary number of nested levels.

```js
const schemaA = `
syntax = "proto3";
package test;
import "test/B.proto";
message A {
int32 id = 1;
B b = 2;
}`

const schemaB = `
syntax = "proto3";
package test;
message B {
int32 id = 1;
}`

await schemaRegistry.register(
{ type: SchemaType.PROTOBUF, schema: schemaB },
{ subject: 'Proto:B' },
)

const response = await schemaRegistry.api.Subject.latestVersion({ subject: 'Proto:B' })
const { version } = JSON.parse(response.responseData)

const { id } = await schemaRegistry.register(
{
type: SchemaType.PROTOBUF,
schema: schemaA,
references: [
{
name: 'test/B.proto',
subject: 'Proto:B',
version,
},
],
},
{ subject: 'Proto:A' },
)

const obj = { id: 1, b: { id: 2 } }

const buffer = await schemaRegistry.encode(id, obj)
const decodedObj = await schemaRegistry.decode(buffer)
```
29 changes: 25 additions & 4 deletions src/@types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,28 @@ export enum SchemaType {
PROTOBUF = 'PROTOBUF',
UNKNOWN = 'UNKNOWN',
}

export interface SchemaHelper {
validate(schema: Schema): void
getSubject(confluentSchema: ConfluentSchema, schema: Schema, separator: string): ConfluentSubject
toConfluentSchema(data: SchemaResponse): ConfluentSchema
updateOptionsFromSchemaReferences(
referencedSchemas: ConfluentSchema[],
options?: ProtocolOptions,
): ProtocolOptions
}

export type AvroOptions = Partial<ForSchemaOptions> & {
referencedSchemas?: AvroConfluentSchema[]
}

export type AvroOptions = Partial<ForSchemaOptions>
export type JsonOptions = ConstructorParameters<typeof Ajv>[0] & {
ajvInstance?: {
addSchema: Ajv['addSchema']
compile: (schema: any) => ValidateFunction
}
referencedSchemas?: JsonConfluentSchema[]
}
export type ProtoOptions = { messageName: string }
export type ProtoOptions = { messageName?: string; referencedSchemas?: ProtoConfluentSchema[] }

export interface LegacyOptions {
forSchemaOptions?: AvroOptions
Expand Down Expand Up @@ -60,16 +69,28 @@ export interface ConfluentSubject {
export interface AvroConfluentSchema {
type: SchemaType.AVRO
schema: string | RawAvroSchema
references?: SchemaReference[]
}

export type SchemaReference = {
name: string
subject: string
version: number
}
export interface ProtoConfluentSchema {
type: SchemaType.PROTOBUF
schema: string
references?: SchemaReference[]
}

export interface JsonConfluentSchema {
type: SchemaType.JSON
schema: string
references?: SchemaReference[]
}
export interface SchemaResponse {
schema: string
schemaType: string
references?: SchemaReference[]
}

export type ConfluentSchema = AvroConfluentSchema | ProtoConfluentSchema | JsonConfluentSchema
Expand Down
41 changes: 38 additions & 3 deletions src/AvroHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import {
ConfluentSchema,
SchemaHelper,
ConfluentSubject,
ProtocolOptions,
AvroConfluentSchema,
} from './@types'
import { ConfluentSchemaRegistryArgumentError } from './errors'
import avro from 'avsc'
import avro, { ForSchemaOptions, Schema, Type } from 'avsc'
import { SchemaResponse, SchemaType } from './@types'

type TypeHook = (schema: Schema, opts: ForSchemaOptions) => Type
export default class AvroHelper implements SchemaHelper {
private getRawAvroSchema(schema: ConfluentSchema): RawAvroSchema {
return (typeof schema.schema === 'string'
Expand All @@ -21,7 +25,27 @@ export default class AvroHelper implements SchemaHelper {
? schema
: this.getRawAvroSchema(schema)
// @ts-ignore TODO: Fix typings for Schema...
const avroSchema: AvroSchema = avro.Type.forSchema(rawSchema, opts)

const addReferencedSchemas = (userHook?: TypeHook): TypeHook => (
schema: avro.Schema,
opts: ForSchemaOptions,
) => {
const avroOpts = opts as AvroOptions
avroOpts?.referencedSchemas?.forEach(subSchema => {
const rawSubSchema = this.getRawAvroSchema(subSchema)
avroOpts.typeHook = userHook
avro.Type.forSchema(rawSubSchema, avroOpts)
})
if (userHook) {
return userHook(schema, opts)
}
}

const avroSchema = avro.Type.forSchema(rawSchema, {
...opts,
typeHook: addReferencedSchemas(opts?.typeHook),
})

return avroSchema
}

Expand All @@ -32,7 +56,7 @@ export default class AvroHelper implements SchemaHelper {
}

public getSubject(
schema: ConfluentSchema,
schema: AvroConfluentSchema,
// @ts-ignore
avroSchema: AvroSchema,
separator: string,
Expand All @@ -53,4 +77,15 @@ export default class AvroHelper implements SchemaHelper {
const asRawAvroSchema = schema as RawAvroSchema
return asRawAvroSchema.name != null && asRawAvroSchema.type != null
}

public toConfluentSchema(data: SchemaResponse): AvroConfluentSchema {
return { type: SchemaType.AVRO, schema: data.schema, references: data.references }
}

updateOptionsFromSchemaReferences(
referencedSchemas: AvroConfluentSchema[],
options: ProtocolOptions = {},
): ProtocolOptions {
return { ...options, [SchemaType.AVRO]: { ...options[SchemaType.AVRO], referencedSchemas } }
}
}
Loading

0 comments on commit 3f4dea4

Please sign in to comment.