Skip to content

Commit

Permalink
Add semaphore to MultiLimiter
Browse files Browse the repository at this point in the history
 Add MaxConcurrency to rate limiter def
 Remove concurrency manager
  • Loading branch information
kaidaguerre committed Aug 7, 2023
1 parent 4bcd5c6 commit 2d56ee9
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 290 deletions.
157 changes: 84 additions & 73 deletions grpc/proto/plugin.pb.go

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions grpc/proto/plugin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,9 @@ message RateLimiterDefinition {
string name = 1;
float fill_rate = 2;
int64 bucket_size = 3;
repeated string scope = 4;
string where = 5;
int64 max_concurrency = 4;
repeated string scope = 5;
string where = 6;
}

message SetRateLimitersResponse {
Expand Down
169 changes: 1 addition & 168 deletions plugin/concurrency.go
Original file line number Diff line number Diff line change
@@ -1,176 +1,9 @@
package plugin

import (
"log"
"sync"
)

/*
DefaultConcurrencyConfig sets the default maximum number of concurrent [HydrateFunc] calls.

Limit total concurrent hydrate calls:

	DefaultConcurrency: &plugin.DefaultConcurrencyConfig{
		TotalMaxConcurrency: 500,
	}

Limit concurrent hydrate calls to any single HydrateFunc which does not have a [HydrateConfig]:

	DefaultConcurrency: &plugin.DefaultConcurrencyConfig{
		DefaultMaxConcurrency: 100,
	}

Do both:

	DefaultConcurrency: &plugin.DefaultConcurrencyConfig{
		TotalMaxConcurrency:   500,
		DefaultMaxConcurrency: 200,
	}

Plugin examples:
  - [hackernews]

Deprecated: use RateLimiters to control concurrency.

[hackernews]: https://github.com/turbot/steampipe-plugin-hackernews/blob/bbfbb12751ad43a2ca0ab70901cde6a88e92cf44/hackernews/plugin.go#L18-L21
*/
type DefaultConcurrencyConfig struct {
// sets how many HydrateFunc calls can run concurrently in total
// (a value of 0 is treated as "not set")
TotalMaxConcurrency int
// sets the default for how many calls to each HydrateFunc can run concurrently
// (a value of 0 is treated as "not set")
DefaultMaxConcurrency int
}

// concurrencyManager struct ensures that hydrate functions stay within concurrency limits.
// It counts the hydrate calls in progress (in total and per call name) and records the
// highest values reached so they can be logged by DisplayConcurrencyStats.
type concurrencyManager struct {
// lock taken by StartIfAllowed and Finished when reading or updating the call counters
mut sync.RWMutex
// the maximum number of all hydrate calls which can run concurrently
// (only enforced when greater than 0)
maxConcurrency int
// the maximum concurrency for a single hydrate call
// (this may be overridden by the HydrateConfig for the call)
// (only enforced when greater than 0)
defaultMaxConcurrencyPerCall int
// total number of hydrate calls in progress
callsInProgress int
// map of the number of instances of each call in progress, keyed by call name
callMap map[string]int
// instrumentation properties
// the highest value callsInProgress has reached
maxCallsInProgress int
// the highest value each callMap entry has reached, keyed by call name
maxCallMap map[string]int
}

// newConcurrencyManager builds the concurrencyManager for the given table, taking the
// limits from the DefaultConcurrency config of the table's plugin (if one is defined).
//
// A limit of 0 means "no limit". When a total limit is set and the default per-call
// limit is larger than it, the per-call limit is clamped to the total limit, as no
// single call can ever run more instances than the total allows.
func newConcurrencyManager(t *Table) *concurrencyManager {
	// if the plugin does not define the limits, both stay at 0 (no limit)
	var totalMax, maxPerCall int
	if config := t.Plugin.DefaultConcurrency; config != nil {
		totalMax = config.TotalMaxConcurrency
		maxPerCall = config.DefaultMaxConcurrency
	}

	// if the default call concurrency is greater than the total max concurrency, clamp to total
	// NOTE: this must be evaluated after BOTH values have been resolved - previously the check
	// only ran while maxPerCall was still 0, so the clamp could never take effect
	if totalMax > 0 && maxPerCall > totalMax {
		maxPerCall = totalMax
	}

	return &concurrencyManager{
		maxConcurrency:               totalMax,
		defaultMaxConcurrencyPerCall: maxPerCall,
		callMap:                      make(map[string]int),
		maxCallMap:                   make(map[string]int),
	}
}

// StartIfAllowed checks whether the named hydrate call is permitted to start
// based on the number of running instances of that call, and the total calls in progress.
//
// name is the hydrate call name used as the key for the per-call counters.
// maxCallConcurrency is the limit for this call; 0 means "use the manager default".
//
// If the call is allowed, the counters (and the instrumentation high-water marks) are
// incremented before returning true, i.e. the manager assumes the call WILL start and
// that Finished will be invoked for it later. If false is returned nothing is changed.
func (c *concurrencyManager) StartIfAllowed(name string, maxCallConcurrency int) (res bool) {
	// cheap pre-check under a Read lock so calls which are clearly over the limit
	// do not contend for the Write lock
	// NOTE: canStart reads callsInProgress, so it must run while the lock is held
	c.mut.RLock()
	allowed := c.canStart(c.callMap[name], maxCallConcurrency)
	c.mut.RUnlock()
	if !allowed {
		return false
	}

	// now take the Write lock (the Read lock has been released, RWMutex cannot be upgraded)
	c.mut.Lock()
	// ensure we unlock
	defer c.mut.Unlock()

	// check again in case another thread grabbed the Write lock before us
	currentExecutions := c.callMap[name]
	if !c.canStart(currentExecutions, maxCallConcurrency) {
		return false
	}

	// to get here we are allowed to execute - increment the call counters
	c.callMap[name] = currentExecutions + 1
	c.callsInProgress++

	// update instrumentation
	if c.callMap[name] > c.maxCallMap[name] {
		c.maxCallMap[name] = c.callMap[name]
	}
	if c.callsInProgress > c.maxCallsInProgress {
		c.maxCallsInProgress = c.callsInProgress
	}

	return true
}

// canStart reports whether a call which currently has currentExecutions instances running
// may start another instance.
//
// It returns false if the total limit has been reached, or if the per-call limit has been
// reached. maxCallConcurrency is the per-call limit; 0 means "use defaultMaxConcurrencyPerCall".
// Limits which are not greater than 0 are not enforced.
//
// This reads callsInProgress without locking, so it should be called with c.mut held.
func (c *concurrencyManager) canStart(currentExecutions int, maxCallConcurrency int) bool {
	// is the total call limit reached?
	// (use >= rather than == so a counter which is already above the limit is still refused)
	if c.maxConcurrency > 0 && c.callsInProgress >= c.maxConcurrency {
		return false
	}

	// if there is no config or empty config, the maxCallConcurrency will be 0
	// - use defaultMaxConcurrencyPerCall set on the concurrencyManager
	if maxCallConcurrency == 0 {
		maxCallConcurrency = c.defaultMaxConcurrencyPerCall
	}

	// the call may start if it has no limit, or it is still below its limit
	// (the same name may be checked with different limits, so the count can exceed this limit)
	return maxCallConcurrency <= 0 || currentExecutions < maxCallConcurrency
}

// Finished decrements the counter for the named function, and the total calls in progress.
// It should be called once for every call for which StartIfAllowed returned true.
//
// Any panic raised while updating the counters is recovered and logged as a warning
// rather than propagated to the caller.
func (c *concurrencyManager) Finished(name string) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("[WARN] concurrencyManager Finished caught a panic %v", r)
		}
	}()
	// acquire a Write lock
	c.mut.Lock()
	// release via defer so the lock is not left held if the update below panics
	// (the panic is swallowed by the recover above, so a leaked lock would block all later calls)
	defer c.mut.Unlock()

	c.callMap[name]--
	c.callsInProgress--
}

