Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flagd): Support supplying a custom sync provider for in-process flagd #598

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions providers/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,27 @@ openfeature.SetProvider(provider)
The provider will attempt to detect file changes, but this is a best-effort attempt as file system events differ between operating systems.
This mode is useful for local development, tests and offline applications.

#### Custom sync provider

In-process resolver can also be configured with a custom sync provider to change how the in-process resolver fetches flags.
The custom sync provider must implement the [sync.ISync interface](https://github.com/open-feature/flagd/blob/main/core/pkg/sync/isync.go)

```go
var syncProvider sync.ISync = MyAwesomeSyncProvider{}
var syncProviderUri string = "myawesome://sync.uri"

provider := flagd.NewProvider(
flagd.WithInProcessResolver(),
flagd.WithCustomSyncProvider(syncProvider, syncProviderUri))
openfeature.SetProvider(provider)
```

> [!IMPORTANT]
> Note that you can only use a single flag source (either gRPC or offline file) for the in-process resolver.
> If both sources are configured, offline mode will be selected.
> Note that the in-process resolver can only use a single flag source.
> If multiple sources are configured then only one would be selected based on the following order of preference:
> 1. Custom sync provider
> 2. Offline file
> 3. gRPC

## Configuration options

Expand Down
8 changes: 6 additions & 2 deletions providers/flagd/pkg/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package flagd

import (
"fmt"
"github.com/go-logr/logr"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
"os"
"strconv"

"github.com/go-logr/logr"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
)

type ResolverType string
Expand Down Expand Up @@ -52,6 +54,8 @@ type providerConfiguration struct {
Selector string
SocketPath string
TLSEnabled bool
CustomSyncProvider sync.ISync
CustomSyncProviderUri string
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need the URI as well?
Is it only to be returned from the makeSyncProvider?

It worries me a little bit because the URI here is not used at all. Caller needs to make sure they return a valid URI for the provider. At the same time most (all?) providers have the uri inside. Maybe it's possible to extend the interface to include a GetURI method?


log logr.Logger
}
Expand Down
32 changes: 23 additions & 9 deletions providers/flagd/pkg/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package flagd
import (
"context"
"fmt"

parallel "sync"

"github.com/go-logr/logr"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/logger"
"github.com/open-feature/go-sdk-contrib/providers/flagd/pkg/service/in_process"
process "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg/service/in_process"
rpcService "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg/service/rpc"
of "github.com/open-feature/go-sdk/openfeature"
"sync"
)

type Provider struct {
Expand All @@ -18,7 +21,7 @@ type Provider struct {
providerConfiguration *providerConfiguration
service IService
status of.State
mtx sync.RWMutex
mtx parallel.RWMutex

eventStream chan of.Event
}
Expand Down Expand Up @@ -71,12 +74,14 @@ func NewProvider(opts ...ProviderOption) *Provider {
provider.providerConfiguration.EventStreamConnectionMaxAttempts)
} else {
service = process.NewInProcessService(process.Configuration{
Host: provider.providerConfiguration.Host,
Port: provider.providerConfiguration.Port,
Selector: provider.providerConfiguration.Selector,
TargetUri: provider.providerConfiguration.TargetUri,
TLSEnabled: provider.providerConfiguration.TLSEnabled,
OfflineFlagSource: provider.providerConfiguration.OfflineFlagSourcePath,
Host: provider.providerConfiguration.Host,
Port: provider.providerConfiguration.Port,
Selector: provider.providerConfiguration.Selector,
TargetUri: provider.providerConfiguration.TargetUri,
TLSEnabled: provider.providerConfiguration.TLSEnabled,
OfflineFlagSource: provider.providerConfiguration.OfflineFlagSourcePath,
CustomSyncProvider: provider.providerConfiguration.CustomSyncProvider,
CustomSyncProviderUri: provider.providerConfiguration.CustomSyncProviderUri,
})
}

Expand Down Expand Up @@ -324,3 +329,12 @@ func FromEnv() ProviderOption {
p.providerConfiguration.updateFromEnvVar()
}
}

// WithCustomSyncProvider provides a custom implementation of the sync.ISync interface used by the inProcess Service
// This is only useful with inProcess resolver type
func WithCustomSyncProvider(customSyncProvider sync.ISync, customSyncProviderUri string) ProviderOption {
return func(p *Provider) {
p.providerConfiguration.CustomSyncProvider = customSyncProvider
p.providerConfiguration.CustomSyncProviderUri = customSyncProviderUri
}
}
195 changes: 121 additions & 74 deletions providers/flagd/pkg/provider_test.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,67 @@
package flagd

import (
"testing"

"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/mock"
process "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg/service/in_process"
of "github.com/open-feature/go-sdk/openfeature"
"go.uber.org/mock/gomock"
"testing"
)

