Skip to content

Commit

Permalink
Merge pull request #54 from anyproto/GO-2321-check-missed-cids
Browse files Browse the repository at this point in the history
GO-2321 restore missed cids
  • Loading branch information
cheggaaa authored Nov 3, 2023
2 parents f59f5b2 + 14652c2 commit 0fbfded
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 6 deletions.
18 changes: 14 additions & 4 deletions index/cids.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (ri *redisIndex) CidEntries(ctx context.Context, cids []cid.Cid) (entries *
entries = &CidEntries{}
for _, c := range cids {
if err = ri.getAndAddToEntries(ctx, entries, c); err != nil {
entries.Release()
return nil, err
}
}
Expand Down Expand Up @@ -63,14 +64,15 @@ func (ri *redisIndex) CidEntriesByBlocks(ctx context.Context, bs []blocks.Block)
}

func (ri *redisIndex) getAndAddToEntries(ctx context.Context, entries *CidEntries, c cid.Cid) (err error) {
ok, release, err := ri.AcquireKey(ctx, cidKey(c))
_, release, err := ri.AcquireKey(ctx, cidKey(c))
if err != nil {
return
}
if !ok {
//temporarily ignore the exists check to make a deep check
/*if !ok {
release()
return ErrCidsNotExist
}
}*/
entry, err := ri.getCidEntry(ctx, c)
if err != nil {
release()
Expand Down Expand Up @@ -130,7 +132,15 @@ func (ri *redisIndex) getCidEntry(ctx context.Context, c cid.Cid) (entry *cidEnt
cidData, err := ri.cl.Get(ctx, ck).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
err = ErrCidsNotExist
// temporary additional check: try to load data from store and restore cid
var b blocks.Block
if b, err = ri.persistStore.Get(ctx, c); err != nil {
log.WarnCtx(ctx, "restore cid entry error", zap.String("cid", c.String()), zap.Error(err))
err = ErrCidsNotExist
return
}
log.InfoCtx(ctx, "restore cid entry", zap.String("cid", c.String()))
return ri.createCidEntry(ctx, b)
}
return
}
Expand Down
21 changes: 20 additions & 1 deletion index/cids_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package index

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/anyproto/any-sync-filenode/index/indexproto"
"github.com/anyproto/any-sync-filenode/testutil"
Expand Down Expand Up @@ -53,7 +55,7 @@ func TestRedisIndex_CidEntries(t *testing.T) {
require.NoError(t, fx.BlocksAdd(ctx, bs[:3]))

cids := testutil.BlocksToKeys(bs)

fx.persistStore.EXPECT().Get(ctx, gomock.Any()).Return(nil, fmt.Errorf("err")).AnyTimes()
_, err := fx.CidEntries(ctx, cids)
assert.EqualError(t, err, ErrCidsNotExist.Error())
})
Expand Down Expand Up @@ -88,6 +90,23 @@ func TestRedisIndex_CidEntries(t *testing.T) {
assert.NotEmpty(t, e.Version)
}
})
t.Run("restore from store", func(t *testing.T) {
bs := testutil.NewRandBlocks(4)
fx := newFixture(t)
defer fx.Finish(t)

require.NoError(t, fx.BlocksAdd(ctx, bs[:3]))

cids := testutil.BlocksToKeys(bs)

fx.persistStore.EXPECT().Get(ctx, bs[3].Cid()).Return(bs[3], nil)

result, err := fx.CidEntries(ctx, cids)
defer result.Release()
require.NoError(t, err)
require.Len(t, result.entries, len(bs))
t.Log(result.entries[3])
})
}

func TestRedisIndex_CidExistsInSpace(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion index/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/cespare/xxhash/v2"
"github.com/go-redsync/redsync/v4"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
)
Expand All @@ -32,6 +34,8 @@ func init() {
type persistentStore interface {
IndexGet(ctx context.Context, key string) (value []byte, err error)
IndexPut(ctx context.Context, key string, value []byte) (err error)

Get(ctx context.Context, k cid.Cid) (blocks.Block, error)
}

func bloomFilterKey(key string) string {
Expand Down Expand Up @@ -188,7 +192,7 @@ func (ri *redisIndex) persistKey(ctx context.Context, storeKey, key string, dead
stat.errors.Add(1)
return
}
if int64(res[0]) > deadline {
if int64(res[0]) > deadline || res[0] == 0 {
stat.missed.Add(1)
return
}
Expand Down
6 changes: 6 additions & 0 deletions index/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package index

import (
"context"
"errors"
"strings"
"time"

Expand Down Expand Up @@ -63,6 +64,11 @@ func (ri *redisIndex) Migrate(ctx context.Context, key Key) (err error) {
for _, k := range keys {
if strings.HasPrefix(k, "f:") {
if err = ri.migrateFile(ctx, key, migrateKey, k); err != nil {
if errors.Is(err, ErrCidsNotExist) {
err = nil
log.WarnCtx(ctx, "migrate cid no exists", zap.String("spaceId", key.SpaceId), zap.String("fileId", k))
continue
}
return
}
}
Expand Down

0 comments on commit 0fbfded

Please sign in to comment.