diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go
index f856dc67..74c140ed 100644
--- a/internal/rdb/rdb.go
+++ b/internal/rdb/rdb.go
@@ -829,6 +829,7 @@ const (
 // KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
 // KEYS[7] -> asynq:{<qname>}:processed
 // KEYS[8] -> asynq:{<qname>}:failed
+// KEYS[9] -> asynq:{<qname>}:t:
 // -------
 // ARGV[1] -> task ID
 // ARGV[2] -> updated base.TaskMessage value
@@ -845,8 +846,22 @@ if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then
   return redis.error_reply("NOT FOUND")
 end
 redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
-redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4])
-redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5])
+local old = redis.call("ZRANGE", KEYS[4], "-inf", ARGV[4], "BYSCORE")
+if #old > 0 then
+  for _, id in ipairs(old) do
+    redis.call("DEL", KEYS[9] .. id)
+  end
+  redis.call("ZREM", KEYS[4], unpack(old))
+end
+
+local extra = redis.call("ZRANGE", KEYS[4], 0, -ARGV[5])
+if #extra > 0 then
+  for _, id in ipairs(extra) do
+    redis.call("DEL", KEYS[9] .. id)
+  end
+  redis.call("ZREM", KEYS[4], unpack(extra))
+end
+
 redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "archived")
 local n = redis.call("INCR", KEYS[5])
 if tonumber(n) == 1 then
@@ -889,6 +904,7 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string)
 		base.FailedKey(msg.Queue, now),
 		base.ProcessedTotalKey(msg.Queue),
 		base.FailedTotalKey(msg.Queue),
+		base.TaskKeyPrefix(msg.Queue),
 	}
 	argv := []interface{}{
 		msg.ID,
diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go
index 9df831f7..1b7d1381 100644
--- a/internal/rdb/rdb_test.go
+++ b/internal/rdb/rdb_test.go
@@ -2002,7 +2002,6 @@ func TestArchive(t *testing.T) {
 	}
 	errMsg := "SMTP server not responding"
 
-	// TODO(hibiken): add test cases for trimming
 	tests := []struct {
 		active map[string][]*base.TaskMessage
 		lease  map[string][]base.Z
@@ -2171,6 +2170,163 @@ func TestArchive(t *testing.T) {
 	}
 }
 
+func TestArchiveTrim(t *testing.T) {
+	r := setup(t)
+	defer r.Close()
+	now := time.Now()
+	r.SetClock(timeutil.NewSimulatedClock(now))
+
+	t1 := &base.TaskMessage{
+		ID:      uuid.NewString(),
+		Type:    "send_email",
+		Payload: nil,
+		Queue:   "default",
+		Retry:   25,
+		Retried: 25,
+		Timeout: 1800,
+	}
+	t2 := &base.TaskMessage{
+		ID:      uuid.NewString(),
+		Type:    "reindex",
+		Payload: nil,
+		Queue:   "default",
+		Retry:   25,
+		Retried: 0,
+		Timeout: 3000,
+	}
+	errMsg := "SMTP server not responding"
+
+	maxArchiveSet := make([]base.Z, 0)
+	for i := 0; i < maxArchiveSize-1; i++ {
+		maxArchiveSet = append(maxArchiveSet, base.Z{Message: &base.TaskMessage{
+			ID:      uuid.NewString(),
+			Type:    "generate_csv",
+			Payload: nil,
+			Queue:   "default",
+			Retry:   25,
+			Retried: 0,
+			Timeout: 60,
+		}, Score: now.Add(-time.Hour + -time.Second*time.Duration(i)).Unix()})
+	}
+
+	wantMaxArchiveSet := make([]base.Z, 0)
+	// newly archived task should be at the front
+	wantMaxArchiveSet = append(wantMaxArchiveSet, base.Z{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()})
+	// oldest task should be dropped from the set
+	wantMaxArchiveSet = append(wantMaxArchiveSet, maxArchiveSet[:len(maxArchiveSet)-1]...)
+
+	tests := []struct {
+		toArchive    map[string][]*base.TaskMessage
+		lease        map[string][]base.Z
+		archived     map[string][]base.Z
+		wantArchived map[string][]base.Z
+	}{
+		{ // simple, 1 to be archived, 1 already archived, both are in the archive set
+			toArchive: map[string][]*base.TaskMessage{
+				"default": {t1},
+			},
+			lease: map[string][]base.Z{
+				"default": {
+					{Message: t1, Score: now.Add(10 * time.Second).Unix()},
+				},
+			},
+			archived: map[string][]base.Z{
+				"default": {
+					{Message: t2, Score: now.Add(-time.Hour).Unix()},
+				},
+			},
+			wantArchived: map[string][]base.Z{
+				"default": {
+					{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()},
+					{Message: t2, Score: now.Add(-time.Hour).Unix()},
+				},
+			},
+		},
+		{ // 1 to be archived, 1 already archived but past expiry, only the newly archived task should be left
+			toArchive: map[string][]*base.TaskMessage{
+				"default": {t1},
+			},
+			lease: map[string][]base.Z{
+				"default": {
+					{Message: t1, Score: now.Add(10 * time.Second).Unix()},
+				},
+			},
+			archived: map[string][]base.Z{
+				"default": {
+					{Message: t2, Score: now.Add(-time.Hour * 24 * (archivedExpirationInDays + 1)).Unix()},
+				},
+			},
+			wantArchived: map[string][]base.Z{
+				"default": {
+					{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()},
+				},
+			},
+		},
+		{ // 1 to be archived, maxArchiveSize in archive set, archive set should be trimmed back to maxArchiveSize and newly archived task should be in the set
+			toArchive: map[string][]*base.TaskMessage{
+				"default": {t1},
+			},
+			lease: map[string][]base.Z{
+				"default": {
+					{Message: t1, Score: now.Add(10 * time.Second).Unix()},
+				},
+			},
+			archived: map[string][]base.Z{
+				"default": maxArchiveSet,
+			},
+			wantArchived: map[string][]base.Z{
+				"default": wantMaxArchiveSet,
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		h.FlushDB(t, r.client) // clean up db before each test case
+		h.SeedAllActiveQueues(t, r.client, tc.toArchive)
+		h.SeedAllLease(t, r.client, tc.lease)
+		h.SeedAllArchivedQueues(t, r.client, tc.archived)
+
+		for _, tasks := range tc.toArchive {
+			for _, target := range tasks {
+				err := r.Archive(context.Background(), target, errMsg)
+				if err != nil {
+					t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", target, errMsg, err)
+					continue
+				}
+			}
+		}
+
+		for queue, want := range tc.wantArchived {
+			gotArchived := h.GetArchivedEntries(t, r.client, queue)
+
+			if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt, timeCmpOpt); diff != "" {
+				t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff)
+			}
+
+			// check that only keys present in the archived set are in rdb
+			vals := r.client.Keys(context.Background(), base.TaskKeyPrefix(queue)+"*").Val()
+			if len(vals) != len(gotArchived) {
+				t.Errorf("len of keys = %v, want %v", len(vals), len(gotArchived))
+				return
+			}
+
+			for _, val := range vals {
+				found := false
+				for _, entry := range gotArchived {
+					if strings.Contains(val, entry.Message.ID) {
+						found = true
+						break
+					}
+				}
+
+				if !found {
+					t.Errorf("key %v not found in archived set (it was orphaned by the archive trim)", val)
+				}
+			}
+		}
+	}
+}
+
 func TestForwardIfReadyWithGroup(t *testing.T) {
 	r := setup(t)
 	defer r.Close()