Skip to content

Commit

Permalink
feat(ofrep): implement OFREP bulk provider
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Dmytrenko <[email protected]>
  • Loading branch information
erka committed Nov 10, 2024
1 parent 629a082 commit e7ffc6b
Show file tree
Hide file tree
Showing 8 changed files with 534 additions and 10 deletions.
137 changes: 137 additions & 0 deletions providers/ofrep/bulk_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package ofrep

import (
"context"
"sync"
"time"

"github.com/open-feature/go-sdk-contrib/providers/ofrep/internal/evaluate"
"github.com/open-feature/go-sdk-contrib/providers/ofrep/internal/outbound"
of "github.com/open-feature/go-sdk/openfeature"
)

const providerName = "OFREP Bulk Provider"

func NewBulkProvider(baseUri string, options ...Option) *BulkProvider {
cfg := outbound.Configuration{
BaseURI: baseUri,
PollingInterval: 30 * time.Second,
}

for _, option := range options {
option(&cfg)
}

return &BulkProvider{
events: make(chan of.Event, 10),
cfg: cfg,
state: of.NotReadyState,
}
}

var (
_ of.FeatureProvider = (*BulkProvider)(nil) // ensure BulkProvider implements FeatureProvider
_ of.StateHandler = (*BulkProvider)(nil) // ensure BulkProvider implements StateHandler
_ of.EventHandler = (*BulkProvider)(nil) // ensure BulkProvider implements EventHandler
)

type BulkProvider struct {
Provider
cfg outbound.Configuration
state of.State
mu sync.RWMutex
events chan of.Event
cancelFunc context.CancelFunc
}

func (p *BulkProvider) Status() of.State {
p.mu.RLock()
defer p.mu.RUnlock()
return p.state
}

func (p *BulkProvider) Init(evalCtx of.EvaluationContext) error {
p.mu.Lock()
defer p.mu.Unlock()

if p.state != of.NotReadyState {
// avoid reinitialization if initialized
return nil
}

ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel

client := outbound.NewHttp(p.cfg)
// TODO: Implement
// TODO: fetch configuration
//
var polling struct {
Enabled bool
MinInterval time.Duration
}
polling.Enabled = true

flatCtx := evalCtx.Attributes()
if evalCtx.TargetingKey() != "" {
flatCtx[of.TargetingKey] = evalCtx.TargetingKey()
}

evaluator := evaluate.NewBulkEvaluator(client, flatCtx)
err := evaluator.Fetch(ctx)
if err != nil {
// TODO: How to handle this error correctly?
return err
}

if polling.Enabled && p.cfg.PollingInterval > 0 {
p.startPolling(ctx, evaluator, max(polling.MinInterval, p.cfg.PollingInterval))
}

p.evaluator = evaluator
p.state = of.ReadyState
p.events <- of.Event{
ProviderName: providerName, EventType: of.ProviderReady,
ProviderEventDetails: of.ProviderEventDetails{Message: "Provider is ready"},
}
return nil
}

func (p *BulkProvider) Shutdown() {
p.mu.Lock()
defer p.mu.Unlock()

if p.cancelFunc != nil {
p.cancelFunc()
p.cancelFunc = nil
}

p.state = of.NotReadyState
p.evaluator = nil
}

func (p *BulkProvider) EventChannel() <-chan of.Event {
return p.events
}

func (p *BulkProvider) startPolling(ctx context.Context, evaluator *evaluate.BulkEvaluator, pollingInterval time.Duration) {
go func() {
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := evaluator.Fetch(ctx)
if err != nil && err != context.Canceled {
p.events <- of.Event{
ProviderName: providerName, EventType: of.ProviderError,
ProviderEventDetails: of.ProviderEventDetails{Message: err.Error()},
}
// TODO: What should happen to the provider state?
}
}
}
}()
}
113 changes: 113 additions & 0 deletions providers/ofrep/bulk_provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package ofrep_test

import (
"context"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/open-feature/go-sdk-contrib/providers/ofrep"
of "github.com/open-feature/go-sdk/openfeature"
)

var evalCtx = of.NewEvaluationContext("keyboard", map[string]any{
"color": "red",
})

func TestBulkProviderEvaluationE2EBasic(t *testing.T) {
of.SetEvaluationContext(evalCtx)
baseUrl := setupTestServer(t)
p := ofrep.NewBulkProvider(baseUrl, ofrep.WithBearerToken("api-key"))

err := of.SetProviderAndWait(p)
if err != nil {
t.Errorf("expected ready provider, but got %v", err)
}

client := of.NewClient("app")
ctx := context.Background()

result := client.Boolean(ctx, "flag-bool", false, evalCtx)
if !result {
t.Errorf("expected %v, but got %v", true, result)
}

_, err = client.BooleanValueDetails(ctx, "flag-error", false, evalCtx)

if err == nil {
t.Errorf("expected error, but got nil")
}

if err.Error() != "error code: GENERAL: something wrong" {
t.Errorf("expected error message '%v', but got '%v'", "error code: GENERAL: something wrong", err.Error())
}

of.Shutdown()

if p.Status() != of.NotReadyState {
t.Errorf("expected %v, but got %v", of.NotReadyState, p.Status())
}
}

