Skip to content

Commit

Permalink
feat!: vec-380 vec-381 vec-496 Merge pull request #19 from aerospike/…
Browse files Browse the repository at this point in the history
…vec-380-cache-records

feat!: vec-380 support record caching
  • Loading branch information
dwelch-spike authored Oct 29, 2024
2 parents be1ca23 + ca564e1 commit 9b7a2c1
Show file tree
Hide file tree
Showing 14 changed files with 444 additions and 113 deletions.
32 changes: 15 additions & 17 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,18 @@ run:
# skip-files:
# - sample

# issues:
# exclude-rules:
# - path: info/as_parser_test\.go
# linters:
# - lll # Test code is allowed to have long lines
# - path: asconfig/generate_test\.go
# linters:
# - dupl # Test code is allowed to have duplicate code
# - path: asconfig/asconfig_test\.go
# linters:
# - dupl # Test code is allowed to have duplicate code
# - path: '(.+)test\.go'
# linters:
# - govet # Test code field alignment for sake of space is not a concern
# - linters:
# - lll
# source: "// "
issues:
exclude-rules:
- path: info/as_parser_test\.go
linters:
- lll # Test code is allowed to have long lines
- path: asconfig/generate_test\.go
linters:
- dupl # Test code is allowed to have duplicate code
- path: asconfig/asconfig_test\.go
linters:
- dupl # Test code is allowed to have duplicate code
- path: '(.+)test\.go'
linters:
- govet # Test code field alignment for sake of space is not a concern
- wsl # Auto generated tests cuddle assignments
6 changes: 4 additions & 2 deletions cmd/flags/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ const (
BatchIndexInterval = "hnsw-batch-index-interval"
BatchMaxReindexRecords = "hnsw-batch-max-reindex-records"
BatchReindexInterval = "hnsw-batch-reindex-interval"
HnswCacheMaxEntries = "hnsw-cache-max-entries"
HnswCacheExpiry = "hnsw-cache-expiry"
HnswIndexCacheMaxEntries = "hnsw-index-cache-max-entries"
HnswIndexCacheExpiry = "hnsw-index-cache-expiry"
HnswRecordCacheMaxEntries = "hnsw-record-cache-max-entries"
HnswRecordCacheExpiry = "hnsw-record-cache-expiry"
HnswHealerMaxScanRatePerNode = "hnsw-healer-max-scan-rate-per-node"
HnswHealerMaxScanPageSize = "hnsw-healer-max-scan-page-size"
HnswHealerReindexPercent = "hnsw-healer-reindex-percent"
Expand Down
46 changes: 37 additions & 9 deletions cmd/flags/hnsw.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,31 +41,59 @@ func (cf *BatchingFlags) NewSLogAttr() []any {
}
}

