Skip to content

Commit

Permalink
tidy for release
Browse files Browse the repository at this point in the history
  • Loading branch information
kaidaguerre committed Nov 1, 2023
1 parent eba46cb commit 29f4bcb
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 124 deletions.
74 changes: 0 additions & 74 deletions connection/connection_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
type ConnectionCache struct {
connectionName string
cache *cache.Cache[any]
//keys map[string]struct{}
//keysLock sync.Mutex
}

func NewConnectionCache(connectionName string, maxCost int64) (*ConnectionCache, error) {
Expand All @@ -36,7 +34,6 @@ func NewConnectionCache(connectionName string, maxCost int64) (*ConnectionCache,
cache := &ConnectionCache{
connectionName: connectionName,
cache: connectionCacheStore,
//keys: make(map[string]struct{}),
}

log.Printf("[INFO] Created connection cache for connection '%s'", connectionName)
Expand All @@ -51,18 +48,12 @@ func (c *ConnectionCache) Set(ctx context.Context, key string, value interface{}
func (c *ConnectionCache) SetWithTTL(ctx context.Context, key string, value interface{}, ttl time.Duration) error {
// build a key which includes the connection name
key = c.buildCacheKey(key)
//c.keysLock.Lock()
//c.keys[key] = struct{}{}
//c.keysLock.Unlock()

//log.Printf("[INFO] SetWithTTL (connection %s, cache key %s) ", c.connectionName, key)
err := c.cache.Set(ctx,
key,
value,
// if ttl is zero there is no expiration
store.WithExpiration(ttl),
//// put connection name in tags
//store.WithTags([]string{c.connectionName}),
)

// wait for value to pass through buffers (necessary for ristretto)
Expand All @@ -71,25 +62,6 @@ func (c *ConnectionCache) SetWithTTL(ctx context.Context, key string, value inte
if err != nil {
log.Printf("[WARN] SetWithTTL (connection %s, cache key %s) failed - error %v", c.connectionName, key, err)
}
// TACTICAL
// verify this key has been set with the correct tag
//var foundKeyForTag bool
//var cacheKeys []string
//tagKey := fmt.Sprintf("gocache_tag_%s", c.connectionName)
//if result, err := c.cache.Get(ctx, tagKey); err == nil {
// if bytes, ok := result.([]byte); ok {
// cacheKeys = strings.Split(string(bytes), ",")
// for _, k := range cacheKeys {
// if k == key {
// foundKeyForTag = true
// break
// }
// }
// }
//}
//if !foundKeyForTag {
// log.Printf("[WARN] SetWithTTL (connection %s, cache key %s) - key NOT found for tag %s", c.connectionName, key, c.connectionName)
//}

return err
}
Expand All @@ -114,52 +86,6 @@ func (c *ConnectionCache) Delete(ctx context.Context, key string) {
func (c *ConnectionCache) Clear(ctx context.Context) error {
log.Printf("[INFO] ConnectionCache.Clear (%s)", c.connectionName)
return c.cache.Clear(ctx)

//// read tags keys from ristretto and verify they exist
//var cacheKeys []string
//tagKey := fmt.Sprintf("gocache_tag_%s", c.connectionName)
//if result, err := c.cache.Get(ctx, tagKey); err == nil {
// log.Printf("[INFO] existing cache key for connection '%s': %s ", c.connectionName, result)
//
// if bytes, ok := result.([]byte); ok {
// cacheKeys = strings.Split(string(bytes), ",")
// for _, k := range cacheKeys {
// _, err := c.cache.Get(ctx, k)
// if err != nil {
// log.Printf("[INFO] %s does not exists in cache", k)
// }
// }
// }
//}
//
////c.cache.Invalidate(ctx, store.WithInvalidateTags([]string{c.connectionName}))
////
////// now verify the tags have been deleted
////for _, k := range cacheKeys {
//// _, err := c.cache.Get(ctx, k)
//// if err == nil {
//// log.Printf("[INFO] cache key for connection '%s' : %s still exists in cache after clear", c.connectionName, k)
//// }
////}
////
////TODO TACTICAL
//c.keysLock.Lock()
//log.Printf("[INFO] ConnectionCache clear for connection '%s', deleting %d %s",
// c.connectionName,
// len(c.keys),
// pluralize.NewClient().Pluralize("key", len(c.keys), false),
//)
//for key := range c.keys {
// //_, err := c.cache.Get(ctx, key)
// //log.Printf("[INFO] before delete get %s returns err %v", key, err)
// c.cache.Delete(ctx, key)
// //time.Sleep(10 * time.Millisecond)
// //_, err = c.cache.Get(ctx, key)
// //log.Printf("[INFO] after delete get %s returns %v ", key, err)
//}
//c.keys = make(map[string]struct{})
//c.keysLock.Unlock()

}

func (c *ConnectionCache) buildCacheKey(key string) string {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ require (
github.com/allegro/bigcache/v3 v3.1.0
github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964
github.com/eko/gocache/lib/v4 v4.1.5
github.com/eko/gocache/store/bigcache/v4 v4.2.1
github.com/eko/gocache/store/ristretto/v4 v4.2.1
github.com/fsnotify/fsnotify v1.6.0
github.com/hashicorp/go-getter v1.7.2
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.40.0
Expand Down Expand Up @@ -56,8 +58,6 @@ require (
github.com/btubbs/datetime v0.1.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/eko/gocache/store/bigcache/v4 v4.2.1 // indirect
github.com/eko/gocache/store/ristretto/v4 v4.2.1 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down
20 changes: 0 additions & 20 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ type Plugin struct {
HydrateConfig []HydrateConfig

queryCache *query_cache.QueryCache
// shared connection cache - this is the underlying cache used for all queryData ConnectionCache
//connectionCacheStore *cache.Cache[any]
// map of the connection caches, keyed by connection name
connectionCacheMap map[string]*connectionmanager.ConnectionCache
connectionCacheMapLock sync.Mutex
Expand Down Expand Up @@ -185,10 +183,6 @@ func (p *Plugin) initialise(logger hclog.Logger) {
p.WatchedFileChangedFunc = defaultWatchedFilesChangedFunc
}

//if err := p.createConnectionCacheStore(); err != nil {
// panic(fmt.Sprintf("failed to create connection cache: %s", err.Error()))
//}

// set temporary dir for this plugin
// this will only created if getSourceFiles is used
p.tempDir = path.Join(os.TempDir(), p.Name)
Expand Down Expand Up @@ -241,20 +235,6 @@ func (p *Plugin) shutdown() {
}
}

//func (p *Plugin) createConnectionCacheStore() error {
// ristrettoCache, err := ristretto.NewCache(&ristretto.Config{
// NumCounters: 1000,
// MaxCost: 100000,
// BufferItems: 64,
// })
// if err != nil {
// return err
// }
// ristrettoStore := ristretto_store.NewRistretto(ristrettoCache)
// p.connectionCacheStore = cache.New[any](ristrettoStore)
// return nil
//}

func (p *Plugin) ensureConnectionCache(connectionName string) (*connectionmanager.ConnectionCache, error) {
maxCost := int64(100000 / len(p.ConnectionMap))
connectionCache, err := connectionmanager.NewConnectionCache(connectionName, maxCost)
Expand Down
30 changes: 2 additions & 28 deletions plugin/plugin_connection_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package plugin

import (
"context"
"encoding/json"
"fmt"
"github.com/fsnotify/fsnotify"
"github.com/turbot/go-kit/helpers"
typehelpers "github.com/turbot/go-kit/types"
"github.com/turbot/steampipe-plugin-sdk/v5/grpc"
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"github.com/turbot/steampipe-plugin-sdk/v5/plugin/context_key"
Expand Down Expand Up @@ -65,7 +63,7 @@ func (p *Plugin) updateConnections(ctx context.Context, changed []*proto.Connect
connectionData, ok := p.getConnectionData(changedConnection.Connection)
if !ok {
// possible this connection may have failed to parse or something
// in which cadse there will be an error in updateData
// in which case there will be an error in updateData
continue
}
p.ConnectionConfigChangedFunc(ctx, p, existingConnections[c], connectionData.Connection)
Expand Down Expand Up @@ -140,7 +138,7 @@ func (p *Plugin) upsertConnectionData(config *proto.ConnectionConfig, updateData
// this is because its possible a query is executing already with the Connection object in it's QueryData
// if we replace the Connection with a new struct, any update we make to the Connection.Config will not be
// picked up by those running queries
// worst case scenario is that (for example) trhe Aws plugin may refresh the Client using the previous credentials
// worst case scenario is that (for example) the Aws plugin may refresh the Client using the previous credentials
d, alreadyHaveConnectionData := p.getConnectionData(connectionName)
if !alreadyHaveConnectionData {
// no data stored for this connection - create
Expand Down Expand Up @@ -214,31 +212,7 @@ func (p *Plugin) parseConnectionConfig(config *proto.ConnectionConfig) (any, err
log.Printf("[WARN] upsertConnectionData failed for connection %s, config validation failed: %s", config.Connection, err.Error())
return nil, err
}
type awsConfig struct {
Profile *string
AccessKey *string
SecretKey *string
SessionToken *string
}

// TODO TACTICAL log redacted AWS credentials
j, err := json.Marshal(configStruct)
if err == nil {
var target awsConfig
if err := json.Unmarshal(j, &target); err == nil {
secretKey := typehelpers.SafeString(target.SecretKey)
if l := len(secretKey); l > 0 {
secretKey = secretKey[l-4:]
}
sessionToken := typehelpers.SafeString(target.SessionToken)
if l := len(sessionToken); l > 0 {
sessionToken = sessionToken[l-4:]
}

log.Println("Info", "setConnectionData", "connection_name", config.Connection, "profile", typehelpers.SafeString(target.Profile), "accessKey", typehelpers.SafeString(target.AccessKey), "secretKey", secretKey, "sessionToken", sessionToken)

}
}
return configStruct, err
}

Expand Down

0 comments on commit 29f4bcb

Please sign in to comment.