func TestNewProvider(t *testing.T) {
customSyncProvider := process.NewDoNothingCustomSyncProvider()

tests := []struct {
name string
expectedResolver ResolverType
expectPort uint16
expectHost string
expectTargetUri string
expectCacheType cache.Type
expectCertPath string
expectMaxRetries int
expectCacheSize int
expectOtelIntercept bool
expectSocketPath string
expectTlsEnabled bool
options []ProviderOption
name string
expectedResolver ResolverType
expectPort uint16
expectHost string
expectTargetUri string
expectCacheType cache.Type
expectCertPath string
expectMaxRetries int
expectCacheSize int
expectOtelIntercept bool
expectSocketPath string
expectTlsEnabled bool
expectCustomSyncProvider sync.ISync
expectCustomSyncProviderUri string
options []ProviderOption
}{
{
name: "default construction",
expectedResolver: rpc,
expectPort: defaultRpcPort,
expectHost: defaultHost,
expectTargetUri: "",
expectCacheType: defaultCache,
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
name: "default construction",
expectedResolver: rpc,
expectPort: defaultRpcPort,
expectHost: defaultHost,
expectTargetUri: "",
expectCacheType: defaultCache,
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
expectCustomSyncProvider: nil,
expectCustomSyncProviderUri: "",
},
{
name: "with options",
expectedResolver: inProcess,
expectPort: 9090,
expectHost: "myHost",
expectTargetUri: "",
expectCacheType: cache.LRUValue,
expectCertPath: "/path",
expectMaxRetries: 2,
expectCacheSize: 2500,
expectOtelIntercept: true,
expectSocketPath: "/socket",
expectTlsEnabled: true,
name: "with options",
expectedResolver: inProcess,
expectPort: 9090,
expectHost: "myHost",
expectTargetUri: "",
expectCacheType: cache.LRUValue,
expectCertPath: "/path",
expectMaxRetries: 2,
expectCacheSize: 2500,
expectOtelIntercept: true,
expectSocketPath: "/socket",
expectTlsEnabled: true,
expectCustomSyncProvider: nil,
expectCustomSyncProviderUri: "",
options: []ProviderOption{
WithInProcessResolver(),
WithSocketPath("/socket"),
Expand All @@ -63,57 +74,83 @@ func TestNewProvider(t *testing.T) {
},
},
{
name: "default port handling with in-process resolver",
expectedResolver: inProcess,
expectPort: defaultInProcessPort,
expectHost: defaultHost,
expectCacheType: defaultCache,
expectTargetUri: "",
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
name: "default port handling with in-process resolver",
expectedResolver: inProcess,
expectPort: defaultInProcessPort,
expectHost: defaultHost,
expectCacheType: defaultCache,
expectTargetUri: "",
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
expectCustomSyncProvider: nil,
expectCustomSyncProviderUri: "",
options: []ProviderOption{
WithInProcessResolver(),
},
},
{
name: "default port handling with in-process resolver",
expectedResolver: rpc,
expectPort: defaultRpcPort,
expectHost: defaultHost,
expectTargetUri: "",
expectCacheType: defaultCache,
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
name: "default port handling with in-process resolver",
expectedResolver: rpc,
expectPort: defaultRpcPort,
expectHost: defaultHost,
expectTargetUri: "",
expectCacheType: defaultCache,
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
expectCustomSyncProvider: nil,
expectCustomSyncProviderUri: "",
options: []ProviderOption{
WithRPCResolver(),
},
},
{
name: "with target uri with in-process resolver",
expectedResolver: inProcess,
expectPort: defaultInProcessPort,
expectHost: defaultHost,
expectCacheType: defaultCache,
expectTargetUri: "envoy://localhost:9211/test.service",
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
name: "with target uri with in-process resolver",
expectedResolver: inProcess,
expectPort: defaultInProcessPort,
expectHost: defaultHost,
expectCacheType: defaultCache,
expectTargetUri: "envoy://localhost:9211/test.service",
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
expectCustomSyncProvider: nil,
expectCustomSyncProviderUri: "",
options: []ProviderOption{
WithInProcessResolver(),
WithTargetUri("envoy://localhost:9211/test.service"),
},
},
{
name: "with custom sync provider with in-process resolver",
expectedResolver: inProcess,
expectPort: defaultInProcessPort,
expectHost: defaultHost,
expectCacheType: defaultCache,
expectTargetUri: "",
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
expectCustomSyncProvider: customSyncProvider,
expectCustomSyncProviderUri: "testsyncer://custom.uri",
options: []ProviderOption{
WithInProcessResolver(),
WithCustomSyncProvider(customSyncProvider, "testsyncer://custom.uri"),
},
},
}

for _, test := range tests {
Expand Down Expand Up @@ -172,6 +209,16 @@ func TestNewProvider(t *testing.T) {
test.expectTargetUri, config.TargetUri)
}

if config.CustomSyncProvider != test.expectCustomSyncProvider {
t.Errorf("incorrect configuration CustomSyncProvider, expected %v, got %v",
test.expectCustomSyncProvider, config.CustomSyncProvider)
}

if config.CustomSyncProviderUri != test.expectCustomSyncProviderUri {
t.Errorf("incorrect configuration CustomSyncProviderUri, expected %v, got %v",
test.expectCustomSyncProviderUri, config.CustomSyncProviderUri)
}

// this line will fail linting if this provider is no longer compatible with the openfeature sdk
var _ of.FeatureProvider = flagdProvider
})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package process

import (
context "context"

"github.com/open-feature/flagd/core/pkg/sync"
)

// Fake implementation of sync.ISync. Does not conform to the contract because it does not send any events to the DataSync.
// Only used for unit tests.
type DoNothingCustomSyncProvider struct {
}

func (fps DoNothingCustomSyncProvider) Init(ctx context.Context) error {
return nil
}

func (fps DoNothingCustomSyncProvider) IsReady() bool {
return true
}

func (fps DoNothingCustomSyncProvider) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
return nil
}

func (fps DoNothingCustomSyncProvider) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
return nil
}

// Returns an implementation of sync.ISync interface that does nothing at all.
// The returned implementation does not conform to the sync.DataSync contract.
// This is useful only for unit tests.
func NewDoNothingCustomSyncProvider() sync.ISync {
return DoNothingCustomSyncProvider{}
}
Loading
Loading