type CachingFlags struct {
type IndexCachingFlags struct {
MaxEntries Uint64OptionalFlag
Expiry InfDurationOptionalFlag
}

func NewHnswCachingFlags() *CachingFlags {
return &CachingFlags{
func NewHnswIndexCachingFlags() *IndexCachingFlags {
return &IndexCachingFlags{
MaxEntries: Uint64OptionalFlag{},
Expiry: InfDurationOptionalFlag{},
}
}

//nolint:lll // For readability
func (cf *CachingFlags) NewFlagSet() *pflag.FlagSet {
func (cf *IndexCachingFlags) NewFlagSet() *pflag.FlagSet {
flagSet := &pflag.FlagSet{}
flagSet.Var(&cf.MaxEntries, HnswCacheMaxEntries, "Maximum number of entries to cache.")
flagSet.Var(&cf.Expiry, HnswCacheExpiry, "A cache entry will expire after this amount of time has passed since the entry was added to cache, or 'inf' to never expire.")
flagSet.Var(&cf.MaxEntries, HnswIndexCacheMaxEntries, "Maximum number of entries to cache.")
flagSet.Var(&cf.Expiry, HnswIndexCacheExpiry, "A cache entry will expire after this amount of time has passed since the entry was added to cache, or -1 to never expire.")

return flagSet
}

func (cf *CachingFlags) NewSLogAttr() []any {
func (cf *IndexCachingFlags) NewSLogAttr() []any {
return []any{
slog.Any(HnswCacheMaxEntries, cf.MaxEntries.Val),
slog.String(HnswCacheExpiry, cf.Expiry.String()),
slog.Any(HnswIndexCacheMaxEntries, cf.MaxEntries.Val),
slog.String(HnswIndexCacheExpiry, cf.Expiry.String()),
}
}

// RecordCachingFlags holds the optional command-line flags that configure
// an index's HNSW record cache (the hnsw-record-cache-* flags).
type RecordCachingFlags struct {
	// MaxEntries caps the number of records held in the cache; unset means
	// the server default is used.
	MaxEntries Uint64OptionalFlag
	// Expiry is how long an entry lives in the cache after being added, or
	// -1 to never expire (see InfDurationOptionalFlag).
	Expiry InfDurationOptionalFlag
}

// NewHnswRecordCachingFlags returns a RecordCachingFlags with both optional
// flags left at their unset (zero) values.
func NewHnswRecordCachingFlags() *RecordCachingFlags {
	recordFlags := RecordCachingFlags{
		MaxEntries: Uint64OptionalFlag{},
		Expiry:     InfDurationOptionalFlag{},
	}

	return &recordFlags
}

// NewFlagSet returns a pflag.FlagSet exposing the record-cache tuning flags
// (maximum entries and entry expiry), bound to this struct's fields.
//
//nolint:lll // For readability
func (cf *RecordCachingFlags) NewFlagSet() *pflag.FlagSet {
	fs := new(pflag.FlagSet)

	fs.Var(&cf.MaxEntries, HnswRecordCacheMaxEntries, "Maximum number of entries to cache.")
	fs.Var(&cf.Expiry, HnswRecordCacheExpiry, "A cache entry will expire after this amount of time has passed since the entry was added to cache, or -1 to never expire.")

	return fs
}

// NewSLogAttr returns the record-cache flag values as slog attributes,
// suitable for structured debug logging of parsed flags.
func (cf *RecordCachingFlags) NewSLogAttr() []any {
	attrs := []any{
		slog.Any(HnswRecordCacheMaxEntries, cf.MaxEntries.Val),
		slog.String(HnswRecordCacheExpiry, cf.Expiry.String()),
	}

	return attrs
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/flags/optionals.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ func (f *DurationOptionalFlag) Int64() *int64 {
return &milli
}

// InfDurationOptionalFlag is a flag that can be either a time.duration or infinity.
// It is used for flags like --hnsw-cache-expiry which can be set to "infinity"
// InfDurationOptionalFlag is a flag that can be either a time.duration or -1 (never expire).
// It is used for flags like --hnsw-index-cache-expiry which can be set to never expire (-1)
type InfDurationOptionalFlag struct {
duration DurationOptionalFlag
isInfinite bool
Expand All @@ -224,7 +224,7 @@ func (f *InfDurationOptionalFlag) Set(val string) error {

val = strings.ToLower(val)

if val == "inf" || val == "infinity" || val == "-1" {
if val == strconv.Itoa(Infinity) {
f.isInfinite = true
} else {
return fmt.Errorf("invalid duration %s", val)
Expand All @@ -239,7 +239,7 @@ func (f *InfDurationOptionalFlag) Type() string {

func (f *InfDurationOptionalFlag) String() string {
if f.isInfinite {
return "infinity"
return "-1"
}

if f.duration.Val != nil {
Expand Down
22 changes: 2 additions & 20 deletions cmd/flags/optionals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,30 +114,12 @@ func (suite *OptionalFlagSuite) TestDurationOptionalFlag() {
func (suite *OptionalFlagSuite) TestInfDurationOptionalFlag() {
f := &InfDurationOptionalFlag{}

err := f.Set("inf")
err := f.Set("-1")
if err != nil {
suite.T().Errorf("Unexpected error: %v", err)
}

suite.Equal("infinity", f.String())
suite.Equal(int64(-1), *f.Int64())
f = &InfDurationOptionalFlag{}

err = f.Set("infinity")
if err != nil {
suite.T().Errorf("Unexpected error: %v", err)
}

suite.Equal("infinity", f.String())
suite.Equal(int64(-1), *f.Int64())
f = &InfDurationOptionalFlag{}

err = f.Set("-1")
if err != nil {
suite.T().Errorf("Unexpected error: %v", err)
}

suite.Equal("infinity", f.String())
suite.Equal("-1", f.String())
suite.Equal(int64(-1), *f.Int64())
f = &InfDurationOptionalFlag{}

Expand Down
20 changes: 14 additions & 6 deletions cmd/indexCreate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ var indexCreateFlags = &struct {
hnswConstructionEf flags.Uint32OptionalFlag
hnswMaxMemQueueSize flags.Uint32OptionalFlag
hnswBatch flags.BatchingFlags
hnswCache flags.CachingFlags
hnswIndexCache flags.IndexCachingFlags
hnswRecordCache flags.RecordCachingFlags
hnswHealer flags.HealerFlags
hnswMerge flags.MergeFlags
hnswVectorIntegrityCheck flags.BoolOptionalFlag
Expand All @@ -52,7 +53,8 @@ var indexCreateFlags = &struct {
hnswConstructionEf: flags.Uint32OptionalFlag{},
hnswMaxMemQueueSize: flags.Uint32OptionalFlag{},
hnswBatch: *flags.NewHnswBatchingFlags(),
hnswCache: *flags.NewHnswCachingFlags(),
hnswIndexCache: *flags.NewHnswIndexCachingFlags(),
hnswRecordCache: *flags.NewHnswRecordCachingFlags(),
hnswHealer: *flags.NewHnswHealerFlags(),
hnswMerge: *flags.NewHnswMergeFlags(),
hnswVectorIntegrityCheck: flags.BoolOptionalFlag{},
Expand All @@ -77,7 +79,8 @@ func newIndexCreateFlagSet() *pflag.FlagSet {
flagSet.Var(&indexCreateFlags.hnswMaxMemQueueSize, flags.HnswMaxMemQueueSize, "Maximum size of in-memory queue for inserted/updated vector records.") //nolint:lll // For readability //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswVectorIntegrityCheck, flags.HnswVectorIntegrityCheck, "Enable/disable vector integrity check. Defaults to enabled.") //nolint:lll // For readability
flagSet.AddFlagSet(indexCreateFlags.hnswBatch.NewFlagSet())
flagSet.AddFlagSet(indexCreateFlags.hnswCache.NewFlagSet())
flagSet.AddFlagSet(indexCreateFlags.hnswIndexCache.NewFlagSet())
flagSet.AddFlagSet(indexCreateFlags.hnswRecordCache.NewFlagSet())
flagSet.AddFlagSet(indexCreateFlags.hnswHealer.NewFlagSet())
flagSet.AddFlagSet(indexCreateFlags.hnswMerge.NewFlagSet())

Expand Down Expand Up @@ -212,7 +215,8 @@ asvec index create -i myindex -n test -s testset -d 256 -m COSINE --%s vector \
RunE: func(_ *cobra.Command, _ []string) error {
debugFlags := indexCreateFlags.clientFlags.NewSLogAttr()
debugFlags = append(debugFlags, indexCreateFlags.hnswBatch.NewSLogAttr()...)
debugFlags = append(debugFlags, indexCreateFlags.hnswCache.NewSLogAttr()...)
debugFlags = append(debugFlags, indexCreateFlags.hnswIndexCache.NewSLogAttr()...)
debugFlags = append(debugFlags, indexCreateFlags.hnswRecordCache.NewSLogAttr()...)
debugFlags = append(debugFlags, indexCreateFlags.hnswHealer.NewSLogAttr()...)
debugFlags = append(debugFlags, indexCreateFlags.hnswMerge.NewSLogAttr()...)
logger.Debug("parsed flags",
Expand Down Expand Up @@ -340,8 +344,12 @@ func runCreateIndexFromFlags(client *avs.Client) error {
ReindexInterval: indexCreateFlags.hnswBatch.ReindexInterval.Uint32(),
},
IndexCachingParams: &protos.HnswCachingParams{
MaxEntries: indexCreateFlags.hnswCache.MaxEntries.Val,
Expiry: indexCreateFlags.hnswCache.Expiry.Int64(),
MaxEntries: indexCreateFlags.hnswIndexCache.MaxEntries.Val,
Expiry: indexCreateFlags.hnswIndexCache.Expiry.Int64(),
},
RecordCachingParams: &protos.HnswCachingParams{
MaxEntries: indexCreateFlags.hnswRecordCache.MaxEntries.Val,
Expiry: indexCreateFlags.hnswRecordCache.Expiry.Int64(),
},
HealerParams: &protos.HnswHealerParams{
MaxScanRatePerNode: indexCreateFlags.hnswHealer.MaxScanRatePerNode.Val,
Expand Down
22 changes: 15 additions & 7 deletions cmd/indexUpdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ var indexUpdateFlags = &struct {
indexLabels map[string]string
hnswMaxMemQueueSize flags.Uint32OptionalFlag
hnswBatch flags.BatchingFlags
hnswCache flags.CachingFlags
hnswIndexCache flags.IndexCachingFlags
hnswRecordCache flags.RecordCachingFlags
hnswHealer flags.HealerFlags
hnswMerge flags.MergeFlags
hnswVectorIntegrityCheck flags.BoolOptionalFlag
}{
clientFlags: rootFlags.clientFlags,
hnswMaxMemQueueSize: flags.Uint32OptionalFlag{},
hnswBatch: *flags.NewHnswBatchingFlags(),
hnswCache: *flags.NewHnswCachingFlags(),
hnswIndexCache: *flags.NewHnswIndexCachingFlags(),
hnswRecordCache: *flags.NewHnswRecordCachingFlags(),
hnswHealer: *flags.NewHnswHealerFlags(),
hnswMerge: *flags.NewHnswMergeFlags(),
hnswVectorIntegrityCheck: flags.BoolOptionalFlag{},
Expand All @@ -43,7 +45,8 @@ func newIndexUpdateFlagSet() *pflag.FlagSet {
flagSet.Var(&indexUpdateFlags.hnswMaxMemQueueSize, flags.HnswMaxMemQueueSize, "Maximum size of in-memory queue for inserted/updated vector records.") //nolint:lll // For readability
flagSet.Var(&indexUpdateFlags.hnswVectorIntegrityCheck, flags.HnswVectorIntegrityCheck, "Enable/disable vector integrity check. Defaults to enabled.") //nolint:lll // For readability
flagSet.AddFlagSet(indexUpdateFlags.hnswBatch.NewFlagSet())
flagSet.AddFlagSet(indexUpdateFlags.hnswCache.NewFlagSet())
flagSet.AddFlagSet(indexUpdateFlags.hnswIndexCache.NewFlagSet())
flagSet.AddFlagSet(indexUpdateFlags.hnswRecordCache.NewFlagSet())
flagSet.AddFlagSet(indexUpdateFlags.hnswHealer.NewFlagSet())
flagSet.AddFlagSet(indexUpdateFlags.hnswMerge.NewFlagSet())

Expand All @@ -70,14 +73,15 @@ For example:
%s
asvec index update -i myindex -n test --%s 10000 --%s 10000ms --%s 10s --%s 16 --%s 16
`, HelpTxtSetupEnv, flags.BatchMaxIndexRecords, flags.BatchIndexInterval,
flags.HnswCacheExpiry, flags.HnswHealerParallelism, flags.HnswMergeParallelism),
flags.HnswIndexCacheExpiry, flags.HnswHealerParallelism, flags.HnswMergeParallelism),
PreRunE: func(_ *cobra.Command, _ []string) error {
return checkSeedsAndHost()
},
RunE: func(_ *cobra.Command, _ []string) error {
debugFlags := indexUpdateFlags.clientFlags.NewSLogAttr()
debugFlags = append(debugFlags, indexUpdateFlags.hnswBatch.NewSLogAttr()...)
debugFlags = append(debugFlags, indexUpdateFlags.hnswCache.NewSLogAttr()...)
debugFlags = append(debugFlags, indexUpdateFlags.hnswIndexCache.NewSLogAttr()...)
debugFlags = append(debugFlags, indexUpdateFlags.hnswRecordCache.NewSLogAttr()...)
debugFlags = append(debugFlags, indexUpdateFlags.hnswHealer.NewSLogAttr()...)
debugFlags = append(debugFlags, indexUpdateFlags.hnswMerge.NewSLogAttr()...)
logger.Debug("parsed flags",
Expand Down Expand Up @@ -114,8 +118,12 @@ asvec index update -i myindex -n test --%s 10000 --%s 10000ms --%s 10s --%s 16 -
MaxMemQueueSize: indexUpdateFlags.hnswMaxMemQueueSize.Val,
BatchingParams: batchingParams,
IndexCachingParams: &protos.HnswCachingParams{
MaxEntries: indexUpdateFlags.hnswCache.MaxEntries.Val,
Expiry: indexUpdateFlags.hnswCache.Expiry.Int64(),
MaxEntries: indexUpdateFlags.hnswIndexCache.MaxEntries.Val,
Expiry: indexUpdateFlags.hnswIndexCache.Expiry.Int64(),
},
RecordCachingParams: &protos.HnswCachingParams{
MaxEntries: indexUpdateFlags.hnswRecordCache.MaxEntries.Val,
Expiry: indexUpdateFlags.hnswRecordCache.Expiry.Int64(),
},
HealerParams: &protos.HnswHealerParams{
MaxScanRatePerNode: indexUpdateFlags.hnswHealer.MaxScanRatePerNode.Val,
Expand Down
62 changes: 58 additions & 4 deletions cmd/writers/indexList.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ func NewIndexTableWriter(writer io.Writer, verbose bool, logger *slog.Logger) *I
"Dimensions",
"Distance Metric",
"Unmerged",
"Vector Records",
"Size",
"Unmerged %",
}
verboseHeadings := append(table.Row{}, headings...)
verboseHeadings = append(
verboseHeadings,
"Vector Records",
"Vertices",
"Labels*",
"Storage",
Expand Down Expand Up @@ -79,11 +81,13 @@ func (itw *IndexTableWriter) AppendIndexRow(
index.Dimensions,
index.VectorDistanceMetric,
status.GetUnmergedRecordCount(),
status.GetIndexHealerVectorRecordsIndexed(),
formatBytes(calculateIndexSize(index, status)),
getPercentUnmerged(status),
}

if itw.verbose {
row = append(row,
status.GetIndexHealerVectorRecordsIndexed(),
status.GetIndexHealerVerticesValid(),
index.Labels,
)
Expand All @@ -107,8 +111,10 @@ func (itw *IndexTableWriter) AppendIndexRow(
{"Batch Index Interval*", convertMillisecondToDuration(uint64(v.HnswParams.BatchingParams.GetIndexInterval()))},
{"Batch Max Reindex Records*", v.HnswParams.BatchingParams.GetMaxReindexRecords()},
{"Batch Reindex Interval*", convertMillisecondToDuration(uint64(v.HnswParams.BatchingParams.GetReindexInterval()))},
{"Cache Max Entries*", v.HnswParams.IndexCachingParams.GetMaxEntries()},
{"Cache Expiry*", convertMillisecondToDuration(v.HnswParams.IndexCachingParams.GetExpiry())},
{"Index Cache Max Entries*", v.HnswParams.IndexCachingParams.GetMaxEntries()},
{"Index Cache Expiry*", convertMillisecondToDuration(v.HnswParams.IndexCachingParams.GetExpiry())},
{"Record Cache Max Entries*", v.HnswParams.RecordCachingParams.GetMaxEntries()},
{"Record Cache Expiry*", convertMillisecondToDuration(v.HnswParams.RecordCachingParams.GetExpiry())},
{"Healer Max Scan Rate / Node*", v.HnswParams.HealerParams.GetMaxScanRatePerNode()},
{"Healer Max Page Size*", v.HnswParams.HealerParams.GetMaxScanPageSize()},
{"Healer Re-index % *", convertFloatToPercentStr(v.HnswParams.HealerParams.GetReindexPercent())},
Expand Down Expand Up @@ -143,3 +149,51 @@ func convertMillisecondToDuration[T int64 | uint64 | uint32](m T) time.Duration
// convertFloatToPercentStr renders f as a percentage string with two
// decimal places, e.g. 12.5 -> "12.50%".
func convertFloatToPercentStr(f float32) string {
	value := fmt.Sprintf("%.2f", f)
	return value + "%"
}

// calculateIndexSize estimates the size of the index in bytes: a fixed
// per-record overhead plus the raw vector payload, multiplied by the number
// of valid vertices reported by the healer.
func calculateIndexSize(index *protos.IndexDefinition, status *protos.IndexStatusResponse) int64 {
	const (
		// Vectors are stored as float32, i.e. 4 bytes per dimension.
		bytesPerDimension = 4
		// NOTE(review): ~500 bytes of per-record index overhead is an
		// estimate — confirm against server storage internals.
		overheadBytes = 500
	)

	perRecord := overheadBytes + int64(index.Dimensions)*bytesPerDimension

	return perRecord * status.GetIndexHealerVerticesValid()
}

// formatBytes converts bytes to human readable string format
func formatBytes(bytes int64) string {
const (
B = 1
KB = 1024 * B
MB = 1024 * KB
GB = 1024 * MB
TB = 1024 * GB
PB = 1024 * TB
)

switch {
case bytes >= PB:
return fmt.Sprintf("%.2f PB", float64(bytes)/float64(PB))
case bytes >= TB:
return fmt.Sprintf("%.2f TB", float64(bytes)/float64(TB))
case bytes >= GB:
return fmt.Sprintf("%.2f GB", float64(bytes)/float64(GB))
case bytes >= MB:
return fmt.Sprintf("%.2f MB", float64(bytes)/float64(MB))
case bytes >= KB:
return fmt.Sprintf("%.2f KB", float64(bytes)/float64(KB))
default:
return fmt.Sprintf("%d B", bytes)
}
}

// getPercentUnmerged returns the unmerged record count as a percentage of
// the index's valid vertices, formatted with two decimal places (e.g.
// "12.34%"). When the index reports zero valid vertices it returns "0%",
// which also guards against division by zero.
func getPercentUnmerged(status *protos.IndexStatusResponse) string {
	unmergedCount := status.GetUnmergedRecordCount()

	// Fixed misspelled local identifier ("verticies" -> "vertices").
	vertices := status.GetIndexHealerVerticesValid()
	if vertices == 0 {
		return "0%"
	}

	return fmt.Sprintf("%.2f%%", float64(unmergedCount)/float64(vertices)*100)
}
Loading

0 comments on commit 9b7a2c1

Please sign in to comment.