Skip to content

Commit

Permalink
move index creation into go routine with 1 hour timeout (#199)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielchalef authored Sep 24, 2023
1 parent 7dc6076 commit ad8d1aa
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 38 deletions.
2 changes: 2 additions & 0 deletions pkg/store/postgres/document_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func TestDocumentSearchWithIndexEndToEnd(t *testing.T) {
err = vci.CreateIndex(context.Background(), true)
assert.NoError(t, err)

pollIndexCreation(documentStore, collectionName, ctx, t)

// Set Collection's IsIndexed flag to true
col, err := documentStore.GetCollection(ctx, vci.Collection.Name)
assert.NoError(t, err)
Expand Down
88 changes: 51 additions & 37 deletions pkg/store/postgres/vector_col_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"fmt"
"math"
"sync"
"time"

"github.com/getzep/zep/pkg/models"

"github.com/uptrace/bun"
)

// reference: https://github.com/pgvector/pgvector#indexing

const IndexTimeout = 1 * time.Hour
const EmbeddingColName = "embedding"

// MinRowsForIndex is the minimum number of rows required to create an index. The pgvector docs
Expand Down Expand Up @@ -90,7 +90,6 @@ func (vci *VectorColIndex) CreateIndex(ctx context.Context, force bool) error {
}
// Lock the mutex for this collection.
IndexMutexMap[vci.Collection.Name].Lock()
defer IndexMutexMap[vci.Collection.Name].Unlock()

if vci.Collection.DistanceFunction != "cosine" {
return fmt.Errorf("only cosine distance function is currently supported")
Expand All @@ -108,40 +107,55 @@ func (vci *VectorColIndex) CreateIndex(ctx context.Context, force bool) error {

indexName := fmt.Sprintf("%s_%s_idx", vci.Collection.TableName, vci.ColName)

// Drop index if it exists
// We're using CONCURRENTLY for both drop and index operations. This means we can't run them in a transaction.
_, err := db.ExecContext(
ctx,
"DROP INDEX CONCURRENTLY IF EXISTS ?",
bun.Ident(indexName),
)
if err != nil {
return fmt.Errorf("error dropping index: %w", err)
}

// currently only supports cosine distance ops
_, err = db.ExecContext(
ctx,
"CREATE INDEX CONCURRENTLY ON ? USING ivfflat (embedding vector_cosine_ops) WITH (lists = ?)",
bun.Ident(vci.Collection.TableName),
vci.ListCount,
)
if err != nil {
return fmt.Errorf("error creating index: %w", err)
}

// Set Collection's IsIndexed flag to true
collection, err := vci.appState.DocumentStore.GetCollection(ctx, vci.Collection.Name)
if err != nil {
return fmt.Errorf("error getting collection: %w", err)
}
collection.IsIndexed = true
collection.ProbeCount = vci.ProbeCount
collection.ListCount = vci.ListCount
err = vci.appState.DocumentStore.UpdateCollection(ctx, collection)
if err != nil {
return fmt.Errorf("error updating collection: %w", err)
}
// run index creation in a goroutine with IndexTimeout
go func() {
defer IndexMutexMap[vci.Collection.Name].Unlock()
// Create a new context with a timeout
ctx, cancel := context.WithTimeout(ctx, IndexTimeout)
defer cancel()

// Drop index if it exists
// We're using CONCURRENTLY for both drop and index operations. This means we can't run them in a transaction.
_, err := db.ExecContext(
ctx,
"DROP INDEX CONCURRENTLY IF EXISTS ?",
bun.Ident(indexName),
)
if err != nil {
log.Error("error dropping index: ", err)
return
}

// currently only supports cosine distance ops
log.Infof("Starting index creation on %s", vci.Collection.Name)
_, err = db.ExecContext(
ctx,
"CREATE INDEX CONCURRENTLY ON ? USING ivfflat (embedding vector_cosine_ops) WITH (lists = ?)",
bun.Ident(vci.Collection.TableName),
vci.ListCount,
)
if err != nil {
log.Error("error creating index: ", err)
return
}

// Set Collection's IsIndexed flag to true
collection, err := vci.appState.DocumentStore.GetCollection(ctx, vci.Collection.Name)
if err != nil {
log.Error("error getting collection: ", err)
return
}
collection.IsIndexed = true
collection.ProbeCount = vci.ProbeCount
collection.ListCount = vci.ListCount
err = vci.appState.DocumentStore.UpdateCollection(ctx, collection)
if err != nil {
log.Error("error updating collection: ", err)
return
}

log.Infof("Index creation on %s completed successfully", collection.Name)
}()

return nil
}
Expand Down
30 changes: 29 additions & 1 deletion pkg/store/postgres/vector_col_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"math/rand"
"testing"
"time"

"github.com/brianvoe/gofakeit/v6"

Expand Down Expand Up @@ -77,10 +78,12 @@ func TestCreateIndex(t *testing.T) {
)
assert.NoError(t, err)

// CreateIndex will add a timeout to the ctx
err = vci.CreateIndex(context.Background(), true)
assert.NoError(t, err)

// Set Collection's IsIndexed flag to true
pollIndexCreation(documentStore, collectionName, ctx, t)

col, err := documentStore.GetCollection(ctx, vci.Collection.Name)
assert.NoError(t, err)
assert.Equal(t, true, col.IsIndexed)
Expand Down Expand Up @@ -158,3 +161,28 @@ func generateRandomEmbeddings(embeddingCount int, embeddingWidth int) [][]float3

return embeddings
}

func pollIndexCreation(
documentStore *DocumentStore,
collectionName string,
ctx context.Context,
t *testing.T,
) {
timeout := time.After(10 * time.Minute)
tick := time.Tick(500 * time.Millisecond)
Loop:
for {
select {
case <-timeout:
t.Fatal("timed out waiting for index to be created")
case <-tick:
col, err := documentStore.GetCollection(ctx, collectionName)
if err != nil {
t.Fatal("error getting collection: ", err)
}
if col.IsIndexed {
break Loop
}
}
}
}

0 comments on commit ad8d1aa

Please sign in to comment.