Skip to content

Commit

Permalink
Add function_name into hydrate call scope values. Closes #662
Browse files Browse the repository at this point in the history
  • Loading branch information
kaidaguerre committed Sep 22, 2023
1 parent a3d0194 commit 8cd8fa0
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 44 deletions.
13 changes: 6 additions & 7 deletions plugin/get_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/gertd/go-pluralize"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe-plugin-sdk/v5/rate_limiter"
)

/*
Expand Down Expand Up @@ -75,7 +76,7 @@ type GetConfig struct {
// Deprecated: use IgnoreConfig
ShouldIgnoreError ErrorPredicate
MaxConcurrency int
named namedHydrateFunc
namedHydrate namedHydrateFunc
}

// initialise the GetConfig
Expand Down Expand Up @@ -104,6 +105,8 @@ func (c *GetConfig) initialise(table *Table) {
if c.Tags == nil {
c.Tags = map[string]string{}
}
// add in function name to tags
c.Tags[rate_limiter.RateLimiterScopeFunction] = c.namedHydrate.Name

// copy the (deprecated) top level ShouldIgnoreError property into the ignore config
if c.IgnoreConfig.ShouldIgnoreError == nil {
Expand All @@ -124,8 +127,8 @@ func (c *GetConfig) initialise(table *Table) {
}
log.Printf("[TRACE] GetConfig.initialise complete: RetryConfig: %s, IgnoreConfig: %s", c.RetryConfig.String(), c.IgnoreConfig.String())

// populate the namedHydrateFunc hydrate func
c.named = newNamedHydrateFunc(c.Hydrate)
// populate the named hydrate func
c.namedHydrate = newNamedHydrateFunc(c.Hydrate)

}

Expand Down Expand Up @@ -170,7 +173,3 @@ func (c *GetConfig) Validate(table *Table) []string {

return validationErrors
}

func (c *GetConfig) namedHydrateFunc() namedHydrateFunc {
return c.named
}
10 changes: 6 additions & 4 deletions plugin/hydrate_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type HydrateConfig struct {
// Deprecated: use IgnoreConfig
ShouldIgnoreError ErrorPredicate

namedFunc namedHydrateFunc
namedHydrate namedHydrateFunc
}

func (c *HydrateConfig) String() string {
Expand All @@ -127,7 +127,7 @@ RetryConfig: %s
IgnoreConfig: %s
Depends: %s
ScopeValues: %s`,
c.namedFunc.Name,
c.namedHydrate.Name,
c.RetryConfig,
c.IgnoreConfig,
strings.Join(dependsStrings, ","),
Expand All @@ -137,9 +137,9 @@ ScopeValues: %s`,
}

func (c *HydrateConfig) initialise(table *Table) {
c.namedFunc = newNamedHydrateFunc(c.Func)
c.namedHydrate = newNamedHydrateFunc(c.Func)

log.Printf("[TRACE] HydrateConfig.initialise func %s, table %s", c.namedFunc.Name, table.Name)
log.Printf("[TRACE] HydrateConfig.initialise func %s, table %s", c.namedHydrate.Name, table.Name)

// create RetryConfig if needed
if c.RetryConfig == nil {
Expand All @@ -155,6 +155,8 @@ func (c *HydrateConfig) initialise(table *Table) {
if c.Tags == nil {
c.Tags = map[string]string{}
}
// add in function name to tags
c.Tags[rate_limiter.RateLimiterScopeFunction] = c.namedHydrate.Name

// copy the (deprecated) top level ShouldIgnoreError property into the ignore config
if c.IgnoreConfig.ShouldIgnoreError == nil {
Expand Down
21 changes: 13 additions & 8 deletions plugin/list_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package plugin
import (
"fmt"
"github.com/gertd/go-pluralize"
"github.com/turbot/steampipe-plugin-sdk/v5/rate_limiter"
"log"
)

Expand Down Expand Up @@ -49,9 +50,9 @@ type ListConfig struct {
ParentTags map[string]string

// Deprecated: Use IgnoreConfig
ShouldIgnoreError ErrorPredicate
namedHydrateFunc *namedHydrateFunc
namedParentHydrateFunc *namedHydrateFunc
ShouldIgnoreError ErrorPredicate
namedHydrate *namedHydrateFunc
namedParentHydrate *namedHydrateFunc
}

func (c *ListConfig) initialise(table *Table) {
Expand All @@ -70,9 +71,13 @@ func (c *ListConfig) initialise(table *Table) {
if c.Tags == nil {
c.Tags = map[string]string{}
}

if c.ParentTags == nil {
c.ParentTags = map[string]string{}
}
// add in function name to tags
c.Tags[rate_limiter.RateLimiterScopeFunction] = c.namedHydrate.Name
c.ParentTags[rate_limiter.RateLimiterScopeFunction] = c.namedParentHydrate.Name

// copy the (deprecated) top level ShouldIgnoreError property into the ignore config
if c.IgnoreConfig.ShouldIgnoreError == nil {
Expand All @@ -83,12 +88,12 @@ func (c *ListConfig) initialise(table *Table) {
c.RetryConfig.DefaultTo(table.DefaultRetryConfig)
c.IgnoreConfig.DefaultTo(table.DefaultIgnoreConfig)

// populate the namedHydrateFunc hydrate func
// populate the named hydrate funcs
n := newNamedHydrateFunc(c.Hydrate)
c.namedHydrateFunc = &n
c.namedHydrate = &n
if c.ParentHydrate != nil {
p := newNamedHydrateFunc(c.ParentHydrate)
c.namedParentHydrateFunc = &p
c.namedParentHydrate = &p
}

log.Printf("[TRACE] ListConfig.initialise complete: RetryConfig: %s, IgnoreConfig %s", c.RetryConfig.String(), c.IgnoreConfig.String())
Expand All @@ -107,9 +112,9 @@ func (c *ListConfig) Validate(table *Table) []string {
}

// ensure that if there is an explicit hydrate config for the list hydrate, it does not declare dependencies
listHydrateName := table.List.namedHydrateFunc.Name
listHydrateName := table.List.namedHydrate.Name
for _, h := range table.HydrateConfig {
if h.namedFunc.Name == listHydrateName {
if h.namedHydrate.Name == listHydrateName {
if len(h.Depends) > 0 {
validationErrors = append(validationErrors, fmt.Sprintf("table '%s' List hydrate function '%s' defines dependencies in its `HydrateConfig`", table.Name, listHydrateName))
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func (p *Plugin) buildHydrateConfigMap() {
h := &p.HydrateConfig[i]
// initialise before adding to map
h.initialise(nil)
funcName := h.namedFunc.Name
funcName := h.namedHydrate.Name
p.hydrateConfigMap[funcName] = h
}
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/plugin_rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"strings"
)

func (p *Plugin) getHydrateCallRateLimiter(hydrateCallScopeValues map[string]string, queryData *QueryData) (*rate_limiter.MultiLimiter, error) {
func (p *Plugin) getHydrateCallRateLimiter(hydrateCallTags map[string]string, queryData *QueryData) (*rate_limiter.MultiLimiter, error) {
log.Printf("[INFO] getHydrateCallRateLimiter (%s)", queryData.connectionCallId)

// now build the set of all tag values which applies to this call
rateLimiterScopeValues := queryData.resolveRateLimiterScopeValues(hydrateCallScopeValues)
rateLimiterScopeValues := queryData.resolveRateLimiterScopeValues(hydrateCallTags)

// add scope values _even for an empty rate limiter_ so they appear in the _ctx field
res := &rate_limiter.MultiLimiter{
Expand Down
6 changes: 3 additions & 3 deletions plugin/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,8 @@ func (d *QueryData) verifyCallerIsListCall(callingFunction string) bool {
if d.Table.List == nil {
return false
}
listFunction := d.Table.List.namedHydrateFunc.Name
listParentFunction := d.Table.List.namedParentHydrateFunc.Name
listFunction := d.Table.List.namedHydrate.Name
listParentFunction := d.Table.List.namedParentHydrate.Name
if callingFunction != listFunction && callingFunction != listParentFunction {
// if the calling function is NOT one of the other registered hydrate functions,
//it must be an anonymous function so let it go
Expand Down Expand Up @@ -651,7 +651,7 @@ func (d *QueryData) streamLeafListItem(ctx context.Context, items ...interface{}
// set the parent item on the row data
rd.parentItem = d.parentItem
// NOTE: add the item as the hydrate data for the list call
rd.set(d.Table.List.namedHydrateFunc.Name, item)
rd.set(d.Table.List.namedHydrate.Name, item)

Check failure on line 654 in plugin/query_data.go

View workflow job for this annotation

GitHub Actions / Build

Error return value of `rd.set` is not checked (errcheck)

d.rowDataChan <- rd
}
Expand Down
10 changes: 5 additions & 5 deletions plugin/query_data_rate_limiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (d *QueryData) resolveGetRateLimiters() error {
// NOTE: RateLimit cannot be nil as it is initialized to an empty struct if needed
getLimiter, err := d.plugin.getHydrateCallRateLimiter(d.Table.Get.Tags, d)
if err != nil {
log.Printf("[WARN] get call %s getHydrateCallRateLimiter failed: %s (%s)", d.Table.Get.named.Name, err.Error(), d.connectionCallId)
log.Printf("[WARN] get call %s getHydrateCallRateLimiter failed: %s (%s)", d.Table.Get.namedHydrate.Name, err.Error(), d.connectionCallId)
return err
}

Expand All @@ -111,7 +111,7 @@ func (d *QueryData) resolveParentChildRateLimiters() error {
// resolve the parent hydrate rate limiter
parentRateLimiter, err := d.plugin.getHydrateCallRateLimiter(d.Table.List.ParentTags, d)
if err != nil {
log.Printf("[WARN] resolveParentChildRateLimiters: %s: getHydrateCallRateLimiter failed: %s (%s)", d.Table.List.namedParentHydrateFunc.Name, err.Error(), d.connectionCallId)
log.Printf("[WARN] resolveParentChildRateLimiters: %s: getHydrateCallRateLimiter failed: %s (%s)", d.Table.List.namedParentHydrate.Name, err.Error(), d.connectionCallId)
return err
}
// assign the parent rate limiter to d.fetchLimiters
Expand All @@ -120,7 +120,7 @@ func (d *QueryData) resolveParentChildRateLimiters() error {
// resolve the child hydrate rate limiter
childRateLimiter, err := d.plugin.getHydrateCallRateLimiter(d.Table.List.Tags, d)
if err != nil {
log.Printf("[WARN] resolveParentChildRateLimiters: %s: getHydrateCallRateLimiter failed: %s (%s)", d.Table.List.namedHydrateFunc.Name, err.Error(), d.connectionCallId)
log.Printf("[WARN] resolveParentChildRateLimiters: %s: getHydrateCallRateLimiter failed: %s (%s)", d.Table.List.namedHydrate.Name, err.Error(), d.connectionCallId)
return err
}
d.fetchLimiters.childListRateLimiter = childRateLimiter
Expand All @@ -132,7 +132,7 @@ func (d *QueryData) resolveListRateLimiters() error {
// NOTE: RateLimit cannot be nil as it is initialized to an empty struct if needed
listLimiter, err := d.plugin.getHydrateCallRateLimiter(d.Table.List.Tags, d)
if err != nil {
log.Printf("[WARN] get call %s getHydrateCallRateLimiter failed: %s (%s)", d.Table.Get.named.Name, err.Error(), d.connectionCallId)
log.Printf("[WARN] get call %s getHydrateCallRateLimiter failed: %s (%s)", d.Table.Get.namedHydrate.Name, err.Error(), d.connectionCallId)
return err
}
d.fetchLimiters.rateLimiter = listLimiter
Expand Down Expand Up @@ -164,7 +164,7 @@ func (d *QueryData) setListLimiterMetadata(fetchDelay time.Duration) {
func (d *QueryData) setGetLimiterMetadata(fetchDelay time.Duration) {
d.fetchMetadata = &hydrateMetadata{
Type: string(fetchTypeGet),
FuncName: d.Table.Get.named.Name,
FuncName: d.Table.Get.namedHydrate.Name,
RateLimiters: d.fetchLimiters.rateLimiter.LimiterNames(),
ScopeValues: d.fetchLimiters.rateLimiter.ScopeValues,
DelayMs: fetchDelay.Milliseconds(),
Expand Down
10 changes: 5 additions & 5 deletions plugin/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (t *Table) buildHydrateConfigMap() {

// initialise before adding to map
h.initialise(t)
t.hydrateConfigMap[h.namedFunc.Name] = h
t.hydrateConfigMap[h.namedHydrate.Name] = h
}
// add in hydrate config for all hydrate dependencies declared using legacy property HydrateDependencies
for _, d := range t.HydrateDependencies {
Expand All @@ -171,7 +171,7 @@ func (t *Table) buildHydrateConfigMap() {
}
// NOTE: the get config may be used as a column hydrate function so add this into the map
if get := t.Get; get != nil {
hydrateName := get.named.Name
hydrateName := get.namedHydrate.Name
t.hydrateConfigMap[hydrateName] = &HydrateConfig{
Func: get.Hydrate,
IgnoreConfig: get.IgnoreConfig,
Expand All @@ -187,7 +187,7 @@ func (t *Table) buildHydrateConfigMap() {
if c.Hydrate == nil {
continue
}
// to get name, create a namedFunc - this will take care of mapping memoized function named
// to get name, create a namedHydrate - this will take care of mapping memoized function namedHydrate
hydrateName := newNamedHydrateFunc(c.Hydrate).Name

if _, ok := t.hydrateConfigMap[hydrateName]; !ok {
Expand All @@ -200,7 +200,7 @@ func (t *Table) buildHydrateConfigMap() {

func (t *Table) getFetchFunc(fetchType fetchType) namedHydrateFunc {
if fetchType == fetchTypeList {
return *t.List.namedHydrateFunc
return *t.List.namedHydrate
}
return t.Get.named
return t.Get.namedHydrate
}
16 changes: 8 additions & 8 deletions plugin/table_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (t *Table) executeGetCall(ctx context.Context, queryData *QueryData) (err e
// we can now close the item chan
queryData.fetchComplete(ctx)
if r := recover(); r != nil {
err = status.Error(codes.Internal, fmt.Sprintf("get call %s failed with panic %v", t.Get.named.Name, r))
err = status.Error(codes.Internal, fmt.Sprintf("get call %s failed with panic %v", t.Get.namedHydrate.Name, r))
}
}()

Expand Down Expand Up @@ -179,7 +179,7 @@ func (t *Table) doGetForQualValues(ctx context.Context, queryData *QueryData, ke
// execute a get call for a single key column qual value
// if a matrix is defined, call for every matrix item
func (t *Table) doGet(ctx context.Context, queryData *QueryData) (err error) {
hydrateKey := t.Get.named.Name
hydrateKey := t.Get.namedHydrate.Name
defer func() {
if p := recover(); p != nil {
err = status.Error(codes.Internal, fmt.Sprintf("table '%s': Get hydrate call %s failed with panic %v", t.Name, hydrateKey, p))
Expand Down Expand Up @@ -228,7 +228,7 @@ func (t *Table) get(ctx context.Context, queryData *QueryData) (*rowData, error)
rd := newRowData(queryData, nil)
// just invoke callHydrateWithRetries()
var getItem any
getItem, err := rd.callHydrateWithRetries(ctx, queryData, t.Get.named, t.Get.IgnoreConfig, t.Get.RetryConfig)
getItem, err := rd.callHydrateWithRetries(ctx, queryData, t.Get.namedHydrate, t.Get.IgnoreConfig, t.Get.RetryConfig)
rd.item = getItem
return rd, err
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func (t *Table) getForEachMatrixItem(ctx context.Context, queryData *QueryData)
matrixRd := newRowData(matrixQueryData, nil)

// now call hydrate from the matrix rowdata
item, err := matrixRd.callHydrateWithRetries(fetchContext, matrixQueryData, t.Get.namedHydrateFunc(), t.Get.IgnoreConfig, t.Get.RetryConfig)
item, err := matrixRd.callHydrateWithRetries(fetchContext, matrixQueryData, t.Get.namedHydrate, t.Get.IgnoreConfig, t.Get.RetryConfig)

if err != nil {
log.Printf("[WARN] callHydrateWithRetries returned error %v", err)
Expand Down Expand Up @@ -358,7 +358,7 @@ func (t *Table) executeListCall(ctx context.Context, queryData *QueryData) {
defer log.Printf("[TRACE] executeListCall COMPLETE (%s)", queryData.connectionCallId)
defer func() {
if r := recover(); r != nil {
queryData.streamError(status.Error(codes.Internal, fmt.Sprintf("list call %s failed with panic %v", t.List.namedHydrateFunc.Name, r)))
queryData.streamError(status.Error(codes.Internal, fmt.Sprintf("list call %s failed with panic %v", t.List.namedHydrate.Name, r)))
}
// list call will return when it has streamed all items so close rowDataChan
queryData.fetchComplete(ctx)
Expand All @@ -374,12 +374,12 @@ func (t *Table) executeListCall(ctx context.Context, queryData *QueryData) {

// invoke list call - hydrateResults is nil as list call does not use it (it must comply with HydrateFunc signature)
var childHydrate *namedHydrateFunc = nil
listCall := t.List.namedHydrateFunc
listCall := t.List.namedHydrate
// if there is a parent hydrate function, call that
// - the child 'Hydrate' function will be called by QueryData.StreamListItem,
if t.List.ParentHydrate != nil {
listCall = t.List.namedParentHydrateFunc
childHydrate = t.List.namedHydrateFunc
listCall = t.List.namedParentHydrate
childHydrate = t.List.namedHydrate
}

// store the list call and child hydrate call - these will be used later when we call setListLimiterMetadata
Expand Down
2 changes: 1 addition & 1 deletion rate_limiter/config_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

const (
// todo should these be more unique to avoid clash
RateLimiterScopeFunction = "function_name"
RateLimiterScopeConnection = "connection"
RateLimiterScopeTable = "table"

Expand Down

0 comments on commit 8cd8fa0

Please sign in to comment.