Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fxgcppubsub): Added Avro and Protobuf schemas support #16

Merged
merged 24 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
cd12977
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jun 10, 2024
d7ed86d
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jun 10, 2024
f9bc3bc
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jun 11, 2024
8868769
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jun 11, 2024
2309e8d
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 4, 2024
7fa4b40
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 4, 2024
13aacb4
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 4, 2024
8ed8547
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 5, 2024
96e233a
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 5, 2024
1425173
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 5, 2024
7bffaca
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 5, 2024
4d5bb45
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 5, 2024
c2986b4
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 5, 2024
9fd32de
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 8, 2024
3d2c974
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 8, 2024
f754f9d
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 8, 2024
8b8758b
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 8, 2024
5fa49af
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 8, 2024
5566e6d
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 8, 2024
759f653
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 8, 2024
538a845
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 8, 2024
90cd753
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 8, 2024
2e01a6d
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 8, 2024
0a539f8
feat(fxgcppubsub): Added Avro and Protobuf schemas support
ekkinox Jul 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fxgcppubsub/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ linters:
- importas
- ineffassign
- interfacebloat
- logrlint
- loggercheck
- maintidx
- makezero
- misspell
Expand Down
254 changes: 254 additions & 0 deletions fxgcppubsub/codec/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
package codec

