Skip to content

Commit

Permalink
Merge pull request #1432 from anyproto/GO-3844-fix-and-simplify-subsc…
Browse files Browse the repository at this point in the history
…riptions-in-sync-status

GO-3844: Fix and simplify subscriptions for sync status
  • Loading branch information
mcrakhman authored Jul 31, 2024
2 parents d8599a6 + ef08953 commit 4a8c39e
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 137 deletions.
14 changes: 1 addition & 13 deletions core/syncstatus/spacesyncstatus/spacestatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func (s *spaceSyncStatus) sendEventToSession(spaceId, token string) {
bytesLeftPercentage: s.getBytesLeftPercentage(spaceId),
connectionStatus: s.nodeStatus.GetNodeStatus(spaceId),
compatibility: s.nodeConf.NetworkCompatibilityStatus(),
filesSyncingCount: s.getFileSyncingObjectsCount(spaceId),
objectsSyncingCount: s.getObjectSyncingObjectsCount(spaceId, s.getMissingIds(spaceId)),
}
s.eventSender.SendToSession(token, &pb.Event{
Expand Down Expand Up @@ -227,15 +226,6 @@ func (s *spaceSyncStatus) getObjectSyncingObjectsCount(spaceId string, missingOb
return curSub.SyncingObjectsCount(missingObjects)
}

func (s *spaceSyncStatus) getFileSyncingObjectsCount(spaceId string) int {
curSub, err := s.subs.GetSubscription(spaceId)
if err != nil {
log.Errorf("failed to get subscription: %s", err)
return 0
}
return curSub.FileSyncingObjectsCount()
}

func (s *spaceSyncStatus) getBytesLeftPercentage(spaceId string) float64 {
nodeUsage, err := s.nodeUsage.GetNodeUsage(context.Background())
if err != nil {
Expand All @@ -255,7 +245,6 @@ func (s *spaceSyncStatus) updateSpaceSyncStatus(spaceId string) {
bytesLeftPercentage: s.getBytesLeftPercentage(spaceId),
connectionStatus: s.nodeStatus.GetNodeStatus(spaceId),
compatibility: s.nodeConf.NetworkCompatibilityStatus(),
filesSyncingCount: s.getFileSyncingObjectsCount(spaceId),
objectsSyncingCount: s.getObjectSyncingObjectsCount(spaceId, missingObjects),
}
s.broadcast(&pb.Event{
Expand All @@ -276,14 +265,13 @@ type syncParams struct {
bytesLeftPercentage float64
connectionStatus nodestatus.ConnectionStatus
compatibility nodeconf.NetworkCompatibilityStatus
filesSyncingCount int
objectsSyncingCount int
}

func (s *spaceSyncStatus) makeSyncEvent(spaceId string, params syncParams) *pb.EventSpaceSyncStatusUpdate {
status := pb.EventSpace_Synced
err := pb.EventSpace_Null
syncingObjectsCount := int64(params.objectsSyncingCount + params.filesSyncingCount)
syncingObjectsCount := int64(params.objectsSyncingCount)
if syncingObjectsCount > 0 {
status = pb.EventSpace_Syncing
}
Expand Down
31 changes: 1 addition & 30 deletions core/syncstatus/spacesyncstatus/spacestatus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/anyproto/anytype-heart/core/filestorage/filesync"
"github.com/anyproto/anytype-heart/core/session"
"github.com/anyproto/anytype-heart/core/subscription"
"github.com/anyproto/anytype-heart/core/syncstatus/filesyncstatus"
"github.com/anyproto/anytype-heart/core/syncstatus/nodestatus"
"github.com/anyproto/anytype-heart/core/syncstatus/nodestatus/mock_nodestatus"
"github.com/anyproto/anytype-heart/core/syncstatus/spacesyncstatus/mock_spacesyncstatus"
Expand Down Expand Up @@ -80,31 +79,6 @@ type fixture struct {
ctrl *gomock.Controller
}

func mapFileStatus(status filesyncstatus.Status) domain.ObjectSyncStatus {
switch status {
case filesyncstatus.Syncing:
return domain.ObjectSyncStatusSyncing
case filesyncstatus.Queued:
return domain.ObjectSyncStatusSyncing
case filesyncstatus.Limited:
return domain.ObjectSyncStatusError
default:
return domain.ObjectSyncStatusSynced
}
}

func genFileObject(fileStatus filesyncstatus.Status, spaceId string) objectstore.TestObject {
id := fmt.Sprintf("%d", rand.Int())
return objectstore.TestObject{
bundle.RelationKeyId: pbtypes.String(id),
bundle.RelationKeySyncStatus: pbtypes.Int64(int64(mapFileStatus(fileStatus))),
bundle.RelationKeyFileBackupStatus: pbtypes.Int64(int64(fileStatus)),
bundle.RelationKeyLayout: pbtypes.Int64(int64(model.ObjectType_file)),
bundle.RelationKeyName: pbtypes.String("name" + id),
bundle.RelationKeySpaceId: pbtypes.String(spaceId),
}
}

func genObject(syncStatus domain.ObjectSyncStatus, spaceId string) objectstore.TestObject {
id := fmt.Sprintf("%d", rand.Int())
return objectstore.TestObject{
Expand All @@ -118,10 +92,7 @@ func genObject(syncStatus domain.ObjectSyncStatus, spaceId string) objectstore.T

func genSyncingObjects(fileObjects, objects int, spaceId string) []objectstore.TestObject {
var res []objectstore.TestObject
for i := 0; i < fileObjects; i++ {
res = append(res, genFileObject(filesyncstatus.Syncing, spaceId))
}
for i := 0; i < objects; i++ {
for i := 0; i < objects+fileObjects; i++ {
res = append(res, genObject(domain.ObjectSyncStatusSyncing, spaceId))
}
return res
Expand Down
47 changes: 3 additions & 44 deletions core/syncstatus/syncsubscriptions/syncingobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ import (

"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/core/subscription"
"github.com/anyproto/anytype-heart/core/syncstatus/filesyncstatus"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/util/pbtypes"
"github.com/anyproto/anytype-heart/util/slice"
)

type syncingObjects struct {
fileSubscription *ObjectSubscription[struct{}]
objectSubscription *ObjectSubscription[struct{}]
service subscription.Service
spaceId string
Expand All @@ -35,36 +33,8 @@ func (s *syncingObjects) Run() error {
Filters: []*model.BlockContentDataviewFilter{
{
RelationKey: bundle.RelationKeySyncStatus.String(),
Condition: model.BlockContentDataviewFilter_Equal,
Value: pbtypes.Int64(int64(domain.SpaceSyncStatusSyncing)),
},
{
RelationKey: bundle.RelationKeyLayout.String(),
Condition: model.BlockContentDataviewFilter_NotIn,
Value: pbtypes.IntList(
int(model.ObjectType_file),
int(model.ObjectType_image),
int(model.ObjectType_video),
int(model.ObjectType_audio),
),
},
{
RelationKey: bundle.RelationKeySpaceId.String(),
Condition: model.BlockContentDataviewFilter_Equal,
Value: pbtypes.String(s.spaceId),
},
},
}
fileReq := subscription.SubscribeRequest{
SubId: fmt.Sprintf("spacestatus.files.%s", s.spaceId),
Internal: true,
NoDepSubscription: true,
Keys: []string{bundle.RelationKeyId.String()},
Filters: []*model.BlockContentDataviewFilter{
{
RelationKey: bundle.RelationKeyFileBackupStatus.String(),
Condition: model.BlockContentDataviewFilter_In,
Value: pbtypes.IntList(int(filesyncstatus.Syncing), int(filesyncstatus.Queued)),
Value: pbtypes.IntList(int(domain.SpaceSyncStatusSyncing), int(domain.ObjectSyncStatusQueued), int(domain.ObjectSyncStatusError)),
},
{
RelationKey: bundle.RelationKeySpaceId.String(),
Expand All @@ -73,25 +43,18 @@ func (s *syncingObjects) Run() error {
},
},
}
s.fileSubscription = NewIdSubscription(s.service, fileReq)
s.objectSubscription = NewIdSubscription(s.service, objectReq)
errFiles := s.fileSubscription.Run()
errObjects := s.objectSubscription.Run()
if errFiles != nil || errObjects != nil {
return fmt.Errorf("error running syncing objects: %w %w", errFiles, errObjects)
if errObjects != nil {
return fmt.Errorf("error running syncing objects: %w", errObjects)
}
return nil
}

func (s *syncingObjects) Close() {
s.fileSubscription.Close()
s.objectSubscription.Close()
}

func (s *syncingObjects) GetFileSubscription() *ObjectSubscription[struct{}] {
return s.fileSubscription
}

func (s *syncingObjects) GetObjectSubscription() *ObjectSubscription[struct{}] {
return s.objectSubscription
}
Expand All @@ -105,7 +68,3 @@ func (s *syncingObjects) SyncingObjectsCount(missing []string) int {
_, added := slice.DifferenceRemovedAdded(ids, missing)
return len(ids) + len(added)
}

func (s *syncingObjects) FileSyncingObjectsCount() int {
return s.fileSubscription.Len()
}
2 changes: 0 additions & 2 deletions core/syncstatus/syncsubscriptions/syncsubscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ const CName = "client.syncstatus.syncsubscriptions"
type SyncSubscription interface {
Run() error
Close()
GetFileSubscription() *ObjectSubscription[struct{}]
GetObjectSubscription() *ObjectSubscription[struct{}]
SyncingObjectsCount(missing []string) int
FileSyncingObjectsCount() int
}

type SyncSubscriptions interface {
Expand Down
48 changes: 0 additions & 48 deletions core/syncstatus/syncsubscriptions/syncsubscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,12 @@ import (

"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/core/subscription"
"github.com/anyproto/anytype-heart/core/syncstatus/filesyncstatus"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/util/pbtypes"
)

func mapFileStatus(status filesyncstatus.Status) domain.ObjectSyncStatus {
switch status {
case filesyncstatus.Syncing:
return domain.ObjectSyncStatusSyncing
case filesyncstatus.Queued:
return domain.ObjectSyncStatusSyncing
case filesyncstatus.Limited:
return domain.ObjectSyncStatusError
default:
return domain.ObjectSyncStatusSynced
}
}

func genFileObject(fileStatus filesyncstatus.Status, spaceId string) objectstore.TestObject {
id := fmt.Sprintf("%d", rand.Int())
return objectstore.TestObject{
bundle.RelationKeyId: pbtypes.String(id),
bundle.RelationKeySyncStatus: pbtypes.Int64(int64(mapFileStatus(fileStatus))),
bundle.RelationKeyFileBackupStatus: pbtypes.Int64(int64(fileStatus)),
bundle.RelationKeyLayout: pbtypes.Int64(int64(model.ObjectType_file)),
bundle.RelationKeyName: pbtypes.String("name" + id),
bundle.RelationKeySpaceId: pbtypes.String(spaceId),
}
}

func genObject(syncStatus domain.ObjectSyncStatus, spaceId string) objectstore.TestObject {
id := fmt.Sprintf("%d", rand.Int())
return objectstore.TestObject{
Expand All @@ -57,7 +31,6 @@ func genObject(syncStatus domain.ObjectSyncStatus, spaceId string) objectstore.T
func TestSyncSubscriptions(t *testing.T) {
testSubs := subscription.NewInternalTestService(t)
var objects []objectstore.TestObject
fileObjs := map[string]struct{}{}
objs := map[string]struct{}{}
for i := 0; i < 10; i++ {
obj := genObject(domain.ObjectSyncStatusSyncing, "spaceId")
Expand All @@ -67,19 +40,6 @@ func TestSyncSubscriptions(t *testing.T) {
for i := 0; i < 10; i++ {
objects = append(objects, genObject(domain.ObjectSyncStatusSynced, "spaceId"))
}
for i := 0; i < 10; i++ {
obj := genFileObject(filesyncstatus.Syncing, "spaceId")
objects = append(objects, obj)
fileObjs[obj[bundle.RelationKeyId].GetStringValue()] = struct{}{}
}
for i := 0; i < 10; i++ {
obj := genFileObject(filesyncstatus.Queued, "spaceId")
objects = append(objects, obj)
fileObjs[obj[bundle.RelationKeyId].GetStringValue()] = struct{}{}
}
for i := 0; i < 10; i++ {
objects = append(objects, genFileObject(filesyncstatus.Synced, "spaceId"))
}
testSubs.AddObjects(t, objects)
subs := New()
subs.(*syncSubscriptions).service = testSubs
Expand All @@ -89,20 +49,12 @@ func TestSyncSubscriptions(t *testing.T) {
spaceSub, err := subs.GetSubscription("spaceId")
require.NoError(t, err)
syncCnt := spaceSub.SyncingObjectsCount([]string{"1", "2"})
fileCnt := spaceSub.FileSyncingObjectsCount()
require.Equal(t, 12, syncCnt)
require.Equal(t, 20, fileCnt)
require.Len(t, fileObjs, 20)
require.Len(t, objs, 10)
spaceSub.GetFileSubscription().Iterate(func(id string, data struct{}) bool {
delete(fileObjs, id)
return true
})
spaceSub.GetObjectSubscription().Iterate(func(id string, data struct{}) bool {
delete(objs, id)
return true
})
require.Empty(t, fileObjs)
require.Empty(t, objs)
for i := 0; i < 10; i++ {
objects[i][bundle.RelationKeySyncStatus] = pbtypes.Int64(int64(domain.ObjectSyncStatusSynced))
Expand Down

0 comments on commit 4a8c39e

Please sign in to comment.