Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GO-3861 prioritise fulltext indexing for spaces [rebase] #1655

Draft
wants to merge 6 commits into
base: feature/chat
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/block/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ func (s *Service) debugListObjects(req *http.Request) ([]debugObject, error) {
}
result := make([]debugObject, 0, len(ids))
for _, id := range ids {
obj, err := s.getDebugObject(id)
obj, err := s.getDebugObject(id.ObjectID)
if err != nil {
obj = debugObject{
ID: id,
ID: id.ObjectID,
Error: err.Error(),
}
}
Expand Down
6 changes: 4 additions & 2 deletions core/block/editor/smartblock/smarttest/smarttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ func (st *SmartTest) SetSpace(space smartblock.Space) {
}

type stubSpace struct {
id string
}

func (s *stubSpace) Id() string {
return ""
return s.id
}

func (s *stubSpace) TreeBuilder() objecttreebuilder.TreeBuilder {
Expand Down Expand Up @@ -126,7 +127,8 @@ func (st *SmartTest) Space() smartblock.Space {
if st.space != nil {
return st.space
}
return &stubSpace{}

return &stubSpace{id: st.spaceId}
}

func (st *SmartTest) EnabledRelationAsDependentObjects() {
Expand Down
14 changes: 6 additions & 8 deletions core/debug/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,6 @@ func (d *debug) DumpTree(ctx context.Context, objectID string, path string, anon
}

func (d *debug) DumpLocalstore(ctx context.Context, spaceID string, objIds []string, path string) (filename string, err error) {
if len(objIds) == 0 {
objIds, err = d.store.ListIdsCrossSpace()
if err != nil {
return "", err
}
}

filename = filepath.Join(path, fmt.Sprintf("at.store.dbg.%s.zip", time.Now().Format("20060102.150405.99")))
f, err := os.Create(filename)
if err != nil {
Expand All @@ -251,7 +244,12 @@ func (d *debug) DumpLocalstore(ctx context.Context, spaceID string, objIds []str
m := jsonpb.Marshaler{Indent: " "}

store := d.store.SpaceIndex(spaceID)

if len(objIds) == 0 {
objIds, err = store.ListIds()
if err != nil {
return "", err
}
}
for _, objId := range objIds {
doc, err := store.GetWithLinksInfoById(objId)
if err != nil {
Expand Down
24 changes: 12 additions & 12 deletions core/indexer/fulltext.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
var (
ftIndexInterval = 1 * time.Second
ftIndexForceMinInterval = time.Second * 10
ftBatchLimit = 1000
ftBatchLimit = 50
ftBlockMaxSize = 1024 * 1024
)

Expand All @@ -40,29 +40,28 @@ func (i *indexer) ForceFTIndex() {
// MUST NOT be called more than once
func (i *indexer) ftLoopRoutine() {
ticker := time.NewTicker(ftIndexInterval)
ctx := i.runCtx

i.runFullTextIndexer(ctx)
i.runFullTextIndexer(i.componentCtx, i.spacesPriorityGet())
defer close(i.ftQueueFinished)
var lastForceIndex time.Time
for {
select {
case <-ctx.Done():
case <-i.componentCtx.Done():
return
case <-ticker.C:
i.runFullTextIndexer(ctx)
i.runFullTextIndexer(i.componentCtx, i.spacesPriorityGet())
case <-i.forceFt:
if time.Since(lastForceIndex) > ftIndexForceMinInterval {
i.runFullTextIndexer(ctx)
i.runFullTextIndexer(i.componentCtx, i.spacesPriorityGet())
lastForceIndex = time.Now()
}
}
}
}

func (i *indexer) runFullTextIndexer(ctx context.Context) {
func (i *indexer) runFullTextIndexer(ctx context.Context, spaceIdsPriority []string) {
batcher := i.ftsearch.NewAutoBatcher(ftsearch.AutoBatcherRecommendedMaxDocs, ftsearch.AutoBatcherRecommendedMaxSize)
err := i.store.BatchProcessFullTextQueue(ctx, ftBatchLimit, func(objectIds []string) error {
err := i.store.BatchProcessFullTextQueue(ctx, spaceIdsPriority, ftBatchLimit, func(objectIds []string) error {
for _, objectId := range objectIds {
objDocs, err := i.prepareSearchDocument(ctx, objectId)
if err != nil {
Expand Down Expand Up @@ -216,13 +215,14 @@ func (i *indexer) ftInit() error {
return err
}
if docCount == 0 {
// query objects that are existing in the store
// if they are not existing in the object store, they will be indexed and added via reindexOutdatedObjects or on receiving via any-sync
ids, err := i.store.ListIdsCrossSpace()
fullIds, err := i.store.ListIdsCrossSpace()
if err != nil {
return err
}
if err := i.store.AddToIndexQueue(i.runCtx, ids...); err != nil {

log.Infof("adding %d objects to full-text index", len(fullIds))
err = i.store.AddToIndexQueue(i.componentCtx, fullIds...)
if err != nil {
return err
}

Expand Down
23 changes: 17 additions & 6 deletions core/indexer/fulltext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/anyproto/anytype-heart/core/block/editor/smartblock/smarttest"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/source/mock_source"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/core/indexer/mock_indexer"
"github.com/anyproto/anytype-heart/core/wallet"
"github.com/anyproto/anytype-heart/core/wallet/mock_wallet"
Expand Down Expand Up @@ -85,6 +86,7 @@ func NewIndexerFixture(t *testing.T) *IndexerFixture {
indexerFx.ftsearch = indxr.ftsearch
indexerFx.pickerFx = mock_cache.NewMockObjectGetter(t)
indxr.picker = indexerFx.pickerFx
indxr.componentCtx, indxr.componentCancel = context.WithCancel(context.Background())
indxr.spaceIndexers = make(map[string]*spaceIndexer)
indxr.forceFt = make(chan struct{})
indxr.config = &config.Config{NetworkMode: pb.RpcAccount_LocalOnly}
Expand Down Expand Up @@ -327,11 +329,14 @@ func TestRunFullTextIndexer(t *testing.T) {
blockbuilder.ID("blockId1"),
),
)))
indexerFx.store.AddToIndexQueue(context.Background(), "objectId"+strconv.Itoa(i))
indexerFx.store.AddToIndexQueue(domain.FullID{
ObjectID: "objectId" + strconv.Itoa(i),
SpaceID: "space1",
})
indexerFx.pickerFx.EXPECT().GetObject(mock.Anything, "objectId"+strconv.Itoa(i)).Return(smartTest, nil).Once()
}

indexerFx.runFullTextIndexer(context.Background())
indexerFx.runFullTextIndexer(context.Background(), []string{"space1"})

count, _ := indexerFx.ftsearch.DocCount()
assert.Equal(t, 10, int(count))
Expand All @@ -353,11 +358,14 @@ func TestRunFullTextIndexer(t *testing.T) {
),
)))
indexerFx.pickerFx.EXPECT().GetObject(mock.Anything, "objectId"+strconv.Itoa(i)).Return(smartTest, nil).Once()
indexerFx.store.AddToIndexQueue(context.Background(), "objectId"+strconv.Itoa(i))
indexerFx.store.AddToIndexQueue(domain.FullID{
ObjectID: "objectId" + strconv.Itoa(i),
SpaceID: "space1",
})

}

indexerFx.runFullTextIndexer(context.Background())
indexerFx.runFullTextIndexer(context.Background(), []string{"space1"})

count, _ = indexerFx.ftsearch.DocCount()
assert.Equal(t, 10, int(count))
Expand All @@ -382,9 +390,12 @@ func TestPrepareSearchDocument_Reindex_Removed(t *testing.T) {
blockbuilder.ID("blockId1"),
),
)))
indexerFx.store.AddToIndexQueue(context.Background(), "objectId1")
indexerFx.store.AddToIndexQueue(domain.FullID{
ObjectID: "objectId1",
SpaceID: "space1",
})
indexerFx.pickerFx.EXPECT().GetObject(mock.Anything, mock.Anything).Return(smartTest, nil)
indexerFx.runFullTextIndexer(context.Background())
indexerFx.runFullTextIndexer(context.Background(), []string{"space1"})

count, _ = indexerFx.ftsearch.DocCount()
assert.Equal(t, uint64(1), count)
Expand Down
124 changes: 104 additions & 20 deletions core/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import (

"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/gogo/protobuf/types"
"go.uber.org/zap"

"github.com/anyproto/anytype-heart/core/anytype/config"
"github.com/anyproto/anytype-heart/core/block/cache"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/block/source"
"github.com/anyproto/anytype-heart/core/subscription"
"github.com/anyproto/anytype-heart/core/subscription/objectsubscription"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/database"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/filestore"
Expand Down Expand Up @@ -49,25 +52,28 @@ type Hasher interface {
}

type indexer struct {
store objectstore.ObjectStore
fileStore filestore.FileStore
source source.Service
picker cache.ObjectGetter
ftsearch ftsearch.FTSearch
storageService storage.ClientStorage

runCtx context.Context
runCtxCancel context.CancelFunc
ftQueueFinished chan struct{}
config *config.Config
store objectstore.ObjectStore
fileStore filestore.FileStore
source source.Service
picker cache.ObjectGetter
ftsearch ftsearch.FTSearch
storageService storage.ClientStorage
subscriptionService subscription.Service

componentCtx context.Context
componentCtxCancel context.CancelFunc
ftQueueFinished chan struct{}
config *config.Config

btHash Hasher
forceFt chan struct{}

// state
lock sync.Mutex
reindexLogFields []zap.Field
spaceIndexers map[string]*spaceIndexer
techSpaceIdGetter techSpaceIdGetter
spacesPrioritySubscription *objectsubscription.ObjectSubscription[*types.Struct]
lock sync.Mutex
reindexLogFields []zap.Field
spacesPriority []string
spaceIndexers map[string]*spaceIndexer
}

func (i *indexer) Init(a *app.App) (err error) {
Expand All @@ -78,18 +84,29 @@ func (i *indexer) Init(a *app.App) (err error) {
i.fileStore = app.MustComponent[filestore.FileStore](a)
i.ftsearch = app.MustComponent[ftsearch.FTSearch](a)
i.picker = app.MustComponent[cache.ObjectGetter](a)
i.runCtx, i.runCtxCancel = context.WithCancel(context.Background())
i.ftQueueFinished = make(chan struct{})
i.forceFt = make(chan struct{})
i.config = app.MustComponent[*config.Config](a)
i.subscriptionService = app.MustComponent[subscription.Service](a)
i.componentCtx, i.componentCtxCancel = context.WithCancel(context.Background())
i.spaceIndexers = map[string]*spaceIndexer{}
i.techSpaceIdGetter = app.MustComponent[techSpaceIdGetter](a)
return
}

type techSpaceIdGetter interface {
TechSpaceId() string
}

func (i *indexer) Name() (name string) {
return CName
}

func (i *indexer) Run(context.Context) (err error) {
err = i.subscribeToSpaces()
if err != nil {
return
}
return i.StartFullTextIndex()
}

Expand All @@ -112,8 +129,8 @@ func (i *indexer) Close(ctx context.Context) (err error) {
delete(i.spaceIndexers, spaceId)
}
i.lock.Unlock()
if i.runCtxCancel != nil {
i.runCtxCancel()
if i.componentCtxCancel != nil {
i.componentCtxCancel()
// we need to wait for the ftQueue processing to be finished gracefully. Because we may be in the middle of badger transaction
<-i.ftQueueFinished
}
Expand All @@ -133,17 +150,84 @@ func (i *indexer) RemoveAclIndexes(spaceId string) (err error) {
if err != nil {
return
}
return i.store.SpaceIndex(spaceId).DeleteDetails(i.runCtx, ids)
return i.store.SpaceIndex(spaceId).DeleteDetails(i.componentCtx, ids)
}

func (i *indexer) Index(ctx context.Context, info smartblock.DocInfo, options ...smartblock.IndexOption) error {
i.lock.Lock()
spaceInd, ok := i.spaceIndexers[info.Space.Id()]
if !ok {
spaceInd = newSpaceIndexer(i.runCtx, i.store.SpaceIndex(info.Space.Id()), i.store, i.storageService)
spaceInd = newSpaceIndexer(i.componentCtx, i.store.SpaceIndex(info.Space.Id()), i.store, i.storageService)
i.spaceIndexers[info.Space.Id()] = spaceInd
}
i.lock.Unlock()

return spaceInd.Index(ctx, info, options...)
}

// subscribeToSpaces subscribes to the lastOpenedSpaces subscription
// it used by fulltext and reindexing to prioritize most recent spaces
func (i *indexer) subscribeToSpaces() error {
objectReq := subscription.SubscribeRequest{
SubId: "lastOpenedSpaces",
Internal: true,
SpaceId: i.techSpaceIdGetter.TechSpaceId(),
NoDepSubscription: true,
Keys: []string{bundle.RelationKeyTargetSpaceId.String(), bundle.RelationKeyLastOpenedDate.String(), bundle.RelationKeyLastModifiedDate.String()},
Filters: []*model.BlockContentDataviewFilter{
{
RelationKey: bundle.RelationKeyLayout.String(),
Condition: model.BlockContentDataviewFilter_Equal,
Value: pbtypes.Int64(int64(model.ObjectType_spaceView)),
},
},
Sorts: []*model.BlockContentDataviewSort{
{
RelationKey: bundle.RelationKeyLastOpenedDate.String(),
Type: model.BlockContentDataviewSort_Desc,
IncludeTime: true,
Format: model.RelationFormat_date,
EmptyPlacement: model.BlockContentDataviewSort_End,
},
},
}
spacePriorityUpdateChan := make(chan []*types.Struct)
go i.spacesPrioritySubscriptionWatcher(spacePriorityUpdateChan)
i.spacesPrioritySubscription = objectsubscription.New[*types.Struct](i.subscriptionService, objectsubscription.SubscriptionParams[*types.Struct]{
Request: objectReq,
Extract: func(t *types.Struct) (string, *types.Struct) {
return pbtypes.GetString(t, bundle.RelationKeyId.String()), t
},
Update: func(key string, value *types.Value, s2 *types.Struct) *types.Struct {
if s2 == nil {
// todo: shouldn't happen because of changes sort, but happen, need to debug
return nil
}
s2.Fields[key] = value
return s2
},
Unset: func(keys []string, s *types.Struct) *types.Struct {
return pbtypes.StructFilterKeys(s, keys)
},
UpdateChan: spacePriorityUpdateChan,
})
return i.spacesPrioritySubscription.Run()
}

func (i *indexer) spacesPriorityUpdate(priority []string) {
i.lock.Lock()
defer i.lock.Unlock()
i.spacesPriority = priority
}

func (i *indexer) spacesPriorityGet() []string {
i.lock.Lock()
defer i.lock.Unlock()
return i.spacesPriority
}

func (i *indexer) spacesPrioritySubscriptionWatcher(ch chan []*types.Struct) {
for records := range ch {
i.spacesPriorityUpdate(pbtypes.ExtractString(records, bundle.RelationKeyTargetSpaceId.String(), true))
}
}
Loading
Loading