Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expiration for storage events / files #59

Merged
merged 3 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/subzero-ion-connect/subzero_ion_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (

func init() {
initFlags()
query.RegisterExpiredEventsProcessor(storage.DeleteExpiredFiles)
wsserver.RegisterReqMustAuthenticate(func(_ context.Context, sub *model.Subscription) (authRequired bool) {
if sub == nil {
return false
Expand Down
4 changes: 4 additions & 0 deletions database/query/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func MustInit(ctx context.Context) {
})
}

func RegisterExpiredEventsProcessor(proc func(ctx context.Context, events ...*model.Event) error) {
notifyExpiredEvents = proc
}

func AcceptEvents(ctx context.Context, events ...*model.Event) error {
return globalDB.Client.AcceptEvents(ctx, events...)
}
Expand Down
46 changes: 40 additions & 6 deletions database/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ var (
ErrAttestationUpdateRejected = errors.New("attestation update rejected")

errEventIteratorInterrupted = errors.New("interrupted")

notifyExpiredEvents func(ctx context.Context, events ...*model.Event) error
)

type databaseEvent struct {
Expand Down Expand Up @@ -452,11 +454,43 @@ func generateEventsWhereClause(filters ...model.Filter) (clauseMain, clauseDeps

func (db *dbClient) deleteExpiredEvents(ctx context.Context) error {
params := map[string]any{}
_, err := db.exec(ctx, `delete from events
where id in (
select event_id from event_tags
where (((event_tag_key = 'expiration')
AND cast(event_tag_value1 as integer) <= unixepoch())))`, params)
it := &eventIterator{
OneShot: true,
Map: nil,
Fetch: func(pivot int64) (*sqlx.Rows, error) {
result, err := db.NamedQueryContext(ctx, `delete from events
where id in (
select event_id from event_tags
where (((event_tag_key = 'expiration')
AND cast(event_tag_value1 as integer) <= unixepoch())))
returning
kind,
created_at,
system_created_at,
id,
pubkey,
master_pubkey,
sig,
content,
d_tag,
tags as jtags;
`, params)
if err != nil {
err = errors.Wrap(db.handleError(err), "failed to exec delete expired events")
}
return result, err
}}
events := []*model.Event{}
err := it.Each(ctx, func(event *model.Event) error {
events = append(events, event)

return errors.Wrap(err, "failed to exec delete expired events")
return nil
})
if err != nil {
return errors.Wrap(err, "failed to exec delete expired events")
}
if notifyExpiredEvents != nil && len(events) > 0 {
err = errors.Wrapf(notifyExpiredEvents(ctx, events...), "failed to process notification of expired events")
}
return err
}
11 changes: 11 additions & 0 deletions database/query/query_fixture.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// SPDX-License-Identifier: ice License 1.0

//go:build test

package query

import "context"

func TriggerExpiredEventsCleanup(ctx context.Context) error {
return globalDB.Client.deleteExpiredEvents(ctx)
}
54 changes: 38 additions & 16 deletions server/http/storage_nip_96_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var testdata embed.FS
func TestNIP96(t *testing.T) {
t.Parallel()
now := time.Now().Unix()
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
defer cancel()
defer func() {
require.NoError(t, storage.Client().Close())
Expand All @@ -70,30 +70,39 @@ func TestNIP96(t *testing.T) {
require.NoError(t, ev.SignWithAlg(master, model.SignAlgEDDSA, model.KeyAlgCurve25519))
require.NoError(t, query.AcceptEvents(ctx, &ev))
})

t.Run("files are uploaded, response is ok", func(t *testing.T) {
var responses chan *nip96.UploadResponse
responses = make(chan *nip96.UploadResponse, 100)
var responses []*nip96.UploadResponse
responses = make([]*nip96.UploadResponse, 0)
upload(t, ctx, user1, masterPubKey, ".testdata/image2.png", "profile.png", "ice profile pic", func(resp *nip96.UploadResponse) {
responses <- resp
responses = append(responses, resp)
})
upload(t, ctx, user1, masterPubKey, ".testdata/image.jpg", "ice.jpg", "ice logo", func(resp *nip96.UploadResponse) {
responses <- resp
responses = append(responses, resp)
})
upload(t, ctx, master, "", ".testdata/text-master.txt", "master.txt", "master's file", func(resp *nip96.UploadResponse) { responses <- resp })
upload(t, ctx, user1, masterPubKey, ".testdata/text.txt", "text.txt", "text file", func(resp *nip96.UploadResponse) { responses <- resp })
close(responses)
i := 0
for resp := range responses {
if i == 0 {
outdatedTags = resp.Nip94Event.Tags
outdatedContent = resp.Nip94Event.Content
}
upload(t, ctx, master, "", ".testdata/text-master.txt", "master.txt", "master's file", func(resp *nip96.UploadResponse) { responses = append(responses, resp) })
upload(t, ctx, user1, masterPubKey, ".testdata/text.txt", "text.txt", "text file", func(resp *nip96.UploadResponse) { responses = append(responses, resp) })
outdatedTags = responses[0].Nip94Event.Tags
outdatedContent = responses[0].Nip94Event.Content
tagsToBroadcast = responses[len(responses)-1].Nip94Event.Tags
contentToBroadcast = responses[len(responses)-1].Nip94Event.Content
for _, resp := range responses {
verifyFile(t, resp.Nip94Event.Content, resp.Nip94Event.Tags)
tagsToBroadcast = resp.Nip94Event.Tags
tagsToBroadcast = resp.Nip94Event.Tags.AppendUnique(model.Tag{"b", masterPubKey}).
AppendUnique(model.Tag{"expiration", strconv.FormatInt(time.Now().Unix()-10, 10)})
contentToBroadcast = resp.Nip94Event.Content
i += 1
nip94EventToSign := &model.Event{Event: nostr.Event{
CreatedAt: nostr.Timestamp(time.Now().Unix()),
Kind: nostr.KindFileMetadata,
Tags: tagsToBroadcast,
Content: contentToBroadcast,
}}
require.NoError(t, nip94EventToSign.SignWithAlg(user1, model.SignAlgEDDSA, model.KeyAlgCurve25519))
require.NoError(t, query.AcceptEvents(ctx, nip94EventToSign))
require.NoError(t, storage.AcceptEvents(ctx, nip94EventToSign))
}
})

var outdatedNip94EventToSign *model.Event
t.Run("nip-94 event is accepted on the same relay it was uploaded to = no-op", func(t *testing.T) {
outdatedTags = outdatedTags.AppendUnique(model.Tag{"b", masterPubKey})
Expand Down Expand Up @@ -219,6 +228,19 @@ func TestNIP96(t *testing.T) {
fileName := "master.txt"
require.FileExists(t, filepath.Join(storageRoot, masterPubKey, fileName))
})
ch := make(chan struct{}, 1)
query.RegisterExpiredEventsProcessor(func(ctx context.Context, events ...*model.Event) error {
err := storage.DeleteExpiredFiles(ctx, events...)
ch <- struct{}{}
return err
})
require.NoError(t, query.TriggerExpiredEventsCleanup(ctx))
select {
case <-ch:
default:
t.Fatal("Expired events processor was not triggered")
}
require.NoFileExists(t, filepath.Join(newStorageRoot, masterPubKey, "master.txt"), "expiration")

}

Expand Down
33 changes: 28 additions & 5 deletions storage/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,30 +109,37 @@ func acceptDeletion(ctx context.Context, event *model.Event) error {
return nil
}
log.Printf("[STORAGE] INFO: ACCEPT FILE DELETION OF NIP-94 for user %v: %v, original event %v", event.GetMasterPublicKey(), event.String(), originalEvent.String())

return processEventDeletion(ctx, originalEvent)
}

func processEventDeletion(ctx context.Context, originalEvent *model.Event) error {
fileHash := ""
if xTag := originalEvent.Tags.GetFirst([]string{"x"}); xTag != nil && len(*xTag) > 1 {
fileHash = xTag.Value()
} else {
return errors.Errorf("malformed x tag in event %v", originalEvent.ID)
}
bag, err := globalClient.bagByUser(event.GetMasterPublicKey())
bag, err := globalClient.bagByUser(originalEvent.GetMasterPublicKey())
if err != nil {
return errors.Wrapf(err, "failed to get bagID for the user %v", event.GetMasterPublicKey())
return errors.Wrapf(err, "failed to get bagID for the user %v", originalEvent.GetMasterPublicKey())
}
if bag == nil {
return errors.Errorf("bagID for user %v not found", event.GetMasterPublicKey())
return errors.Errorf("bagID for user %v not found", originalEvent.GetMasterPublicKey())
}
file, err := globalClient.detectFile(bag, fileHash)
if err != nil {
return errors.Wrapf(err, "failed to detect file %v in bag %v", fileHash, hex.EncodeToString(bag.BagID))
}
userRoot, _ := globalClient.BuildUserPath(event.GetMasterPublicKey(), "")
userRoot, _ := globalClient.BuildUserPath(originalEvent.GetMasterPublicKey(), "")
if err := os.Remove(filepath.Join(userRoot, file)); err != nil && !errors.Is(err, os.ErrNotExist) {
return errors.Wrapf(err, "failed to delete file %v", file)
}
if _, _, _, err := globalClient.StartUpload(ctx, event.PubKey, event.GetMasterPublicKey(), file, fileHash, nil); err != nil {
bagID, _, _, err := globalClient.StartUpload(ctx, originalEvent.PubKey, originalEvent.GetMasterPublicKey(), file, fileHash, nil)
ice-ares marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Wrapf(err, "failed to rebuild bag with deleted file")
}
log.Printf("[STORAGE] INFO: bag %x replaced by %v due to file deletion %+v", bag.BagID, bagID, originalEvent)
return nil
}

Expand Down Expand Up @@ -269,3 +276,19 @@ func mustInit(ctx context.Context) *client {
go cl.startDownloadsFromQueue()
return cl
}

func DeleteExpiredFiles(ctx context.Context, events ...*model.Event) error {
var err error
for _, ev := range events {
if ev.Kind != nostr.KindFileMetadata {
continue
}
log.Printf("[STORAGE] DEBUG: FILE expired for user %v: %v", ev.GetMasterPublicKey(), ev.String(), ev.String())
err = processEventDeletion(ctx, ev)
if errors.Is(err, os.ErrNotExist) || errors.Is(err, ErrNotFound) {
err = nil
}
err = errors.Join(err, errors.Wrapf(err, "failed to delete files for expired event %+v", ev))
}
return errors.Wrapf(err, "failed to delete files for expired events")
}
Loading