Skip to content

Commit

Permalink
Add gauge metrics to track missedEvents and cache sizes (#5411)
Browse files Browse the repository at this point in the history
Signed-off-by: stevend <[email protected]>
Co-authored-by: Agustín Martínez Fayó <[email protected]>
  • Loading branch information
stevend-uber and amartinezfayo committed Aug 27, 2024
1 parent 4f34e43 commit 7b79272
Show file tree
Hide file tree
Showing 12 changed files with 303 additions and 68 deletions.
10 changes: 9 additions & 1 deletion doc/telemetry/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,15 @@ The following metrics are emitted:
| Call Counter | `datastore`, `registration_entry_event`, `list` | | The Datastore is listing a registration entry events. |
| Call Counter | `datastore`, `registration_entry_event`, `prune` | | The Datastore is pruning expired registration entry events. |
| Call Counter | `datastore`, `registration_entry_event`, `fetch` | | The Datastore is fetching a specific registration entry event. |
| Call Counter | `entry`, `cache`, `reload` | | The Server is reloading its in-memory entry cache from the datastore. |
| Call Counter | `entry`, `cache`, `reload` | | The Server is reloading its in-memory entry cache from the datastore |
| Gauge | `node`, `agents_by_id_cache`, `count` | | The Server is re-hydrating the agents-by-id event-based cache |
| Gauge | `node`, `agents_by_expiresat_cache`, `count` | | The Server is re-hydrating the agents-by-expiresat event-based cache |
| Gauge | `node`, `skipped_node_event_ids`, `count` | | The count of skipped ids detected in the last `sql_transaction_timout` period. For databases that autoincrement ids by more than one, this number will overreport the skipped ids. [Issue](https://github.com/spiffe/spire/issues/5341) |
| Gauge | `entry`, `nodealiases_by_entryid_cache`, `count` | | The Server is re-hydrating the nodealiases-by-entryid event-based cache |
| Gauge | `entry`, `nodealiases_by_selector_cache`, `count` | | The Server is re-hydrating the nodealiases-by-selector event-based cache |
| Gauge | `entry`, `entries_by_entryid_cache`, `count` | | The Server is re-hydrating the entries-by-entryid event-based cache |
| Gauge | `entry`, `entries_by_parentid_cache`, `count` | | The Server is re-hydrating the entries-by-parentid event-based cache |
| Gauge | `entry`, `skipped_entry_event_ids`, `count` | | The count of skipped ids detected in the last sql_transaction_timout period. For databases that autoincrement ids by more than one, this number will overreport the skipped ids. [Issue](https://github.com/spiffe/spire/issues/5341) |
| Counter | `manager`, `jwt_key`, `activate` | | The CA manager has successfully activated a JWT Key. |
| Gauge | `manager`, `x509_ca`, `rotate`, `ttl` | `trust_domain_id` | The CA manager is rotating the X.509 CA with a given TTL for a specific Trust Domain. |
| Call Counter | `registration_entry`, `manager`, `prune` | | The Registration manager is pruning entries. |
Expand Down
25 changes: 23 additions & 2 deletions pkg/common/telemetry/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,24 @@ const (
// Cache functionality related to a cache
Cache = "cache"

// AgentsByIDCache functionality related to the agent btree cache indexed by ID
AgentsByIDCache = "agents_by_id_cache"

// AgentsByExpiresAtCache functionality related to the agent btree cache indexed by ExpiresAt
AgentsByExpiresAtCache = "agents_by_expiresat_cache"

// NodeAliasesByEntryIDCache functionality related to the node-aliases btree cache indexed by EntryID
NodeAliasesByEntryIDCache = "nodealiases_by_entryid_cache"

// NodeAliasesBySelectorCache functionality related to the node-aliases btree cache indexed by Selector
NodeAliasesBySelectorCache = "nodealiases_by_selector_cache"

// EntriesByEntryIDCache functionality related to the entries btree cache indexed by EntryID
EntriesByEntryIDCache = "entries_by_entryid_cache"

// EntriesByParentIDCache functionality related to the entries btree cache indexed by ParentID
EntriesByParentIDCache = "entries_by_parentid_cache"

// Cache type tag
CacheType = "cache_type"

Expand Down Expand Up @@ -861,8 +879,11 @@ const (
// ListAgents functionality related to listing agents
ListAgents = "list_agents"

// CountEntries functionality related to counting all registration entries
CountEntries = "count_entries"
// SkippedEntryEventIDs functionality related to counting missed entry event IDs
SkippedEntryEventIDs = "skipped_entry_event_ids"

// SkippedNodeEventIDs functionality related to counting missed node event IDs
SkippedNodeEventIDs = "skipped_node_event_ids"

// ListAllEntriesWithPages functionality related to listing all registration entries with pagination
ListAllEntriesWithPages = "list_all_entries_with_pages"
Expand Down
48 changes: 48 additions & 0 deletions pkg/common/telemetry/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,51 @@ import "github.com/spiffe/spire/pkg/common/telemetry"
func SetEntryDeletedGauge(m telemetry.Metrics, deleted int) {
m.SetGauge([]string{telemetry.Entry, telemetry.Deleted}, float32(deleted))
}

// SetAgentsByIDCacheCountGauge emits a gauge with the number of agents by ID that are
// currently in the node cache.
func SetAgentsByIDCacheCountGauge(m telemetry.Metrics, size int) {
m.SetGauge([]string{telemetry.Node, telemetry.AgentsByIDCache, telemetry.Count}, float32(size))
}

// SetAgentsByExpiresAtCacheCountGauge emits a gauge with the number of agents by expiresAt that are
// currently in the node cache.
func SetAgentsByExpiresAtCacheCountGauge(m telemetry.Metrics, size int) {
m.SetGauge([]string{telemetry.Node, telemetry.AgentsByExpiresAtCache, telemetry.Count}, float32(size))
}

// SetSkippedNodeEventIDsCacheCountGauge emits a gauge with the number of entries that are
// currently in the skipped-node events cache.
func SetSkippedNodeEventIDsCacheCountGauge(m telemetry.Metrics, size int) {
m.SetGauge([]string{telemetry.Node, telemetry.SkippedNodeEventIDs, telemetry.Count}, float32(size))
}

// SetNodeAliasesByEntryIDCacheCountGauge emits a gauge with the number of Node Aliases by EntryID that are
// currently in the entry cache.
func SetNodeAliasesByEntryIDCacheCountGauge(m telemetry.Metrics, size int) {
m.SetGauge([]string{telemetry.Entry, telemetry.NodeAliasesByEntryIDCache, telemetry.Count}, float32(size))
}

// SetNodeAliasesBySelectorCacheCountGauge emits a gauge with the number of Node Aliases by Selector that are
// currently in the entry cache.
func SetNodeAliasesBySelectorCacheCountGauge(m telemetry.Metrics, size int) {
m.SetGauge([]string{telemetry.Entry, telemetry.NodeAliasesBySelectorCache, telemetry.Count}, float32(size))
}

// SetEntriesByEntryIDCacheCountGauge emits a gauge with the number of entries by entryID that are
// currently in the entry cache.
func SetEntriesByEntryIDCacheCountGauge(m telemetry.Metrics, size int) {
m.SetGauge([]string{telemetry.Entry, telemetry.EntriesByEntryIDCache, telemetry.Count}, float32(size))
}

// SetEntriesByParentIDCacheCountGauge emits a gauge with the number of entries by parentID that are
// currently in the entry cache.
func SetEntriesByParentIDCacheCountGauge(m telemetry.Metrics, size int) {
m.SetGauge([]string{telemetry.Entry, telemetry.EntriesByParentIDCache, telemetry.Count}, float32(size))
}

// SetSkippedEntryEventIDsCacheCountGauge emits a gauge with the number of entries that are
// currently in the skipped-entry events cache.
func SetSkippedEntryEventIDsCacheCountGauge(m telemetry.Metrics, size int) {
m.SetGauge([]string{telemetry.Entry, telemetry.SkippedEntryEventIDs, telemetry.Count}, float32(size))
}
2 changes: 1 addition & 1 deletion pkg/server/authorizedentries/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (c *Cache) removeEntry(entryID string) {
}
}

func (c *Cache) stats() cacheStats {
func (c *Cache) Stats() cacheStats {
return cacheStats{
AgentsByID: c.agentsByID.Len(),
AgentsByExpiresAt: c.agentsByExpiresAt.Len(),
Expand Down
24 changes: 12 additions & 12 deletions pkg/server/authorizedentries/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestCacheInternalStats(t *testing.T) {
clk := clock.NewMock(t)
t.Run("pristine", func(t *testing.T) {
cache := NewCache(clk)
require.Zero(t, cache.stats())
require.Zero(t, cache.Stats())
})

t.Run("entries and aliases", func(t *testing.T) {
Expand All @@ -189,34 +189,34 @@ func TestCacheInternalStats(t *testing.T) {
require.Equal(t, cacheStats{
EntriesByEntryID: 1,
EntriesByParentID: 1,
}, cache.stats())
}, cache.Stats())

cache.UpdateEntry(entry2a)
require.Equal(t, cacheStats{
EntriesByEntryID: 2,
EntriesByParentID: 2,
}, cache.stats())
}, cache.Stats())

cache.UpdateEntry(entry2b)
require.Equal(t, cacheStats{
EntriesByEntryID: 1,
EntriesByParentID: 1,
AliasesByEntryID: 2, // one for each selector
AliasesBySelector: 2, // one for each selector
}, cache.stats())
}, cache.Stats())

cache.RemoveEntry(entry1.Id)
require.Equal(t, cacheStats{
AliasesByEntryID: 2, // one for each selector
AliasesBySelector: 2, // one for each selector
}, cache.stats())
}, cache.Stats())

cache.RemoveEntry(entry2b.Id)
require.Zero(t, cache.stats())
require.Zero(t, cache.Stats())

// Remove again and make sure nothing happens.
cache.RemoveEntry(entry2b.Id)
require.Zero(t, cache.stats())
require.Zero(t, cache.Stats())
})

t.Run("agents", func(t *testing.T) {
Expand All @@ -225,28 +225,28 @@ func TestCacheInternalStats(t *testing.T) {
require.Equal(t, cacheStats{
AgentsByID: 1,
AgentsByExpiresAt: 1,
}, cache.stats())
}, cache.Stats())

cache.UpdateAgent(agent2.String(), now.Add(time.Hour*2), []*types.Selector{sel2})
require.Equal(t, cacheStats{
AgentsByID: 2,
AgentsByExpiresAt: 2,
}, cache.stats())
}, cache.Stats())

