Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add function_name into hydrate call scope values. Closes #662 #663

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions plugin/get_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/gertd/go-pluralize"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe-plugin-sdk/v5/rate_limiter"
)

/*
Expand Down Expand Up @@ -75,7 +76,7 @@ type GetConfig struct {
// Deprecated: use IgnoreConfig
ShouldIgnoreError ErrorPredicate
MaxConcurrency int
named namedHydrateFunc
namedHydrate namedHydrateFunc
}

// initialise the GetConfig
Expand All @@ -102,8 +103,10 @@ func (c *GetConfig) initialise(table *Table) {

// create empty tags if needed
if c.Tags == nil {
c.Tags = map[string]string{}
c.Tags = make(map[string]string)
}
// add in function name to tags
c.Tags[rate_limiter.RateLimiterScopeFunction] = c.namedHydrate.Name

// copy the (deprecated) top level ShouldIgnoreError property into the ignore config
if c.IgnoreConfig.ShouldIgnoreError == nil {
Expand All @@ -124,8 +127,8 @@ func (c *GetConfig) initialise(table *Table) {
}
log.Printf("[TRACE] GetConfig.initialise complete: RetryConfig: %s, IgnoreConfig: %s", c.RetryConfig.String(), c.IgnoreConfig.String())

// populate the namedHydrateFunc hydrate func
c.named = newNamedHydrateFunc(c.Hydrate)
// populate the named hydrate func
c.namedHydrate = newNamedHydrateFunc(c.Hydrate)

}

Expand Down Expand Up @@ -170,7 +173,3 @@ func (c *GetConfig) Validate(table *Table) []string {

return validationErrors
}

func (c *GetConfig) namedHydrateFunc() namedHydrateFunc {
return c.named
}
12 changes: 7 additions & 5 deletions plugin/hydrate_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type HydrateConfig struct {
// Deprecated: use IgnoreConfig
ShouldIgnoreError ErrorPredicate

namedFunc namedHydrateFunc
namedHydrate namedHydrateFunc
}

func (c *HydrateConfig) String() string {
Expand All @@ -127,7 +127,7 @@ RetryConfig: %s
IgnoreConfig: %s
Depends: %s
ScopeValues: %s`,
c.namedFunc.Name,
c.namedHydrate.Name,
c.RetryConfig,
c.IgnoreConfig,
strings.Join(dependsStrings, ","),
Expand All @@ -137,9 +137,9 @@ ScopeValues: %s`,
}

func (c *HydrateConfig) initialise(table *Table) {
c.namedFunc = newNamedHydrateFunc(c.Func)
c.namedHydrate = newNamedHydrateFunc(c.Func)

log.Printf("[TRACE] HydrateConfig.initialise func %s, table %s", c.namedFunc.Name, table.Name)
log.Printf("[TRACE] HydrateConfig.initialise func %s, table %s", c.namedHydrate.Name, table.Name)

// create RetryConfig if needed
if c.RetryConfig == nil {
Expand All @@ -153,8 +153,10 @@ func (c *HydrateConfig) initialise(table *Table) {

// create empty Tags if needed
if c.Tags == nil {
c.Tags = map[string]string{}
c.Tags = make(map[string]string)
}
// add in function name to tags
c.Tags[rate_limiter.RateLimiterScopeFunction] = c.namedHydrate.Name

// copy the (deprecated) top level ShouldIgnoreError property into the ignore config
if c.IgnoreConfig.ShouldIgnoreError == nil {
Expand Down
27 changes: 17 additions & 10 deletions plugin/list_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package plugin
import (
"fmt"
"github.com/gertd/go-pluralize"
"github.com/turbot/steampipe-plugin-sdk/v5/rate_limiter"
"log"
)

Expand Down Expand Up @@ -49,9 +50,9 @@ type ListConfig struct {
ParentTags map[string]string

// Deprecated: Use IgnoreConfig
ShouldIgnoreError ErrorPredicate
namedHydrateFunc *namedHydrateFunc
namedParentHydrateFunc *namedHydrateFunc
ShouldIgnoreError ErrorPredicate
namedHydrate *namedHydrateFunc
namedParentHydrate *namedHydrateFunc
}

func (c *ListConfig) initialise(table *Table) {
Expand All @@ -68,10 +69,11 @@ func (c *ListConfig) initialise(table *Table) {
}

if c.Tags == nil {
c.Tags = map[string]string{}
c.Tags = make(map[string]string)
}

if c.ParentTags == nil {
c.ParentTags = map[string]string{}
c.ParentTags = make(map[string]string)
}

// copy the (deprecated) top level ShouldIgnoreError property into the ignore config
Expand All @@ -83,12 +85,17 @@ func (c *ListConfig) initialise(table *Table) {
c.RetryConfig.DefaultTo(table.DefaultRetryConfig)
c.IgnoreConfig.DefaultTo(table.DefaultIgnoreConfig)

// populate the namedHydrateFunc hydrate func
// populate the named hydrate funcs
n := newNamedHydrateFunc(c.Hydrate)
c.namedHydrateFunc = &n
c.namedHydrate = &n
// add in function name to tags
c.Tags[rate_limiter.RateLimiterScopeFunction] = c.namedHydrate.Name

if c.ParentHydrate != nil {
p := newNamedHydrateFunc(c.ParentHydrate)
c.namedParentHydrateFunc = &p
c.namedParentHydrate = &p
// add in parent function name to tags
c.ParentTags[rate_limiter.RateLimiterScopeFunction] = c.namedParentHydrate.Name
}

log.Printf("[TRACE] ListConfig.initialise complete: RetryConfig: %s, IgnoreConfig %s", c.RetryConfig.String(), c.IgnoreConfig.String())
Expand All @@ -107,9 +114,9 @@ func (c *ListConfig) Validate(table *Table) []string {
}

// ensure that if there is an explicit hydrate config for the list hydrate, it does not declare dependencies
listHydrateName := table.List.namedHydrateFunc.Name
listHydrateName := table.List.namedHydrate.Name
for _, h := range table.HydrateConfig {
if h.namedFunc.Name == listHydrateName {
if h.namedHydrate.Name == listHydrateName {
if len(h.Depends) > 0 {
validationErrors = append(validationErrors, fmt.Sprintf("table '%s' List hydrate function '%s' defines dependencies in its `HydrateConfig`", table.Name, listHydrateName))
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func (p *Plugin) buildHydrateConfigMap() {
h := &p.HydrateConfig[i]
// initialise before adding to map
h.initialise(nil)
funcName := h.namedFunc.Name
funcName := h.namedHydrate.Name
p.hydrateConfigMap[funcName] = h
}
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/plugin_rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"strings"
)

func (p *Plugin) getHydrateCallRateLimiter(hydrateCallScopeValues map[string]string, queryData *QueryData) (*rate_limiter.MultiLimiter, error) {
func (p *Plugin) getHydrateCallRateLimiter(hydrateCallTags map[string]string, queryData *QueryData) (*rate_limiter.MultiLimiter, error) {
log.Printf("[INFO] getHydrateCallRateLimiter (%s)", queryData.connectionCallId)

// now build the set of all tag values which applies to this call
rateLimiterScopeValues := queryData.resolveRateLimiterScopeValues(hydrateCallScopeValues)
rateLimiterScopeValues := queryData.resolveRateLimiterScopeValues(hydrateCallTags)

// add scope values _even for an empty rate limiter_ so they appear in the _ctx field
res := &rate_limiter.MultiLimiter{
Expand Down
6 changes: 3 additions & 3 deletions plugin/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,8 @@ func (d *QueryData) verifyCallerIsListCall(callingFunction string) bool {
if d.Table.List == nil {
return false
}
listFunction := d.Table.List.namedHydrateFunc.Name
listParentFunction := d.Table.List.namedParentHydrateFunc.Name
listFunction := d.Table.List.namedHydrate.Name
listParentFunction := d.Table.List.namedParentHydrate.Name
if callingFunction != listFunction && callingFunction != listParentFunction {
// if the calling function is NOT one of the other registered hydrate functions,
//it must be an anonymous function so let it go
Expand Down Expand Up @@ -651,7 +651,7 @@ func (d *QueryData) streamLeafListItem(ctx context.Context, items ...interface{}
// set the parent item on the row data
rd.parentItem = d.parentItem
// NOTE: add the item as the hydrate data for the list call
rd.set(d.Table.List.namedHydrateFunc.Name, item)
rd.set(d.Table.List.namedHydrate.Name, item)

d.rowDataChan <- rd
}
Expand Down
10 changes: 5 additions & 5 deletions plugin/query_data_rate_limiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (d *QueryData) resolveGetRateLimiters() error {
// NOTE: RateLimit cannot be nil as it is initialized to an empty struct if needed
getLimiter, err := d.plugin.getHydrateCallRateLimiter(d.Table.Get.Tags, d)
if err != nil {
log.Printf("[WARN] get call %s getHydrateCallRateLimiter failed: %s (%s)", d.Table.Get.named.Name, err.Error(), d.connectionCallId)
log.Printf("[WARN] get call %s getHydrateCallRateLimiter failed: %s (%s)", d.Table.Get.namedHydrate.Name, err.Error(), d.connectionCallId)
return err
}

Expand All @@ -111,7 +111,7 @@ func (d *QueryData) resolveParentChildRateLimiters() error {
// resolve the parent hydrate rate limiter
parentRateLimiter, err := d.plugin.getHydrateCallRateLimiter(d.Table.List.ParentTags, d)
if err != nil {
log.Printf("[WARN] resolveParentChildRateLimiters: %s: getHydrateCallRateLimiter failed: %s (%s)", d.Table.List.namedParentHydrateFunc.Name, err.Error(), d.connectionCallId)
log.Printf("[WARN] resolveParentChildRateLimiters: %s: getHydrateCallRateLimiter failed: %s (%s)", d.Table.List.namedParentHydrate.Name, err.Error(), d.connectionCallId)
return err
}
// assign the parent rate limiter to d.fetchLimiters
Expand All @@ -120,7 +120,7 @@ func (d *QueryData) resolveParentChildRateLimiters() error {
// resolve the child hydrate rate limiter
childRateLimiter, err := d.plugin.getHydrateCallRateLimiter(d.Table.List.Tags, d)
if err != nil {
log.Printf("[WARN] resolveParentChildRateLimiters: %s: getHydrateCallRateLimiter failed: %s (%s)", d.Table.List.namedHydrateFunc.Name, err.Error(), d.connectionCallId)
log.Printf("[WARN] resolveParentChildRateLimiters: %s: getHydrateCallRateLimiter failed: %s (%s)", d.Table.List.namedHydrate.Name, err.Error(), d.connectionCallId)
return err
}
d.fetchLimiters.childListRateLimiter = childRateLimiter
Expand All @@ -132,7 +132,7 @@ func (d *QueryData) resolveListRateLimiters() error {
// NOTE: RateLimit cannot be nil as it is initialized to an empty struct if needed
listLimiter, err := d.plugin.getHydrateCallRateLimiter(d.Table.List.Tags, d)
if err != nil {
log.Printf("[WARN] get call %s getHydrateCallRateLimiter failed: %s (%s)", d.Table.Get.named.Name, err.Error(), d.connectionCallId)
log.Printf("[WARN] get call %s getHydrateCallRateLimiter failed: %s (%s)", d.Table.Get.namedHydrate.Name, err.Error(), d.connectionCallId)
return err
}
d.fetchLimiters.rateLimiter = listLimiter
Expand Down Expand Up @@ -164,7 +164,7 @@ func (d *QueryData) setListLimiterMetadata(fetchDelay time.Duration) {
func (d *QueryData) setGetLimiterMetadata(fetchDelay time.Duration) {
d.fetchMetadata = &hydrateMetadata{
Type: string(fetchTypeGet),
FuncName: d.Table.Get.named.Name,
FuncName: d.Table.Get.namedHydrate.Name,
RateLimiters: d.fetchLimiters.rateLimiter.LimiterNames(),
ScopeValues: d.fetchLimiters.rateLimiter.ScopeValues,
DelayMs: fetchDelay.Milliseconds(),
Expand Down
10 changes: 5 additions & 5 deletions plugin/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (t *Table) buildHydrateConfigMap() {

// initialise before adding to map
h.initialise(t)
t.hydrateConfigMap[h.namedFunc.Name] = h
t.hydrateConfigMap[h.namedHydrate.Name] = h
}
// add in hydrate config for all hydrate dependencies declared using legacy property HydrateDependencies
for _, d := range t.HydrateDependencies {
Expand All @@ -171,7 +171,7 @@ func (t *Table) buildHydrateConfigMap() {
}
// NOTE: the get config may be used as a column hydrate function so add this into the map
if get := t.Get; get != nil {
hydrateName := get.named.Name
hydrateName := get.namedHydrate.Name
t.hydrateConfigMap[hydrateName] = &HydrateConfig{
Func: get.Hydrate,
IgnoreConfig: get.IgnoreConfig,
Expand All @@ -187,7 +187,7 @@ func (t *Table) buildHydrateConfigMap() {
if c.Hydrate == nil {
continue
}
// to get name, create a namedFunc - this will take care of mapping memoized function named
// to get name, create a namedHydrate - this will take care of mapping memoized function namedHydrate
hydrateName := newNamedHydrateFunc(c.Hydrate).Name

if _, ok := t.hydrateConfigMap[hydrateName]; !ok {
Expand All @@ -200,7 +200,7 @@ func (t *Table) buildHydrateConfigMap() {

func (t *Table) getFetchFunc(fetchType fetchType) namedHydrateFunc {
if fetchType == fetchTypeList {
return *t.List.namedHydrateFunc
return *t.List.namedHydrate
}
return t.Get.named
return t.Get.namedHydrate
}
16 changes: 8 additions & 8 deletions plugin/table_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (t *Table) executeGetCall(ctx context.Context, queryData *QueryData) (err e
// we can now close the item chan
queryData.fetchComplete(ctx)
if r := recover(); r != nil {
err = status.Error(codes.Internal, fmt.Sprintf("get call %s failed with panic %v", t.Get.named.Name, r))
err = status.Error(codes.Internal, fmt.Sprintf("get call %s failed with panic %v", t.Get.namedHydrate.Name, r))
}
}()

Expand Down Expand Up @@ -179,7 +179,7 @@ func (t *Table) doGetForQualValues(ctx context.Context, queryData *QueryData, ke
// execute a get call for a single key column qual value
// if a matrix is defined, call for every matrix item
func (t *Table) doGet(ctx context.Context, queryData *QueryData) (err error) {
hydrateKey := t.Get.named.Name
hydrateKey := t.Get.namedHydrate.Name
defer func() {
if p := recover(); p != nil {
err = status.Error(codes.Internal, fmt.Sprintf("table '%s': Get hydrate call %s failed with panic %v", t.Name, hydrateKey, p))
Expand Down Expand Up @@ -228,7 +228,7 @@ func (t *Table) get(ctx context.Context, queryData *QueryData) (*rowData, error)
rd := newRowData(queryData, nil)
// just invoke callHydrateWithRetries()
var getItem any
getItem, err := rd.callHydrateWithRetries(ctx, queryData, t.Get.named, t.Get.IgnoreConfig, t.Get.RetryConfig)
getItem, err := rd.callHydrateWithRetries(ctx, queryData, t.Get.namedHydrate, t.Get.IgnoreConfig, t.Get.RetryConfig)
rd.item = getItem
return rd, err
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func (t *Table) getForEachMatrixItem(ctx context.Context, queryData *QueryData)
matrixRd := newRowData(matrixQueryData, nil)

// now call hydrate from the matrix rowdata
item, err := matrixRd.callHydrateWithRetries(fetchContext, matrixQueryData, t.Get.namedHydrateFunc(), t.Get.IgnoreConfig, t.Get.RetryConfig)
item, err := matrixRd.callHydrateWithRetries(fetchContext, matrixQueryData, t.Get.namedHydrate, t.Get.IgnoreConfig, t.Get.RetryConfig)

if err != nil {
log.Printf("[WARN] callHydrateWithRetries returned error %v", err)
Expand Down Expand Up @@ -358,7 +358,7 @@ func (t *Table) executeListCall(ctx context.Context, queryData *QueryData) {
defer log.Printf("[TRACE] executeListCall COMPLETE (%s)", queryData.connectionCallId)
defer func() {
if r := recover(); r != nil {
queryData.streamError(status.Error(codes.Internal, fmt.Sprintf("list call %s failed with panic %v", t.List.namedHydrateFunc.Name, r)))
queryData.streamError(status.Error(codes.Internal, fmt.Sprintf("list call %s failed with panic %v", t.List.namedHydrate.Name, r)))
}
// list call will return when it has streamed all items so close rowDataChan
queryData.fetchComplete(ctx)
Expand All @@ -374,12 +374,12 @@ func (t *Table) executeListCall(ctx context.Context, queryData *QueryData) {

// invoke list call - hydrateResults is nil as list call does not use it (it must comply with HydrateFunc signature)
var childHydrate *namedHydrateFunc = nil
listCall := t.List.namedHydrateFunc
listCall := t.List.namedHydrate
// if there is a parent hydrate function, call that
// - the child 'Hydrate' function will be called by QueryData.StreamListItem,
if t.List.ParentHydrate != nil {
listCall = t.List.namedParentHydrateFunc
childHydrate = t.List.namedHydrateFunc
listCall = t.List.namedParentHydrate
childHydrate = t.List.namedHydrate
}

// store the list call and child hydrate call - these will be used later when we call setListLimiterMetadata
Expand Down
2 changes: 1 addition & 1 deletion rate_limiter/config_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

const (
// todo should these be more unique to avoid clash
RateLimiterScopeFunction = "function_name"
RateLimiterScopeConnection = "connection"
RateLimiterScopeTable = "table"

Expand Down
Loading