From d6e2071238d41eacf5a7c75739763483e555917a Mon Sep 17 00:00:00 2001 From: pen <121443048+penginn-net@users.noreply.github.com> Date: Mon, 4 Nov 2024 17:10:03 +0900 Subject: [PATCH] ApOutboxFetchService.ts iroiro (#526) --- .../models/ApOutboxFetchService.ts | 217 +++++++++--------- packages/backend/src/core/activitypub/type.ts | 3 +- .../server/api/endpoints/ap/fetch-outbox.ts | 19 +- 3 files changed, 123 insertions(+), 116 deletions(-) diff --git a/packages/backend/src/core/activitypub/models/ApOutboxFetchService.ts b/packages/backend/src/core/activitypub/models/ApOutboxFetchService.ts index c0cd21bfde..2d4c28227e 100644 --- a/packages/backend/src/core/activitypub/models/ApOutboxFetchService.ts +++ b/packages/backend/src/core/activitypub/models/ApOutboxFetchService.ts @@ -5,6 +5,8 @@ import { Inject, Injectable } from '@nestjs/common'; import { ModuleRef } from '@nestjs/core'; +import * as Redis from 'ioredis'; +import { AbortError } from 'got'; import { DI } from '@/di-symbols.js'; import type { UsersRepository } from '@/models/_.js'; import type { MiRemoteUser } from '@/models/User.js'; @@ -19,16 +21,16 @@ import { AppLockService } from '@/core/AppLockService.js'; import { NoteEntityService } from '@/core/entities/NoteEntityService.js'; import { NoteCreateService } from '@/core/NoteCreateService.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -import { getApId, isIOrderedCollectionPage, isAnnounce, isNote, isPost } from '../type.js'; +import { ApDbResolverService } from '@/core/activitypub/ApDbResolverService.js'; +import { isIOrderedCollectionPage, isCreate, IOrderedCollectionPage, isNote } from '../type.js'; import { ApAudienceService } from '../ApAudienceService.js'; import type { OnModuleInit } from '@nestjs/common'; import type { ApNoteService } from './ApNoteService.js'; import type { ApResolverService, Resolver } from '../ApResolverService.js'; import type { ApLoggerService } from '../ApLoggerService.js'; -const pagelimit = 3; -const createLimit = 15; - +const pagelimit = 1; +const createLimit = 20; @Injectable() export class ApOutboxFetchService implements OnModuleInit { private utilityService: UtilityService; @@ -44,11 +46,14 @@ export class ApOutboxFetchService implements OnModuleInit { @Inject(DI.usersRepository) private usersRepository: UsersRepository, + @Inject(DI.redis) + private redisClient: Redis.Redis, - private noteEntityService: NoteEntityService, - private noteCreateService: NoteCreateService, - private appLockService: AppLockService, private apAudienceService: ApAudienceService, + private apDbResolverService: ApDbResolverService, + private appLockService: AppLockService, + private noteCreateService: NoteCreateService, + private noteEntityService: NoteEntityService, ) { } @@ -67,142 +72,136 @@ export class ApOutboxFetchService implements OnModuleInit { */ @bindThis public async fetchOutbox(userId: MiUser['id'], includeAnnounce = false, resolver?: Resolver): Promise { - const user = await this.usersRepository.findOneByOrFail({ id: userId }) as MiRemoteUser; + const user = (await this.usersRepository.findOneBy({ id: userId }) as MiRemoteUser | null) ?? null; if (!user) throw new IdentifiableError('3fc5a089-cab4-48db-b9f3-f220574b3c0a', 'No such user'); if (!user.host) throw new IdentifiableError('67070303-177c-4600-af93-b26a7ab889c6', 'Is local user'); if (!user.outbox) throw new IdentifiableError('e7a2e510-a8ce-40e9-b1e6-c007bacdc89f', 'outbox undefined.'); + const blockedHosts = (await this.metaService.fetch()).blockedHosts; + if (this.utilityService.isBlockedHost(blockedHosts, user.host)) throw new IdentifiableError('b27090c8-8a68-4189-a445-14591c32a89c', 'blocked instance.'); const outboxUrl = user.outbox; - const meta = await this.metaService.fetch(); - if (this.utilityService.isBlockedHost(meta.blockedHosts, this.utilityService.extractDbHost(outboxUrl))) return; - this.logger.info(`Fetcing the Outbox: ${outboxUrl}`); const Resolver = resolver ?? this.apResolverService.createResolver(); - + const cache = await this.redisClient.get(`${outboxUrl}--next`); // Resolve to (Ordered)Collection Object - const outbox = await Resolver.resolveCollection(outboxUrl); + const outbox = cache ? await Resolver.resolveOrderedCollectionPage(cache) : await Resolver.resolveCollection(outboxUrl); - if (outbox.type !== 'OrderedCollection') throw new IdentifiableError('0be2f5a1-2345-46d8-b8c3-430b111c68d3', 'outbox type is not OrderedCollection'); - if (!outbox.first) throw new IdentifiableError('a723c2df-0250-4091-b5fc-e3a7b36c7b61', 'outbox first page not exist'); + if (!cache && outbox.type !== 'OrderedCollection') throw new IdentifiableError('0be2f5a1-2345-46d8-b8c3-430b111c68d3', 'outbox type is not OrderedCollection'); + if (!cache && !outbox.first) throw new IdentifiableError('a723c2df-0250-4091-b5fc-e3a7b36c7b61', 'outbox first page not exist'); - let nextUrl = outbox.first; + let nextUrl = cache ? (outbox as IOrderedCollectionPage).next : outbox.first; + let page = 0; let created = 0; + if (typeof(nextUrl) !== 'string') { + const first = (nextUrl as any); + if (first.partOf !== user.outbox) throw new IdentifiableError('6603433f-99db-4134-980c-48705ae57ab8', 'outbox part is invalid'); - for (let i = 0; i < pagelimit; i++) { - const collectionPage = await Resolver.resolveOrderedCollectionPage(nextUrl); - if (!isIOrderedCollectionPage(collectionPage)) throw new IdentifiableError('2a05bb06-f38c-4854-af6f-7fd5e87c98ee', 'Object is not collectionPage'); + const activityes = first.orderedItems ?? first.items; + await this.fetchObjects(user, activityes, includeAnnounce, created); - if (collectionPage.orderedItems.length === 0) { - break; - } + page = 1; + if (!first.next) return; + } - //IObject,IActivity型にないプロパティがあるのでany型にする - const activityes = collectionPage.orderedItems as any[]; - created = await this.fetchObjects(activityes, created, includeAnnounce, outboxUrl, user, Resolver); - if (created > createLimit) break; + for (; page < pagelimit; page++) { + this.logger.info(nextUrl as string); + const collectionPage = (typeof(nextUrl) === 'string' ? await Resolver.resolveOrderedCollectionPage(nextUrl) : nextUrl) as IOrderedCollectionPage; + if (!isIOrderedCollectionPage(collectionPage)) throw new IdentifiableError('2a05bb06-f38c-4854-af6f-7fd5e87c98ee', 'Object is not collectionPage'); + if (collectionPage.partOf !== user.outbox) throw new IdentifiableError('6603433f-99db-4134-980c-48705ae57ab8', 'outbox part is invalid'); + + const activityes = (collectionPage.orderedItems ?? collectionPage.items); nextUrl = collectionPage.next; + if (!activityes) continue; + + created = await this.fetchObjects(user, activityes, includeAnnounce, created); + if (createLimit <= created) break;//次ページ見て一件だけしか取れないのは微妙 if (!nextUrl) { break; } + + await this.redisClient.set(`${outboxUrl}--next`, `${nextUrl}`, 'EX', 60 * 15);//15min } this.logger.succ(`Outbox Fetced: ${outboxUrl}`); //this.logger.info(`Outbox Fetced last: ${nextUrl}`); } @bindThis - private async fetchObjects(activityes: any[], created: number, includeAnnounce:boolean, outboxUrl: string, user:MiRemoteUser, resolver?: Resolver): Promise { - const Resolver = resolver ?? this.apResolverService.createResolver(); - const meta = await this.metaService.fetch(); - + private async fetchObjects(user: MiRemoteUser, activityes: any[], includeAnnounce:boolean, created: number): Promise { for (const activity of activityes) { - if (created > createLimit) break; - if (activity) { - try { - if (isAnnounce(activity)) { - if (includeAnnounce === false) continue; - const uri = getApId(activity); - const announceLocal = await this.apNoteService.fetchNote(uri); - if (announceLocal) continue; - - if (!activity.object) { - this.apLoggerService.logger.info('skip: activity has no object property'); + if (createLimit < created) return created; + try { + if (includeAnnounce && activity.type === 'Announce') { + const object = await this.apDbResolverService.getNoteFromApId(activity.id); + + if (object) continue; + + //ブロックしてたら取得しない + const blockedHosts = (await this.metaService.fetch()).blockedHosts; + if (typeof(activity.object) === 'string') { + if (this.utilityService.isBlockedHost(blockedHosts, this.utilityService.toPuny(new URL(activity.object).hostname))) continue; + } else { + if (this.utilityService.isBlockedHost(blockedHosts, this.utilityService.toPuny(new URL(activity.object.id).hostname))) continue; + } + + const unlock = await this.appLockService.getApLock(activity.id); + try { + if (!activity.id) continue; + let renote = await this.apNoteService.fetchNote(activity.object); + if (renote === null) { + renote = await this.apNoteService.createNote(activity.object, undefined, true); + if (renote === null) { + this.logger.info('announce target is null'); + continue; + } + } + this.logger.info(`Creating the (Re)Note: ${activity.id}`); + + const activityAudience = await this.apAudienceService.parseAudience(user, activity.to, activity.cc); + const createdAt = activity.published ? new Date(activity.published) : null; + + if (createdAt && createdAt < this.idService.parse(renote.id).date) { + this.logger.info('skip: malformed createdAt'); continue; } - //From ApInboxService.Announce - //Announce対象 - const targetUri = getApId(activity.object); - if (targetUri.startsWith('bear:')) { - this.apLoggerService.logger.info('skip: bearcaps url not supported.'); + if (!await this.noteEntityService.isVisibleForMe(renote, user.id)) { + this.logger.info('skip: invalid actor for this activity'); continue; } - const target = await Resolver.resolve(activity.object); - const unlock = await this.appLockService.getApLock(uri); - try { - if (isPost(target)) { - if (this.utilityService.isBlockedHost(meta.blockedHosts, this.utilityService.extractDbHost(targetUri))) continue; - try { - let renote = await this.apNoteService.fetchNote(targetUri); - if (renote === null) { - renote = await this.apNoteService.createNote(targetUri, undefined, true); - if (renote === null) { - this.apLoggerService.logger.info('announce target is null'); - continue; - } - } - this.logger.info(`Creating the (Re)Note: ${uri}`); - - const activityAudience = await this.apAudienceService.parseAudience(user, activity.to, activity.cc); - const createdAt = activity.published ? new Date(activity.published) : null; - - if (createdAt && createdAt < this.idService.parse(renote.id).date) { - this.apLoggerService.logger.info('skip: malformed createdAt'); - continue; - } - if (!await this.noteEntityService.isVisibleForMe(renote, user.id)) { - this.apLoggerService.logger.info('skip: invalid actor for this activity'); - continue; - } - await this.noteCreateService.create(user, { - createdAt, - renote, - visibility: activityAudience.visibility, - visibleUsers: activityAudience.visibleUsers, - uri, - }, true ); - created++; - continue; - } catch (err) { - // 対象が4xxならスキップ - if (err instanceof StatusError) { - if (!err.isRetryable) { - this.apLoggerService.logger.info(`Ignored announce target ${target.id} - ${err.statusCode}`); - } - this.apLoggerService.logger.info(`Error in announce target ${target.id} - ${err.statusCode}`); - } - throw err; - } - } else { - continue; + await this.noteCreateService.create(user, { + createdAt, + renote, + visibility: activityAudience.visibility, + visibleUsers: activityAudience.visibleUsers, + uri: activity.id, + }, true ); + } catch (err) { + // 対象が4xxならスキップ + if (err instanceof StatusError) { + if (!err.isRetryable) { + this.logger.info(`Ignored announce target ${activity.object} - ${err.statusCode}`); } - } finally { - unlock(); + this.logger.info(`Error in announce target ${activity.object} - ${err.statusCode}`); + } else { + throw err; } - } else if (isNote(activity.object)) { - const id = getApId(activity.object); - const local = await this.apNoteService.fetchNote(id); - if (local) { - continue; - } - await this.apNoteService.createNote(id, undefined, true); - created++; - } else { - this.apLoggerService.logger.warn('Outbox activity type is not announce or create-note (type:' + activity.type + ')' ); + } finally { + unlock(); } - } catch (err) { - this.apLoggerService.logger.warn('Outbox activity fetch error:' + err ); - this.apLoggerService.logger.info(JSON.stringify(activity)); + } else if (isCreate(activity) && typeof(activity.object) !== 'string' && isNote(activity.object)) { + const object = await this.apDbResolverService.getNoteFromApId(activity.object); + if (object) continue; + await this.apNoteService.createNote(activity.object, undefined, true); + } + } catch (err) { + if (err instanceof AbortError) { + this.logger.warn(`Aborted note: ${activity.id}`); + } else { + this.logger.warn(JSON.stringify(err)); + this.logger.warn(JSON.stringify(activity)); + throw err; } } + created ++; } return created; } diff --git a/packages/backend/src/core/activitypub/type.ts b/packages/backend/src/core/activitypub/type.ts index bd26c15a92..11aedbc9b1 100644 --- a/packages/backend/src/core/activitypub/type.ts +++ b/packages/backend/src/core/activitypub/type.ts @@ -118,7 +118,8 @@ export interface IOrderedCollectionPage extends IObject { partOf: string; totalItems?: number; first?: IObject | string; - orderedItems: ApObject[]; + orderedItems?: IObject[]; + items?: IObject[]; prev: string; next: string; } diff --git a/packages/backend/src/server/api/endpoints/ap/fetch-outbox.ts b/packages/backend/src/server/api/endpoints/ap/fetch-outbox.ts index 0af6ccdab0..09c71207a7 100644 --- a/packages/backend/src/server/api/endpoints/ap/fetch-outbox.ts +++ b/packages/backend/src/server/api/endpoints/ap/fetch-outbox.ts @@ -38,6 +38,11 @@ export const meta = { code: 'OUTBOX_UNDEFINED_THIS_USER', id: '890ecef7-ad5a-487d-a201-b49a54059c90', }, + outboxFirstPageUndefined: { + message: 'outbox first page undefined this user', + code: 'OUTBOX_FIRST_PAGE_UNDEFINED_THIS_USER', + id: 'e1f29e66-86a9-4fdc-9be6-63d4587dc350', + }, }, } as const; @@ -59,6 +64,7 @@ export const paramDef = { default: false, description: 'Outbox取得の際にRenoteも対象にします', }, + //skip: { type: 'integer', minimum: 1, default: 0 }, }, required: ['userId'], } as const; @@ -71,21 +77,22 @@ export default class extends Endpoint { // eslint- super(meta, paramDef, async (ps, me) => { if (ps.wait) { try { - ps.includeAnnounce ? - await this.apOutboxFetchService.fetchOutbox(ps.userId, true) : - await this.apOutboxFetchService.fetchOutbox(ps.userId, false); + await this.apOutboxFetchService.fetchOutbox(ps.userId, ps.includeAnnounce); } catch (err) { if (err instanceof IdentifiableError) { if (err.id === '3fc5a089-cab4-48db-b9f3-f220574b3c0a') throw new ApiError(meta.errors.noSuchUser); if (err.id === '67070303-177c-4600-af93-b26a7ab889c6') throw new ApiError(meta.errors.isLocalUser); if (err.id === 'e7a2e510-a8ce-40e9-b1e6-c007bacdc89f') throw new ApiError(meta.errors.outboxUndefined); + //if (err.id === 'b27090c8-8a68-4189-a445-14591c32a89c') + //if (err.id === '0be2f5a1-2345-46d8-b8c3-430b111c68d3') + if (err.id === 'a723c2df-0250-4091-b5fc-e3a7b36c7b61') throw new ApiError(meta.errors.outboxFirstPageUndefined); + //if (err.id === '6603433f-99db-4134-980c-48705ae57ab8') + //if (err.id === '2a05bb06-f38c-4854-af6f-7fd5e87c98ee') } throw (err); } } else { - ps.includeAnnounce ? - this.apOutboxFetchService.fetchOutbox(ps.userId, true) : - this.apOutboxFetchService.fetchOutbox(ps.userId, false); + this.apOutboxFetchService.fetchOutbox(ps.userId, ps.includeAnnounce); } }); }