cache.UpdateAgent(agent2.String(), now.Add(time.Hour*3), []*types.Selector{sel2})
require.Equal(t, cacheStats{
AgentsByID: 2,
AgentsByExpiresAt: 2,
}, cache.stats())
}, cache.Stats())

cache.RemoveAgent(agent1.String())
require.Equal(t, cacheStats{
AgentsByID: 1,
AgentsByExpiresAt: 1,
}, cache.stats())
}, cache.Stats())

cache.RemoveAgent(agent2.String())
require.Zero(t, cache.stats())
require.Zero(t, cache.Stats())
})
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/server/endpoints/authorized_entryfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/spire-api-sdk/proto/spire/api/types"
"github.com/spiffe/spire/pkg/common/telemetry"
"github.com/spiffe/spire/pkg/server/api"
"github.com/spiffe/spire/pkg/server/authorizedentries"
"github.com/spiffe/spire/pkg/server/datastore"
Expand Down Expand Up @@ -36,9 +37,9 @@ type eventsBasedCache interface {
pruneMissedEvents()
}

func NewAuthorizedEntryFetcherWithEventsBasedCache(ctx context.Context, log logrus.FieldLogger, clk clock.Clock, ds datastore.DataStore, cacheReloadInterval, pruneEventsOlderThan, sqlTransactionTimeout time.Duration) (*AuthorizedEntryFetcherWithEventsBasedCache, error) {
func NewAuthorizedEntryFetcherWithEventsBasedCache(ctx context.Context, log logrus.FieldLogger, metrics telemetry.Metrics, clk clock.Clock, ds datastore.DataStore, cacheReloadInterval, pruneEventsOlderThan, sqlTransactionTimeout time.Duration) (*AuthorizedEntryFetcherWithEventsBasedCache, error) {
log.Info("Building event-based in-memory entry cache")
cache, registrationEntries, attestedNodes, err := buildCache(ctx, log, ds, clk, sqlTransactionTimeout)
cache, registrationEntries, attestedNodes, err := buildCache(ctx, log, metrics, ds, clk, sqlTransactionTimeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -111,15 +112,15 @@ func (a *AuthorizedEntryFetcherWithEventsBasedCache) updateCache(ctx context.Con
return errors.Join(updateRegistrationEntriesCacheErr, updateAttestedNodesCacheErr)
}

func buildCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, sqlTransactionTimeout time.Duration) (*authorizedentries.Cache, *registrationEntries, *attestedNodes, error) {
func buildCache(ctx context.Context, log logrus.FieldLogger, metrics telemetry.Metrics, ds datastore.DataStore, clk clock.Clock, sqlTransactionTimeout time.Duration) (*authorizedentries.Cache, *registrationEntries, *attestedNodes, error) {
cache := authorizedentries.NewCache(clk)

registrationEntries, err := buildRegistrationEntriesCache(ctx, log, ds, clk, cache, buildCachePageSize, sqlTransactionTimeout)
registrationEntries, err := buildRegistrationEntriesCache(ctx, log, metrics, ds, clk, cache, buildCachePageSize, sqlTransactionTimeout)
if err != nil {
return nil, nil, nil, err
}

attestedNodes, err := buildAttestedNodesCache(ctx, log, ds, clk, cache, sqlTransactionTimeout)
attestedNodes, err := buildAttestedNodesCache(ctx, log, metrics, ds, clk, cache, sqlTransactionTimeout)
if err != nil {
return nil, nil, nil, err
}
Expand Down
21 changes: 15 additions & 6 deletions pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (

"github.com/andres-erbsen/clock"
"github.com/sirupsen/logrus"

"github.com/spiffe/spire/pkg/common/telemetry"
server_telemetry "github.com/spiffe/spire/pkg/common/telemetry/server"
"github.com/spiffe/spire/pkg/server/api"
"github.com/spiffe/spire/pkg/server/authorizedentries"
"github.com/spiffe/spire/pkg/server/datastore"
Expand All @@ -17,11 +19,12 @@ import (
)

type attestedNodes struct {
cache *authorizedentries.Cache
clk clock.Clock
ds datastore.DataStore
log logrus.FieldLogger
mu sync.RWMutex
cache *authorizedentries.Cache
clk clock.Clock
ds datastore.DataStore
log logrus.FieldLogger
metrics telemetry.Metrics
mu sync.RWMutex

firstEventID uint
firstEventTime time.Time
Expand All @@ -33,7 +36,7 @@ type attestedNodes struct {

// buildAttestedNodesCache fetches all attested nodes and adds the unexpired ones to the cache.
// It runs once at startup.
func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, sqlTransactionTimeout time.Duration) (*attestedNodes, error) {
func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, metrics telemetry.Metrics, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, sqlTransactionTimeout time.Duration) (*attestedNodes, error) {
resp, err := ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -82,6 +85,7 @@ func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds dat
firstEventID: firstEventID,
firstEventTime: firstEventTime,
log: log,
metrics: metrics,
lastEventID: lastEventID,
missedEvents: missedEvents,
seenMissedStartupEvents: make(map[uint]struct{}),
Expand Down Expand Up @@ -142,6 +146,10 @@ func (a *attestedNodes) updateCache(ctx context.Context) error {
a.lastEventID = event.EventID
}

// These two should be the same value but it's valuable to have them both be emitted for incident triage.
server_telemetry.SetAgentsByExpiresAtCacheCountGauge(a.metrics, a.cache.Stats().AgentsByExpiresAt)
server_telemetry.SetAgentsByIDCacheCountGauge(a.metrics, a.cache.Stats().AgentsByID)

return nil
}

Expand Down Expand Up @@ -201,6 +209,7 @@ func (a *attestedNodes) replayMissedEvents(ctx context.Context) {

delete(a.missedEvents, eventID)
}
server_telemetry.SetSkippedNodeEventIDsCacheCountGauge(a.metrics, len(a.missedEvents))
}

// updatedCacheEntry update/deletes/creates an individual attested node in the cache.
Expand Down
Loading

0 comments on commit 7b79272

Please sign in to comment.