Skip to content

Commit

Permalink
fix(backend): memory leak in memory caches (misskey-dev#14363)
Browse files Browse the repository at this point in the history
cherry picked from commit bf8c42e

Co-authored-by: かっこかり <[email protected]>
Co-authored-by: Hazel K <[email protected]>
  • Loading branch information
2 people authored and u1-liquid committed Dec 24, 2024
1 parent 6d4dc5e commit 9e998cc
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 88 deletions.
2 changes: 1 addition & 1 deletion packages/backend/src/core/AvatarDecorationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class AvatarDecorationService implements OnApplicationShutdown {
private moderationLogService: ModerationLogService,
private globalEventService: GlobalEventService,
) {
this.cache = new MemorySingleCache<MiAvatarDecoration[]>(1000 * 60 * 30);
this.cache = new MemorySingleCache<MiAvatarDecoration[]>(1000 * 60 * 30); // 30s

this.redisForSub.on('message', this.onMessage);
}
Expand Down
12 changes: 6 additions & 6 deletions packages/backend/src/core/CacheService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ export class CacheService implements OnApplicationShutdown {
) {
//this.onMessage = this.onMessage.bind(this);

this.userByIdCache = new MemoryKVCache<MiUser>(Infinity);
this.localUserByNativeTokenCache = new MemoryKVCache<MiLocalUser | null>(Infinity);
this.localUserByIdCache = new MemoryKVCache<MiLocalUser>(Infinity);
this.uriPersonCache = new MemoryKVCache<MiUser | null>(Infinity);
this.userByIdCache = new MemoryKVCache<MiUser>(1000 * 60 * 5); // 5m
this.localUserByNativeTokenCache = new MemoryKVCache<MiLocalUser | null>(1000 * 60 * 5); // 5m
this.localUserByIdCache = new MemoryKVCache<MiLocalUser>(1000 * 60 * 5); // 5m
this.uriPersonCache = new MemoryKVCache<MiUser | null>(1000 * 60 * 5); // 5m

this.userProfileCache = new RedisKVCache<MiUserProfile>(this.redisClient, 'userProfile', {
lifetime: 1000 * 60 * 30, // 30m
Expand Down Expand Up @@ -135,14 +135,14 @@ export class CacheService implements OnApplicationShutdown {
if (user == null) {
this.userByIdCache.delete(body.id);
this.localUserByIdCache.delete(body.id);
for (const [k, v] of this.uriPersonCache.cache.entries()) {
for (const [k, v] of this.uriPersonCache.entries) {
if (v.value?.id === body.id) {
this.uriPersonCache.delete(k);
}
}
} else {
this.userByIdCache.set(user.id, user);
for (const [k, v] of this.uriPersonCache.cache.entries()) {
for (const [k, v] of this.uriPersonCache.entries) {
if (v.value?.id === user.id) {
this.uriPersonCache.set(k, user);
}
Expand Down
12 changes: 6 additions & 6 deletions packages/backend/src/core/CustomEmojiService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const parseEmojiStrRegexp = /^([-\w]+)(?:@([\w.-]+))?$/;

@Injectable()
export class CustomEmojiService implements OnApplicationShutdown {
private cache: MemoryKVCache<MiEmoji | null>;
private emojisCache: MemoryKVCache<MiEmoji | null>;
public localEmojisCache: RedisSingleCache<Map<string, MiEmoji>>;

constructor(
Expand All @@ -40,7 +40,7 @@ export class CustomEmojiService implements OnApplicationShutdown {
private moderationLogService: ModerationLogService,
private globalEventService: GlobalEventService,
) {
this.cache = new MemoryKVCache<MiEmoji | null>(1000 * 60 * 60 * 12);
this.emojisCache = new MemoryKVCache<MiEmoji | null>(1000 * 60 * 60 * 12); // 12h

this.localEmojisCache = new RedisSingleCache<Map<string, MiEmoji>>(this.redisClient, 'localEmojis', {
lifetime: 1000 * 60 * 30, // 30m
Expand Down Expand Up @@ -346,7 +346,7 @@ export class CustomEmojiService implements OnApplicationShutdown {
host,
})) ?? null;

const emoji = await this.cache.fetch(`${name} ${host}`, queryOrNull);
const emoji = await this.emojisCache.fetch(`${name} ${host}`, queryOrNull);

if (emoji == null) return null;
return emoji.publicUrl || emoji.originalUrl; // || emoji.originalUrl してるのは後方互換性のため(publicUrlはstringなので??はだめ)
Expand All @@ -372,7 +372,7 @@ export class CustomEmojiService implements OnApplicationShutdown {
*/
@bindThis
public async prefetchEmojis(emojis: { name: string; host: string | null; }[]): Promise<void> {
const notCachedEmojis = emojis.filter(emoji => this.cache.get(`${emoji.name} ${emoji.host}`) == null);
const notCachedEmojis = emojis.filter(emoji => this.emojisCache.get(`${emoji.name} ${emoji.host}`) == null);
const emojisQuery: any[] = [];
const hosts = new Set(notCachedEmojis.map(e => e.host));
for (const host of hosts) {
Expand All @@ -387,7 +387,7 @@ export class CustomEmojiService implements OnApplicationShutdown {
select: ['name', 'host', 'originalUrl', 'publicUrl'],
}) : [];
for (const emoji of _emojis) {
this.cache.set(`${emoji.name} ${emoji.host}`, emoji);
this.emojisCache.set(`${emoji.name} ${emoji.host}`, emoji);
}
}

Expand All @@ -412,7 +412,7 @@ export class CustomEmojiService implements OnApplicationShutdown {

@bindThis
public dispose(): void {
this.cache.dispose();
this.emojisCache.dispose();
}

@bindThis
Expand Down
2 changes: 1 addition & 1 deletion packages/backend/src/core/RelayService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class RelayService {
private createSystemUserService: CreateSystemUserService,
private apRendererService: ApRendererService,
) {
this.relaysCache = new MemorySingleCache<MiRelay[]>(1000 * 60 * 10);
this.relaysCache = new MemorySingleCache<MiRelay[]>(1000 * 60 * 10); // 10m
}

@bindThis
Expand Down
6 changes: 2 additions & 4 deletions packages/backend/src/core/RoleService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,8 @@ export class RoleService implements OnApplicationShutdown, OnModuleInit {
private moderationLogService: ModerationLogService,
private fanoutTimelineService: FanoutTimelineService,
) {
//this.onMessage = this.onMessage.bind(this);

this.rolesCache = new MemorySingleCache<MiRole[]>(1000 * 60 * 60 * 1);
this.roleAssignmentByUserIdCache = new MemoryKVCache<MiRoleAssignment[]>(1000 * 60 * 60 * 1);
this.rolesCache = new MemorySingleCache<MiRole[]>(1000 * 60 * 60); // 1h
this.roleAssignmentByUserIdCache = new MemoryKVCache<MiRoleAssignment[]>(1000 * 60 * 5); // 5m

this.redisForSub.on('message', this.onMessage);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/backend/src/core/UserKeypairService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class UserKeypairService implements OnApplicationShutdown {
) {
this.cache = new RedisKVCache<MiUserKeypair>(this.redisClient, 'userKeypair', {
lifetime: 1000 * 60 * 60 * 24, // 24h
memoryCacheLifetime: Infinity,
memoryCacheLifetime: 1000 * 60 * 60, // 1h
fetcher: (key) => this.userKeypairsRepository.findOneByOrFail({ userId: key }),
toRedisConverter: (value) => JSON.stringify(value),
fromRedisConverter: (value) => JSON.parse(value),
Expand Down
4 changes: 2 additions & 2 deletions packages/backend/src/core/activitypub/ApDbResolverService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ export class ApDbResolverService implements OnApplicationShutdown {
private cacheService: CacheService,
private apPersonService: ApPersonService,
) {
this.publicKeyCache = new MemoryKVCache<MiUserPublickey | null>(Infinity);
this.publicKeyByUserIdCache = new MemoryKVCache<MiUserPublickey | null>(Infinity);
this.publicKeyCache = new MemoryKVCache<MiUserPublickey | null>(1000 * 60 * 60 * 12); // 12h
this.publicKeyByUserIdCache = new MemoryKVCache<MiUserPublickey | null>(1000 * 60 * 60 * 12); // 12h
}

@bindThis
Expand Down
143 changes: 79 additions & 64 deletions packages/backend/src/misc/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@ import * as Redis from 'ioredis';
import { bindThis } from '@/decorators.js';

export class RedisKVCache<T> {
private redisClient: Redis.Redis;
private name: string;
private lifetime: number;
private memoryCache: MemoryKVCache<T>;
private fetcher: (key: string) => Promise<T>;
private toRedisConverter: (value: T) => string;
private fromRedisConverter: (value: string) => T | undefined;

constructor(redisClient: RedisKVCache<T>['redisClient'], name: RedisKVCache<T>['name'], opts: {
lifetime: RedisKVCache<T>['lifetime'];
memoryCacheLifetime: number;
fetcher: RedisKVCache<T>['fetcher'];
toRedisConverter: RedisKVCache<T>['toRedisConverter'];
fromRedisConverter: RedisKVCache<T>['fromRedisConverter'];
}) {
this.redisClient = redisClient;
this.name = name;
private readonly lifetime: number;
private readonly memoryCache: MemoryKVCache<T>;
private readonly fetcher: (key: string) => Promise<T>;
private readonly toRedisConverter: (value: T) => string;
private readonly fromRedisConverter: (value: string) => T | undefined;

constructor(
private redisClient: Redis.Redis,
private name: string,
opts: {
lifetime: RedisKVCache<T>['lifetime'];
memoryCacheLifetime: number;
fetcher: RedisKVCache<T>['fetcher'];
toRedisConverter: RedisKVCache<T>['toRedisConverter'];
fromRedisConverter: RedisKVCache<T>['fromRedisConverter'];
},
) {
this.lifetime = opts.lifetime;
this.memoryCache = new MemoryKVCache(opts.memoryCacheLifetime);
this.fetcher = opts.fetcher;
Expand Down Expand Up @@ -55,10 +55,13 @@ export class RedisKVCache<T> {

const cached = await this.redisClient.get(`kvcache:${this.name}:${key}`);
if (cached == null) return undefined;
const parsed = this.fromRedisConverter(cached);
if (parsed == null) return undefined;
this.memoryCache.set(key, parsed);
return parsed;

const value = this.fromRedisConverter(cached);
if (value !== undefined) {
this.memoryCache.set(key, value);
}

return value;
}

@bindThis
Expand All @@ -69,6 +72,10 @@ export class RedisKVCache<T> {

/**
* キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します
* This awaits the call to Redis to ensure that the write succeeded, which is important for a few reasons:
* * Other code uses this to synchronize changes between worker processes. A failed write can internally de-sync the cluster.
* * Without an `await`, consecutive calls could race. An unlucky race could result in the older write overwriting the newer value.
* * Not awaiting here makes the entire cache non-consistent. The prevents many possible uses.
*/
@bindThis
public async fetch(key: string): Promise<T> {
Expand All @@ -80,14 +87,14 @@ export class RedisKVCache<T> {

// Cache MISS
const value = await this.fetcher(key);
this.set(key, value);
await this.set(key, value);
return value;
}

@bindThis
public async refresh(key: string) {
const value = await this.fetcher(key);
this.set(key, value);
await this.set(key, value);

// TODO: イベント発行して他プロセスのメモリキャッシュも更新できるようにする
}
Expand All @@ -104,23 +111,23 @@ export class RedisKVCache<T> {
}

export class RedisSingleCache<T> {
private redisClient: Redis.Redis;
private name: string;
private lifetime: number;
private memoryCache: MemorySingleCache<T>;
private fetcher: () => Promise<T>;
private toRedisConverter: (value: T) => string;
private fromRedisConverter: (value: string) => T | undefined;

constructor(redisClient: RedisSingleCache<T>['redisClient'], name: RedisSingleCache<T>['name'], opts: {
lifetime: RedisSingleCache<T>['lifetime'];
memoryCacheLifetime: number;
fetcher: RedisSingleCache<T>['fetcher'];
toRedisConverter: RedisSingleCache<T>['toRedisConverter'];
fromRedisConverter: RedisSingleCache<T>['fromRedisConverter'];
}) {
this.redisClient = redisClient;
this.name = name;
private readonly lifetime: number;
private readonly memoryCache: MemorySingleCache<T>;
private readonly fetcher: () => Promise<T>;
private readonly toRedisConverter: (value: T) => string;
private readonly fromRedisConverter: (value: string) => T | undefined;

constructor(
private redisClient: Redis.Redis,
private name: string,
opts: {
lifetime: number;
memoryCacheLifetime: number;
fetcher: RedisSingleCache<T>['fetcher'];
toRedisConverter: RedisSingleCache<T>['toRedisConverter'];
fromRedisConverter: RedisSingleCache<T>['fromRedisConverter'];
},
) {
this.lifetime = opts.lifetime;
this.memoryCache = new MemorySingleCache(opts.memoryCacheLifetime);
this.fetcher = opts.fetcher;
Expand Down Expand Up @@ -152,10 +159,13 @@ export class RedisSingleCache<T> {

const cached = await this.redisClient.get(`singlecache:${this.name}`);
if (cached == null) return undefined;
const parsed = this.fromRedisConverter(cached);
if (parsed == null) return undefined;
this.memoryCache.set(parsed);
return parsed;

const value = this.fromRedisConverter(cached);
if (value !== undefined) {
this.memoryCache.set(value);
}

return value;
}

@bindThis
Expand All @@ -166,6 +176,10 @@ export class RedisSingleCache<T> {

/**
* キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します
* This awaits the call to Redis to ensure that the write succeeded, which is important for a few reasons:
* * Other code uses this to synchronize changes between worker processes. A failed write can internally de-sync the cluster.
* * Without an `await`, consecutive calls could race. An unlucky race could result in the older write overwriting the newer value.
* * Not awaiting here makes the entire cache non-consistent. The prevents many possible uses.
*/
@bindThis
public async fetch(): Promise<T> {
Expand All @@ -177,14 +191,14 @@ export class RedisSingleCache<T> {

// Cache MISS
const value = await this.fetcher();
this.set(value);
await this.set(value);
return value;
}

@bindThis
public async refresh() {
const value = await this.fetcher();
this.set(value);
await this.set(value);

// TODO: イベント発行して他プロセスのメモリキャッシュも更新できるようにする
}
Expand All @@ -197,18 +211,12 @@ export class MemoryKVCache<T> {
* データを持つマップ
* これを直接操作するべきではない
*/
public cache: Map<string, { date: number; value: T; }>;
private lifetime: number;
private gcIntervalHandle: NodeJS.Timeout;

constructor(lifetime: MemoryKVCache<never>['lifetime']) {
this.cache = new Map();
this.lifetime = lifetime;
private readonly cache = new Map<string, { date: number; value: T; }>();
private readonly gcIntervalHandle = setInterval(() => this.gc(), 1000 * 60 * 3); // 3m

this.gcIntervalHandle = setInterval(() => {
this.gc();
}, 1000 * 60 * 3);
}
constructor(
private readonly lifetime: number,
) {}

@bindThis
/**
Expand Down Expand Up @@ -293,27 +301,34 @@ export class MemoryKVCache<T> {
@bindThis
public gc(): void {
const now = Date.now();

for (const [key, { date }] of this.cache.entries()) {
if ((now - date) > this.lifetime) {
this.cache.delete(key);
}
// The map is ordered from oldest to youngest.
// We can stop once we find an entry that's still active, because all following entries must *also* be active.
const age = now - date;
if (age < this.lifetime) break;

this.cache.delete(key);
}
}

@bindThis
public dispose(): void {
clearInterval(this.gcIntervalHandle);
}

public get entries() {
return this.cache.entries();
}
}

export class MemorySingleCache<T> {
private cachedAt: number | null = null;
private value: T | undefined;
private lifetime: number;

constructor(lifetime: MemorySingleCache<never>['lifetime']) {
this.lifetime = lifetime;
}
constructor(
private lifetime: number,
) {}

@bindThis
public set(value: T): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class DeliverProcessorService {
private queueLoggerService: QueueLoggerService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('deliver');
this.suspendedHostsCache = new MemorySingleCache<MiInstance[]>(1000 * 60 * 60);
this.suspendedHostsCache = new MemorySingleCache<MiInstance[]>(1000 * 60 * 60); // 1h
}

@bindThis
Expand Down
2 changes: 1 addition & 1 deletion packages/backend/src/server/NodeinfoServerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export class NodeinfoServerService {
return document;
};

const cache = new MemorySingleCache<Awaited<ReturnType<typeof nodeinfo2>>>(1000 * 60 * 10);
const cache = new MemorySingleCache<Awaited<ReturnType<typeof nodeinfo2>>>(1000 * 60 * 10); // 10m

fastify.get(nodeinfo2_1path, async (request, reply) => {
const base = await cache.fetch(() => nodeinfo2(21));
Expand Down
Loading

0 comments on commit 9e998cc

Please sign in to comment.