func TestBulkProviderEvaluationE2EPolling(t *testing.T) {
of.SetEvaluationContext(evalCtx)
baseUrl := setupTestServer(t)
p := ofrep.NewBulkProvider(baseUrl, ofrep.WithBearerToken("api-key"), ofrep.WithPollingInterval(30*time.Millisecond))

err := of.SetProviderAndWait(p)
if err != nil {
t.Errorf("expected ready provider, but got %v", err)
}
if p.Status() != of.ReadyState {
t.Errorf("expected %v, but got %v", of.ReadyState, p.Status())
}

// let the provider poll for flags in background at least once
time.Sleep(60 * time.Millisecond)

of.Shutdown()
if p.Status() != of.NotReadyState {
t.Errorf("expected %v, but got %v", of.NotReadyState, p.Status())
}
}

func setupTestServer(t testing.TB) string {
t.Helper()
mux := http.NewServeMux()
mux.HandleFunc("/ofrep/v1/evaluate/flags", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected post request, got: %v", r.Method)
}

if r.Header.Get("Authorization") != "Bearer api-key" {
t.Errorf("expected Authorization header, got: %v", r.Header.Get("Authorization"))
}

requestData, err := io.ReadAll(r.Body)
if err != nil {
t.Errorf("error reading request data: %v", err)
}

// FIXME: map could have different order
if string(requestData) != `{"context":{"color":"red","targetingKey":"keyboard"}}` {
t.Errorf("expected request data %v, but got %v", ``, string(requestData))
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
data := `{"flags":[
{"key":"flag-bool","reason":"DEFAULT","variant":"true","metadata":{},"value":true},
{"key":"flag-error", "errorCode": "INVALID", "errorDetails": "something wrong" }
]}`
_, err = w.Write([]byte(data))
if err != nil {
t.Errorf("error writing response: %v", err)
}
})

s := httptest.NewServer(mux)
t.Cleanup(s.Close)
return s.URL
}
110 changes: 110 additions & 0 deletions providers/ofrep/internal/evaluate/bulk_evaluator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package evaluate

import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"

"github.com/open-feature/go-sdk-contrib/providers/ofrep/internal/outbound"
of "github.com/open-feature/go-sdk/openfeature"
)

// Outbound defines the contract for resolver's outbound communication, matching OFREP API.
type BulkOutbound interface {
// Bulk flags resolving
Bulk(ctx context.Context, paylod []byte) (*outbound.Resolution, error)
}

func NewBulkEvaluator(client BulkOutbound, evalCtx of.FlattenedContext) *BulkEvaluator {
b := &BulkEvaluator{
client: client,
evalCtx: evalCtx,
values: map[string]bulkEvaluationValue{},
}
b.resolver = b
return b
}

type BulkEvaluator struct {
Flags
client BulkOutbound
evalCtx of.FlattenedContext
values map[string]bulkEvaluationValue
mu sync.RWMutex
}

func (b *BulkEvaluator) Fetch(ctx context.Context) error {
payload, err := json.Marshal(requestFrom(b.evalCtx))
if err != nil {
return err
}
res, err := b.client.Bulk(ctx, payload)
if err != nil {
return err
}

switch res.Status {
case http.StatusOK: // 200
var data bulkEvaluationSuccess
err := json.Unmarshal(res.Data, &data)
if err != nil {
return err
}
values := make(map[string]bulkEvaluationValue)
for _, value := range data.Flags {
values[value.Key] = value
}
b.setValues(values)
case http.StatusNotModified: // 304
// No changes
case http.StatusBadRequest: // 400
return parseError400(res.Data)
case http.StatusUnauthorized, http.StatusForbidden: // 401, 403
return of.NewGeneralResolutionError("authentication/authorization error")
case http.StatusTooManyRequests: // 429
after := parse429(res)
if after == 0 {
return of.NewGeneralResolutionError("rate limit exceeded")
}
return of.NewGeneralResolutionError(
fmt.Sprintf("rate limit exceeded, try again after %f seconds", after.Seconds()))
case http.StatusInternalServerError: // 500
return parseError500(res.Data)
default:
return parseError500(res.Data)
}

return nil
}

func (b *BulkEvaluator) setValues(values map[string]bulkEvaluationValue) {
b.mu.Lock()
defer b.mu.Unlock()
b.values = values
}

func (b *BulkEvaluator) resolveSingle(ctx context.Context, key string, evalCtx map[string]any) (*successDto, *of.ResolutionError) {
b.mu.RLock()
defer b.mu.RUnlock()
if s, ok := b.values[key]; ok {
if s.ErrorCode != "" {
// FIXME: should not hide original error code
resErr := of.NewGeneralResolutionError(s.ErrorDetails)
return nil, &resErr
}
return &s.successDto, nil
}
resErr := of.NewFlagNotFoundResolutionError(fmt.Sprintf("flag for key '%s' does not exist", key))
return nil, &resErr
}

type bulkEvaluationSuccess struct {
Flags []bulkEvaluationValue
}

type bulkEvaluationValue struct {
successDto
evaluationError
}
Loading

0 comments on commit e7ffc6b

Please sign in to comment.