// Close executes when the query is complete and dumps out the concurrency stats.
// It does nothing other than call DisplayConcurrencyStats.
func (c *concurrencyManager) Close() {
c.DisplayConcurrencyStats()
}

// DisplayConcurrencyStats writes a summary of the recorded concurrency high-water marks
// to the log at TRACE level: one line per hydrate call name (from maxCallMap), followed
// by the overall total (maxCallsInProgress).
// Nothing is logged if no call has been recorded.
func (c *concurrencyManager) DisplayConcurrencyStats() {
	const (
		// horizontal rule separating the sections of the summary
		divider = "[TRACE] ------------------------------------"
		// one left-aligned label and its peak concurrency
		statLine = "[TRACE] %-30s: %d"
	)

	perCallPeaks := c.maxCallMap
	if len(perCallPeaks) == 0 {
		return
	}

	// header
	log.Printf(divider)
	log.Printf("[TRACE] Concurrency Summary")
	log.Printf(divider)

	// per-call peaks
	for callName, peak := range perCallPeaks {
		log.Printf(statLine, callName, peak)
	}

	// overall peak
	log.Printf(divider)
	log.Printf(statLine, "Total", c.maxCallsInProgress)
	log.Printf(divider)
}
23 changes: 13 additions & 10 deletions plugin/hydrate_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,21 @@ func (h *hydrateCall) initialiseRateLimiter() error {
// CanStart returns whether this hydrate call can execute
// - check whether all dependency hydrate functions have been completed
// - check whether the concurrency limits would be exceeded
func (h *hydrateCall) canStart(rowData *rowData, name string, concurrencyManager *concurrencyManager) bool {
func (h *hydrateCall) canStart(rowData *rowData) bool {
// check whether all hydrate functions we depend on have saved their results
for _, dep := range h.Depends {
if !helpers.StringSliceContains(rowData.getHydrateKeys(), dep) {
return false
}
}
// ask the concurrency manager whether the call can start
// NOTE: if the call is allowed to start, the concurrency manager ASSUMES THE CALL WILL START
// and increments the counters
// it may seem more logical to do this in the Start() function below, but we need to check and increment the counters
// within the same mutex lock to ensure another call does not start between checking and starting
return concurrencyManager.StartIfAllowed(name, h.Config.MaxConcurrency)
if h.rateLimiter == nil {
return true
}
return h.rateLimiter.TryToAcquireSemaphore()
}

// Start starts a hydrate call
func (h *hydrateCall) start(ctx context.Context, r *rowData, d *QueryData, concurrencyManager *concurrencyManager) time.Duration {
func (h *hydrateCall) start(ctx context.Context, r *rowData, d *QueryData) time.Duration {
rateLimitDelay := h.rateLimit(ctx, d)

// tell the rowdata to wait for this call to complete
Expand All @@ -88,8 +86,7 @@ func (h *hydrateCall) start(ctx context.Context, r *rowData, d *QueryData, concu
// call callHydrate async, ignoring return values
go func() {
r.callHydrate(ctx, d, h.Func, h.Name, h.Config)
// decrement number of hydrate functions running
concurrencyManager.Finished(h.Name)
h.onFinished()
}()
return rateLimitDelay
}
Expand All @@ -105,3 +102,9 @@ func (h *hydrateCall) rateLimit(ctx context.Context, d *QueryData) time.Duration

return delay
}

// onFinished is invoked once the hydrate function has returned.
// If this call has a rate limiter, it releases the limiter's semaphore
// (canStart obtains it via TryToAcquireSemaphore); otherwise it does nothing.
func (h *hydrateCall) onFinished() {
	limiter := h.rateLimiter
	if limiter == nil {
		// no rate limiter, so no semaphore was acquired
		return
	}
	limiter.ReleaseSemaphore()
}
8 changes: 5 additions & 3 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ type Plugin struct {
Logger hclog.Logger
// TableMap is a map of all the tables in the plugin, keyed by the table name
// NOTE: it must be NULL for plugins with dynamic schema
TableMap map[string]*Table
TableMapFunc TableMapFunc
DefaultTransform *transform.ColumnTransforms
TableMap map[string]*Table
TableMapFunc TableMapFunc
DefaultTransform *transform.ColumnTransforms
// Deprecated: use RateLimiters to control concurrency
DefaultConcurrency *DefaultConcurrencyConfig
DefaultRetryConfig *RetryConfig
DefaultIgnoreConfig *IgnoreConfig
Expand Down Expand Up @@ -120,6 +121,7 @@ type Plugin struct {
resolvedRateLimiterDefs map[string]*rate_limiter.Definition
// lock for this map
rateLimiterDefsMut sync.RWMutex

// map of call ids to avoid duplicates
callIdLookup map[string]struct{}
callIdLookupMut sync.RWMutex
Expand Down
29 changes: 12 additions & 17 deletions plugin/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,9 @@ type QueryData struct {
fetchLimiters *fetchCallRateLimiters

// all the columns that will be returned by this query
columns map[string]*QueryColumn
concurrencyManager *concurrencyManager
rowDataChan chan *rowData
errorChan chan error
columns map[string]*QueryColumn
rowDataChan chan *rowData
errorChan chan error
// channel to send results
outputChan chan *proto.ExecuteResponse
// wait group used to synchronise parent-child list fetches - each child hydrate function increments this wait group
Expand Down Expand Up @@ -207,7 +206,6 @@ func newQueryData(connectionCallId string, p *Plugin, queryContext *QueryContext

// build list of all columns returned by these hydrate calls (and the fetch call)
d.populateColumns()
d.concurrencyManager = newConcurrencyManager(table)
// populate the query status
// if a limit is set, use this to set rows required - otherwise just set to MaxInt32
d.queryStatus = newQueryStatus(d.QueryContext.Limit)
Expand Down Expand Up @@ -244,16 +242,15 @@ func (d *QueryData) ShallowCopy() *QueryData {
cacheTtl: d.cacheTtl,
cacheEnabled: d.cacheEnabled,

fetchLimiters: d.fetchLimiters,
filteredMatrix: d.filteredMatrix,
hydrateCalls: d.hydrateCalls,
concurrencyManager: d.concurrencyManager,
rowDataChan: d.rowDataChan,
errorChan: d.errorChan,
outputChan: d.outputChan,
listWg: d.listWg,
columns: d.columns,
queryStatus: d.queryStatus,
fetchLimiters: d.fetchLimiters,
filteredMatrix: d.filteredMatrix,
hydrateCalls: d.hydrateCalls,
rowDataChan: d.rowDataChan,
errorChan: d.errorChan,
outputChan: d.outputChan,
listWg: d.listWg,
columns: d.columns,
queryStatus: d.queryStatus,
}

// NOTE: we create a deep copy of the keyColumnQuals
Expand Down Expand Up @@ -715,8 +712,6 @@ func (d *QueryData) streamRows(ctx context.Context, rowChan chan *proto.Row, don
log.Printf("[INFO] QueryData streamRows (%s)", d.connectionCallId)

defer func() {
// tell the concurrency manager we are done (it may log the concurrency stats)
d.concurrencyManager.Close()
log.Printf("[INFO] QueryData streamRows DONE (%s)", d.connectionCallId)

// if there is an error or cancellation, abort the pending set
Expand Down
4 changes: 2 additions & 2 deletions plugin/row_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ func (r *rowData) startAllHydrateCalls(rowDataCtx context.Context, rowQueryData
}

// so call needs to start - can it?
if call.canStart(r, hydrateFuncName, r.queryData.concurrencyManager) {
if call.canStart(r) {
// execute the hydrate call asynchronously
rateLimitDelay := call.start(rowDataCtx, r, rowQueryData, r.queryData.concurrencyManager)
rateLimitDelay := call.start(rowDataCtx, r, rowQueryData)
// store the call metadata
r.hydrateMetadata = append(r.hydrateMetadata, &hydrateMetadata{
Type: "hydrate",
Expand Down
20 changes: 11 additions & 9 deletions rate_limiter/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,27 @@ type Definition struct {
Name string
// the actual limiter config
FillRate rate.Limit
BucketSize int
BucketSize int64

// the scopes which identify this limiter instance
// one limiter instance will be created for each combination of scopes which is encountered
MaxConcurrency int64
// the scope properties which identify this limiter instance
// one limiter instance will be created for each combination of these properties which is encountered
Scope []string

// filter used to target the limiter
Where string
parsedFilter *scopeFilter
}

// DefintionsFromProto converts the proto format RateLimiterDefinition into a Defintion
// DefinitionFromProto converts the proto format RateLimiterDefinition into a Definition
func DefinitionFromProto(p *proto.RateLimiterDefinition) (*Definition, error) {
var res = &Definition{
Name: p.Name,
FillRate: rate.Limit(p.FillRate),
BucketSize: int(p.BucketSize),
Scope: p.Scope,
Where: p.Where,
Name: p.Name,
FillRate: rate.Limit(p.FillRate),
BucketSize: p.BucketSize,
MaxConcurrency: p.MaxConcurrency,
Scope: p.Scope,
Where: p.Where,
}
if err := res.Initialise(); err != nil {
return nil, err
Expand Down
7 changes: 4 additions & 3 deletions rate_limiter/limiter_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ func (m *LimiterMap) GetOrCreate(l *Definition, scopeValues map[string]string) (

// ok we need to create one
limiter = &Limiter{
Limiter: rate.NewLimiter(l.FillRate, l.BucketSize),
Name: l.Name,
scopeValues: scopeValues,
Limiter: rate.NewLimiter(l.FillRate, int(l.BucketSize)),
Name: l.Name,
MaxConcurrency: l.MaxConcurrency,
scopeValues: scopeValues,
}
// put it in the map
m.limiters[key] = limiter
Expand Down
Loading

0 comments on commit 2d56ee9

Please sign in to comment.