fxgcppubsub

Yokai GCP Pub/Sub Module

Yokai module for GCP Pub/Sub.

Overview

This module lets your Yokai application publish to and/or subscribe from a GCP Pub/Sub instance.

It also supports Avro and Protobuf schemas.

Installation

First install the module:

go get github.com/ankorstore/yokai-contrib/fxgcppubsub

Then activate it in your application bootstrapper:

// internal/bootstrap.go
package internal

import (
	"github.com/ankorstore/yokai/fxcore"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub"
)

var Bootstrapper = fxcore.NewBootstrapper().WithOptions(
	// load fxgcppubsub module
	fxgcppubsub.FxGcpPubSubModule,
	// ...
)

Configuration

Configuration reference:

# ./configs/config.yaml
modules:
  gcppubsub:
    project:
      id: ${GCP_PROJECT_ID}  # GCP project id
    healthcheck:
      topics:                # list of topics to check for the topics probe
        - some-topic         # refers to projects/${GCP_PROJECT_ID}/topics/some-topic
      subscriptions:         # list of subscriptions to check for the subscriptions probe
        - some-subscription  # refers to projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
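
The comments in the configuration above indicate that short topic and subscription names are resolved against the configured project id. A minimal sketch of that expansion, for illustration only: `topicPath` and `subscriptionPath` are hypothetical helpers and not part of this module's API.

```go
package main

import "fmt"

// topicPath expands a short topic name into a fully qualified
// Pub/Sub resource name. Hypothetical helper, for illustration only.
func topicPath(projectID, topicID string) string {
	return fmt.Sprintf("projects/%s/topics/%s", projectID, topicID)
}

// subscriptionPath does the same for a short subscription name.
// Hypothetical helper, for illustration only.
func subscriptionPath(projectID, subscriptionID string) string {
	return fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subscriptionID)
}

func main() {
	// "my-project" stands in for the value of GCP_PROJECT_ID.
	fmt.Println(topicPath("my-project", "some-topic"))
	fmt.Println(subscriptionPath("my-project", "some-subscription"))
}
```

In practice you only write the short names in the configuration and in your code; the sketch just makes the resulting resource names explicit.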

Publish

This module provides a high-level Publisher that you can inject anywhere to publish messages on a topic.

If the topic is associated with an Avro or Protobuf schema, the publisher automatically handles the message encoding.

This module also provides a pubsub.Client that you can use for low-level publishing.

Raw message

To publish a raw message (without associated schema) on a topic:

// publish on projects/${GCP_PROJECT_ID}/topics/some-topic
res, err := publisher.Publish(context.Background(), "some-topic", "some message")

Avro message

The publisher accepts any struct and automatically handles the Avro encoding, based on the struct's avro tags.

Considering this avro schema:

{
  "namespace": "Simple",
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "StringField",
      "type": "string"
    },
    {
      "name": "FloatField",
      "type": "float"
    },
    {
      "name": "BooleanField",
      "type": "boolean"
    }
  ]
}

To publish a message on a topic associated with this Avro schema:

// struct with tags, representing the message
type SimpleRecord struct {
    StringField  string  `avro:"StringField" json:"StringField"`
    FloatField   float32 `avro:"FloatField" json:"FloatField"`
    BooleanField bool    `avro:"BooleanField" json:"BooleanField"`
}

// publish on projects/${GCP_PROJECT_ID}/topics/some-topic
res, err := publisher.Publish(context.Background(), "some-topic", &SimpleRecord{
    StringField:  "some string",
    FloatField:   12.34,
    BooleanField: true,
})
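
To make the role of the struct tags more concrete, here is a small sketch that lists the `avro` tag values of the struct above using only the standard library. It assumes the tag values are meant to match the field names of the Avro schema; `avroFieldNames` is a hypothetical helper and says nothing about how the module's encoder is actually implemented.

```go
package main

import (
	"fmt"
	"reflect"
)

// SimpleRecord mirrors the struct used in the publishing example.
type SimpleRecord struct {
	StringField  string  `avro:"StringField" json:"StringField"`
	FloatField   float32 `avro:"FloatField" json:"FloatField"`
	BooleanField bool    `avro:"BooleanField" json:"BooleanField"`
}

// avroFieldNames returns the value of the avro tag of each struct field,
// in declaration order. Fields without an avro tag are skipped.
// Hypothetical helper, for illustration only.
func avroFieldNames(v interface{}) []string {
	t := reflect.TypeOf(v)
	if t == nil {
		return nil
	}
	// Accept both a struct value and a pointer to a struct.
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return nil
	}

	names := make([]string, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		if name, ok := t.Field(i).Tag.Lookup("avro"); ok {
			names = append(names, name)
		}
	}

	return names
}

func main() {
	fmt.Println(avroFieldNames(&SimpleRecord{}))
}
```

Comparing this list with the `name` entries of the schema is a quick way to spot a tag that does not line up with a schema field.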

Protobuf message

The publisher accepts any proto.Message and automatically handles the Protobuf binary or JSON encoding.

Considering this protobuf schema:

syntax = "proto3";

package simple;

option go_package = "github.com/ankorstore/yokai-contrib/fxgcppubsub/testdata/proto";

message SimpleRecord {
    string string_field = 1;
    float float_field = 2;
    bool boolean_field = 3;
}

To publish a message on a topic associated with this Protobuf schema:

// generated protobuf stub proto.Message, representing the message
type SimpleRecord struct {
    state         protoimpl.MessageState
    sizeCache     protoimpl.SizeCache
    unknownFields protoimpl.UnknownFields
    
    StringField  string  `protobuf:"bytes,1,opt,name=string_field,json=stringField,proto3" json:"string_field,omitempty"`
    FloatField   float32 `protobuf:"fixed32,2,opt,name=float_field,json=floatField,proto3" json:"float_field,omitempty"`
    BooleanField bool    `protobuf:"varint,3,opt,name=boolean_field,json=booleanField,proto3" json:"boolean_field,omitempty"`
}

// publish on projects/${GCP_PROJECT_ID}/topics/some-topic
res, err := publisher.Publish(context.Background(), "some-topic", &SimpleRecord{
    StringField:  "test proto",
    FloatField:   56.78,
    BooleanField: false,
})

Subscribe

This module provides a high-level Subscriber that you can inject anywhere to receive messages from a subscription.

If the subscription's topic is associated with an Avro or Protobuf schema, the subscriber hands you a message that you can decode yourself.

This module also provides a pubsub.Client that you can use for low-level subscribing.

Raw message

To subscribe to a subscription receiving raw messages (without associated schema):

// subscribe from projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
err := subscriber.Subscribe(ctx, "some-subscription", func(ctx context.Context, m *message.Message) {
    fmt.Printf("%s", m.Data())
    
    m.Ack()
})

Avro message

The subscriber message can be decoded into any struct carrying matching avro tags.

Considering this avro schema:

{
  "namespace": "Simple",
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "StringField",
      "type": "string"
    },
    {
      "name": "FloatField",
      "type": "float"
    },
    {
      "name": "BooleanField",
      "type": "boolean"
    }
  ]
}

To subscribe from a subscription associated with this Avro schema:

// struct with tags, representing the message
type SimpleRecord struct {
    StringField  string  `avro:"StringField" json:"StringField"`
    FloatField   float32 `avro:"FloatField" json:"FloatField"`
    BooleanField bool    `avro:"BooleanField" json:"BooleanField"`
}

// subscribe from projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
err := subscriber.Subscribe(ctx, "some-subscription", func(ctx context.Context, m *message.Message) {
    var rec SimpleRecord
    
    // decode the payload; on failure, nack and stop so the message is not acked below
    if err := m.Decode(&rec); err != nil {
        m.Nack()

        return
    }
    
    fmt.Printf("%v", rec)
    
    m.Ack()
})
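
A handler should settle each message once: nack it and return when decoding fails, ack it otherwise. The sketch below illustrates that control flow with the standard library only. `fakeMessage` is a hypothetical stand-in for the module's message type, and it decodes JSON instead of Avro purely to stay self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SimpleRecord mirrors the struct used in the subscribing example.
type SimpleRecord struct {
	StringField  string  `json:"StringField"`
	FloatField   float32 `json:"FloatField"`
	BooleanField bool    `json:"BooleanField"`
}

// fakeMessage is a hypothetical stand-in for the module's message type.
// It records whether Ack or Nack was called.
type fakeMessage struct {
	data   []byte
	acked  bool
	nacked bool
}

// Decode uses JSON here only to keep the sketch dependency free.
func (m *fakeMessage) Decode(out interface{}) error { return json.Unmarshal(m.data, out) }
func (m *fakeMessage) Ack()                         { m.acked = true }
func (m *fakeMessage) Nack()                        { m.nacked = true }

// handle nacks and returns early on a decoding failure,
// so that a message is never both nacked and acked.
func handle(m *fakeMessage) {
	var rec SimpleRecord

	if err := m.Decode(&rec); err != nil {
		m.Nack()

		return
	}

	fmt.Printf("%+v\n", rec)

	m.Ack()
}

func main() {
	good := &fakeMessage{data: []byte(`{"StringField":"some string","FloatField":12.34,"BooleanField":true}`)}
	bad := &fakeMessage{data: []byte("not json")}

	handle(good)
	handle(bad)

	fmt.Println("good:", good.acked, good.nacked)
	fmt.Println("bad:", bad.acked, bad.nacked)
}
```

Without the early return, a message that fails to decode would be nacked and then acked, and the handler would go on to use a zero-valued record.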

Protobuf message

The subscriber message can be decoded into any proto.Message, whether the encoding is Protobuf binary or JSON.

Considering this protobuf schema:

syntax = "proto3";

package simple;

option go_package = "github.com/ankorstore/yokai-contrib/fxgcppubsub/testdata/proto";

message SimpleRecord {
  string string_field = 1;
  float float_field = 2;
  bool boolean_field = 3;
}

To subscribe from a subscription associated with this Protobuf schema:

// generated protobuf stub proto.Message, representing the message
type SimpleRecord struct {
    state         protoimpl.MessageState
    sizeCache     protoimpl.SizeCache
    unknownFields protoimpl.UnknownFields
    
    StringField  string  `protobuf:"bytes,1,opt,name=string_field,json=stringField,proto3" json:"string_field,omitempty"`
    FloatField   float32 `protobuf:"fixed32,2,opt,name=float_field,json=floatField,proto3" json:"float_field,omitempty"`
    BooleanField bool    `protobuf:"varint,3,opt,name=boolean_field,json=booleanField,proto3" json:"boolean_field,omitempty"`
}

// subscribe from projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
err := subscriber.Subscribe(ctx, "some-subscription", func(ctx context.Context, m *message.Message) {
    var rec SimpleRecord
    
    // decode the payload; on failure, nack and stop so the message is not acked below
    if err := m.Decode(&rec); err != nil {
        m.Nack()

        return
    }
    
    // print through a pointer: generated protobuf messages should not be copied by value
    fmt.Printf("%v", &rec)
    
    m.Ack()
})

Health Check

This module provides ready-to-use health check probes for the Health Check module.

Considering the following configuration:

# ./configs/config.yaml
modules:
  gcppubsub:
    project:
      id: ${GCP_PROJECT_ID}   # GCP project id
    healthcheck:
      topics:                 # list of topics to check for the topics probe
        - some-topic          # refers to projects/${GCP_PROJECT_ID}/topics/some-topic
        - other-topic         # refers to projects/${GCP_PROJECT_ID}/topics/other-topic
      subscriptions:          # list of subscriptions to check for the subscriptions probe
        - some-subscription   # refers to projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
        - other-subscription  # refers to projects/${GCP_PROJECT_ID}/subscriptions/other-subscription

To activate these probes, register them:

// internal/services.go
package internal

import (
	"github.com/ankorstore/yokai/fxhealthcheck"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub/healthcheck"
	"go.uber.org/fx"
)

func ProvideServices() fx.Option {
	return fx.Options(
		// register the GcpPubSubTopicsProbe for some-topic and other-topic
		fxhealthcheck.AsCheckerProbe(healthcheck.NewGcpPubSubTopicsProbe),
		// register the GcpPubSubSubscriptionsProbe for some-subscription and other-subscription
		fxhealthcheck.AsCheckerProbe(healthcheck.NewGcpPubSubSubscriptionsProbe),
		// ...
	)
}

Notes:

  • if your application is interested only in publishing, activate the GcpPubSubTopicsProbe only
  • if it is interested only in subscribing, activate the GcpPubSubSubscriptionsProbe only

Testing

In test mode, the components provided by this module (such as the Publisher, the Subscriber and the pubsub.Client) are all configured to work against a pstest.Server. You do not need to spin up a real Pub/Sub instance or an emulator, which makes tests more portable.

This means you can create topics, subscriptions and schemas locally, just for your tests.

For example:

// internal/example/example_test.go
package example_test

import (
	"context"
	"testing"
	"time"
	
	"github.com/ankorstore/yokai-contrib/fxgcppubsub"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub/message"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub/reactor/ack"
	"github.com/ankorstore/yokai/fxconfig"
	"github.com/stretchr/testify/assert"
	"github.com/foo/bar/internal"
	"go.uber.org/fx"
	"go.uber.org/fx/fxtest"
)

func TestPubSub(t *testing.T) {
	t.Setenv("APP_ENV", "test")
	t.Setenv("APP_CONFIG_PATH", "testdata/config")
	t.Setenv("GCP_PROJECT_ID", "test-project")

	var publisher fxgcppubsub.Publisher
	var subscriber fxgcppubsub.Subscriber
	var supervisor ack.AckSupervisor

	ctx := context.Background()

	// test app
	internal.RunTest(
		t,
		// prepare test topic and subscription
		fxgcppubsub.PrepareTopicAndSubscription(fxgcppubsub.PrepareTopicAndSubscriptionParams{
			TopicID:        "test-topic",
			SubscriptionID: "test-subscription",
		}),
		fx.Populate(&publisher, &subscriber, &supervisor),
	)
	
	// publish to test-topic
	_, err := publisher.Publish(ctx, "test-topic", "test data")
	assert.NoError(t, err)

	// subscribe from test-subscription
	waiter := supervisor.StartAckWaiter("test-subscription")
	
	go subscriber.Subscribe(ctx, "test-subscription", func(ctx context.Context, m *message.Message) {
		assert.Equal(t, []byte("test data"), m.Data())

		m.Ack()
	})

	// wait for subscriber message ack
	_, err = waiter.WaitMaxDuration(ctx, time.Second)
	assert.NoError(t, err)
}

Notes:

  • you can prepare the test topics, subscriptions and schemas using the provided helpers
  • you can find tests involving avro and protobuf schemas in the module test examples