import (
"encoding/json"
"fmt"

"cloud.google.com/go/pubsub"
"github.com/hamba/avro/v2"
"github.com/linkedin/goavro/v2"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

var _ Codec = (*DefaultCodec)(nil)

// Codec is the interface for components in charge to handle raw, avro or protobuf encoding and decoding.
type Codec interface {
Encode(in any) ([]byte, error)
Decode(enc []byte, out any) error
}

// DefaultCodec is the default Codec implementation.
type DefaultCodec struct {
schemaType pubsub.SchemaType
schemaEncoding pubsub.SchemaEncoding
schemaDefinition string
}

// NewDefaultCodec returns a new DefaultCodec instance.
func NewDefaultCodec(schemaType pubsub.SchemaType, schemaEncoding pubsub.SchemaEncoding, schemaDefinition string) *DefaultCodec {
return &DefaultCodec{
schemaType: schemaType,
schemaEncoding: schemaEncoding,
schemaDefinition: schemaDefinition,
}
}

// Encode encodes an input into an avro or protobuf encoded slice of bytes.
//
//nolint:cyclop,exhaustive
func (c *DefaultCodec) Encode(in any) ([]byte, error) {
Skyld marked this conversation as resolved.
Show resolved Hide resolved
switch c.schemaType {
case pubsub.SchemaTypeUnspecified:
return []byte(fmt.Sprintf("%s", in)), nil
case pubsub.SchemaAvro:
switch c.schemaEncoding {
case pubsub.EncodingBinary:
return c.encodeAvroBinary(in)
case pubsub.EncodingJSON:
return c.encodeAvroJSON(in)
default:
return nil, fmt.Errorf("invalid avro encoding")
}
case pubsub.SchemaProtocolBuffer:
switch c.schemaEncoding {
case pubsub.EncodingBinary:
return c.encodeProtoBinary(in)
case pubsub.EncodingJSON:
return c.encodeProtoJSON(in)
default:
return nil, fmt.Errorf("invalid proto encoding")
}
default:
return nil, fmt.Errorf("invalid schema type")
}
}

// Decode decodes an avro or protobuf encoded slice of bytes into a provided output.
//
//nolint:cyclop,exhaustive
func (c *DefaultCodec) Decode(enc []byte, out any) error {
switch c.schemaType {
//nolint:exhaustive
case pubsub.SchemaTypeUnspecified:
return fmt.Errorf("data without schema cannot be decoded")
case pubsub.SchemaAvro:
switch c.schemaEncoding {
case pubsub.EncodingBinary:
return c.decodeAvroBinary(enc, out)
case pubsub.EncodingJSON:
return c.decodeAvroJSON(enc, out)
default:
return fmt.Errorf("invalid avro encoding")
}
case pubsub.SchemaProtocolBuffer:
switch c.schemaEncoding {
case pubsub.EncodingBinary:
return c.decodeProtoBinary(enc, out)
case pubsub.EncodingJSON:
return c.decodeProtoJSON(enc, out)
default:
return fmt.Errorf("invalid proto encoding")
}
default:
return fmt.Errorf("invalid schema type")
}
}

func (c *DefaultCodec) encodeAvroBinary(in any) ([]byte, error) {
avroSchema, err := avro.Parse(c.schemaDefinition)
if err != nil {
return nil, fmt.Errorf("cannot parse avro schema: %w", err)
}

Check warning on line 103 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L102-L103

Added lines #L102 - L103 were not covered by tests

out, err := avro.Marshal(avroSchema, in)
if err != nil {
return nil, fmt.Errorf("cannot encode avro binary: %w", err)
}

return out, nil
}

func (c *DefaultCodec) decodeAvroBinary(enc []byte, out any) error {
avroSchema, err := avro.Parse(c.schemaDefinition)
if err != nil {
return fmt.Errorf("cannot parse avro schema: %w", err)
}

Check warning on line 117 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L116-L117

Added lines #L116 - L117 were not covered by tests

err = avro.Unmarshal(avroSchema, enc, out)
if err != nil {
return fmt.Errorf("cannot decode avro binary: %w", err)
}

return nil
}

func (c *DefaultCodec) encodeAvroJSON(in any) ([]byte, error) {
avroSchema, err := goavro.NewCodec(c.schemaDefinition)
if err != nil {
return nil, fmt.Errorf("cannot parse avro schema: %w", err)
}

Check warning on line 131 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L130-L131

Added lines #L130 - L131 were not covered by tests

inMap, err := c.convertStructIntoMap(in)
if err != nil {
return nil, fmt.Errorf("cannot convert struct into map: %w", err)
}

Check warning on line 136 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L135-L136

Added lines #L135 - L136 were not covered by tests

out, err := avroSchema.TextualFromNative(nil, inMap)
if err != nil {
return nil, fmt.Errorf("cannot encode avro json: %w", err)
}

return out, nil
}

func (c *DefaultCodec) decodeAvroJSON(enc []byte, out any) error {
avroSchema, err := goavro.NewCodec(c.schemaDefinition)
if err != nil {
return fmt.Errorf("cannot parse avro schema: %w", err)
}

Check warning on line 150 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L149-L150

Added lines #L149 - L150 were not covered by tests

data, _, err := avroSchema.NativeFromTextual(enc)
if err != nil {
return fmt.Errorf("cannot decode avro json: %w", err)
}

dataMap, ok := data.(map[string]interface{})
if !ok {
return fmt.Errorf("cannot convert avro json into map: %w", err)
}

Check warning on line 160 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L159-L160

Added lines #L159 - L160 were not covered by tests

err = c.convertMapIntoStruct(dataMap, out)
if err != nil {
return fmt.Errorf("cannot convert map into struct: %w", err)
}

Check warning on line 165 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L164-L165

Added lines #L164 - L165 were not covered by tests

return nil
}

func (c *DefaultCodec) encodeProtoBinary(in any) ([]byte, error) {
protoIn, ok := in.(proto.Message)
if !ok {
return nil, fmt.Errorf("invalid proto message")
}

out, err := proto.Marshal(protoIn)
if err != nil {
return nil, fmt.Errorf("cannot encode proto binary: %w", err)
}

Check warning on line 179 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L178-L179

Added lines #L178 - L179 were not covered by tests

return out, nil
}

func (c *DefaultCodec) decodeProtoBinary(enc []byte, out any) error {
protoOut, ok := out.(proto.Message)
if !ok {
return fmt.Errorf("invalid proto message")
}

Check warning on line 188 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L187-L188

Added lines #L187 - L188 were not covered by tests

err := proto.Unmarshal(enc, protoOut)
if err != nil {
return fmt.Errorf("cannot decode proto binary: %w", err)
}

return nil
}

func (c *DefaultCodec) encodeProtoJSON(in any) ([]byte, error) {
protoIn, ok := in.(proto.Message)
if !ok {
return nil, fmt.Errorf("invalid proto message")
}

out, err := protojson.Marshal(protoIn)
if err != nil {
return nil, fmt.Errorf("cannot encode proto json: %w", err)
}

Check warning on line 207 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L206-L207

Added lines #L206 - L207 were not covered by tests

return out, nil
}

func (c *DefaultCodec) decodeProtoJSON(enc []byte, out any) error {
protoOut, ok := out.(proto.Message)
if !ok {
return fmt.Errorf("invalid proto message")
}

Check warning on line 216 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L215-L216

Added lines #L215 - L216 were not covered by tests

err := protojson.Unmarshal(enc, protoOut)
if err != nil {
return fmt.Errorf("cannot decode proto json: %w", err)
}

return nil
}

func (c *DefaultCodec) convertStructIntoMap(in any) (map[string]interface{}, error) {
var out map[string]interface{}

jsonIn, err := json.Marshal(in)
if err != nil {
return nil, fmt.Errorf("cannot marshal json: %w", err)
}

Check warning on line 232 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L231-L232

Added lines #L231 - L232 were not covered by tests

err = json.Unmarshal(jsonIn, &out)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal json: %w", err)
}

Check warning on line 237 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L236-L237

Added lines #L236 - L237 were not covered by tests

return out, nil
}

func (c *DefaultCodec) convertMapIntoStruct(in map[string]interface{}, out any) error {
jsonIn, err := json.Marshal(in)
if err != nil {
return fmt.Errorf("cannot marshal json: %w", err)
}

Check warning on line 246 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L245-L246

Added lines #L245 - L246 were not covered by tests

err = json.Unmarshal(jsonIn, &out)
if err != nil {
return fmt.Errorf("cannot unmarshal json: %w", err)
}

Check warning on line 251 in fxgcppubsub/codec/codec.go

View check run for this annotation

Codecov / codecov/patch

fxgcppubsub/codec/codec.go#L250-L251

Added lines #L250 - L251 were not covered by tests

return nil
}
Loading
Loading