Skip to content

Commit

Permalink
feat(fxgcppubsub): Added Avro and Protobuf schemas support
Browse files Browse the repository at this point in the history
  • Loading branch information
ekkinox committed Jun 10, 2024
1 parent 10a1f03 commit cd12977
Show file tree
Hide file tree
Showing 17 changed files with 1,001 additions and 112 deletions.
197 changes: 197 additions & 0 deletions fxgcppubsub/codec/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package codec

import (
"encoding/json"
"fmt"

"cloud.google.com/go/pubsub"
"github.com/hamba/avro/v2"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

type Codec struct {
schemaConfig *pubsub.SchemaConfig
schemaSettings *pubsub.SchemaSettings
}

func NewCodec(schemaConfig *pubsub.SchemaConfig, schemaSettings *pubsub.SchemaSettings) *Codec {
return &Codec{
schemaConfig: schemaConfig,
schemaSettings: schemaSettings,
}
}

func (c *Codec) Encode(in any) ([]byte, error) {
if c.schemaConfig == nil || c.schemaSettings == nil {
inBytes, ok := in.([]byte)
if !ok {
return nil, fmt.Errorf("data without schema must be of type []byte")
}

return inBytes, nil
}

var out []byte
var err error

switch c.schemaConfig.Type {
case pubsub.SchemaAvro:
switch c.schemaSettings.Encoding {
case pubsub.EncodingBinary:
out, err = c.encodeAvroBinary(in)
case pubsub.EncodingJSON:
out, err = c.encodeAvroJSON(in)
default:
err = fmt.Errorf("invalid avro encoding")
}
case pubsub.SchemaProtocolBuffer:
switch c.schemaSettings.Encoding {
case pubsub.EncodingBinary:
out, err = c.encodeProtoBinary(in)
case pubsub.EncodingJSON:
out, err = c.encodeProtoJSON(in)
default:
err = fmt.Errorf("invalid proto encoding")
}
default:
err = fmt.Errorf("invalid schema type")
}

return out, err
}

func (c *Codec) Decode(enc []byte, out any) error {
if c.schemaConfig == nil || c.schemaSettings == nil {
return fmt.Errorf("no schema associated, nothing to decode, use message data instead")
}

var err error

switch c.schemaConfig.Type {
case pubsub.SchemaAvro:
switch c.schemaSettings.Encoding {
case pubsub.EncodingBinary:
err = c.decodeAvroBinary(enc, out)
case pubsub.EncodingJSON:
err = c.decodeAvroJSON(enc, out)
default:
err = fmt.Errorf("invalid avro encoding")
}
case pubsub.SchemaProtocolBuffer:
switch c.schemaSettings.Encoding {
case pubsub.EncodingBinary:
err = c.decodeProtoBinary(enc, out)
case pubsub.EncodingJSON:
err = c.decodeProtoJSON(enc, out)
default:
err = fmt.Errorf("invalid proto encoding")
}
default:
err = fmt.Errorf("invalid schema type")
}

return err
}

func (c *Codec) encodeAvroBinary(in any) ([]byte, error) {
avroSchema, err := avro.Parse(c.schemaConfig.Definition)
if err != nil {
return nil, fmt.Errorf("cannot parse avro schema: %w", err)
}

out, err := avro.Marshal(avroSchema, in)
if err != nil {
return nil, fmt.Errorf("cannot encode avro binary: %w", err)
}

return out, nil
}

func (c *Codec) decodeAvroBinary(enc []byte, out any) error {
avroSchema, err := avro.Parse(c.schemaConfig.Definition)
if err != nil {
return fmt.Errorf("cannot parse avro schema: %w", err)
}

err = avro.Unmarshal(avroSchema, enc, out)
if err != nil {
return fmt.Errorf("cannot decode avro binary: %w", err)
}

return nil
}

func (c *Codec) encodeAvroJSON(in any) ([]byte, error) {
out, err := json.Marshal(in)
if err != nil {
return nil, fmt.Errorf("cannot encode avro json: %w", err)
}

return out, nil
}

func (c *Codec) decodeAvroJSON(enc []byte, out any) error {
err := json.Unmarshal(enc, out)
if err != nil {
return fmt.Errorf("cannot decode avro json: %w", err)
}

return nil
}

func (c *Codec) encodeProtoBinary(in any) ([]byte, error) {
protoIn, ok := in.(proto.Message)
if !ok {
return nil, fmt.Errorf("invalid proto message")
}

out, err := proto.Marshal(protoIn)
if err != nil {
return nil, fmt.Errorf("cannot encode proto binary: %w", err)
}

return out, nil
}

func (c *Codec) decodeProtoBinary(enc []byte, out any) error {
protoOut, ok := out.(proto.Message)
if !ok {
return fmt.Errorf("invalid proto message")
}

err := proto.Unmarshal(enc, protoOut)
if err != nil {
return fmt.Errorf("cannot decode proto binary: %w", err)
}

return nil
}

func (c *Codec) encodeProtoJSON(in any) ([]byte, error) {
protoIn, ok := in.(proto.Message)
if !ok {
return nil, fmt.Errorf("invalid proto message")
}

out, err := protojson.Marshal(protoIn)
if err != nil {
return nil, fmt.Errorf("cannot encode proto json: %w", err)
}

return out, nil
}

func (c *Codec) decodeProtoJSON(enc []byte, out any) error {
protoOut, ok := out.(proto.Message)
if !ok {
return fmt.Errorf("invalid proto message")
}

err := protojson.Unmarshal(enc, protoOut)
if err != nil {
return fmt.Errorf("cannot decode proto json: %w", err)
}

return nil
}
46 changes: 25 additions & 21 deletions fxgcppubsub/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@ module github.com/ankorstore/yokai-contrib/fxgcppubsub
go 1.20

require (
cloud.google.com/go/pubsub v1.37.0
cloud.google.com/go/pubsub v1.38.0
github.com/ankorstore/yokai/config v1.3.0
github.com/ankorstore/yokai/fxconfig v1.1.0
github.com/ankorstore/yokai/healthcheck v1.1.0
github.com/hamba/avro/v2 v2.22.1
github.com/stretchr/testify v1.9.0
go.uber.org/fx v1.21.0
google.golang.org/api v0.170.0
google.golang.org/grpc v1.62.1
go.uber.org/fx v1.22.0
google.golang.org/api v0.183.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
)

require (
cloud.google.com/go v0.112.1 // indirect
cloud.google.com/go/compute v1.25.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.7 // indirect
cloud.google.com/go v0.114.0 // indirect
cloud.google.com/go/auth v0.5.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/iam v1.1.8 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
Expand All @@ -28,10 +31,13 @@ require (
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
github.com/googleapis/gax-go/v2 v2.12.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
Expand All @@ -43,7 +49,7 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.18.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.einride.tech/aip v0.66.0 // indirect
go.einride.tech/aip v0.67.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
Expand All @@ -54,19 +60,17 @@ require (
go.uber.org/dig v1.17.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240314234333-6e1732d8331c // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240314234333-6e1732d8331c // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c // indirect
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/genproto v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240521202816-d264139d666e // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading

0 comments on commit cd12977

Please sign in to comment.