From 239824c1870337451b847189369f35410060dc58 Mon Sep 17 00:00:00 2001 From: tamaina Date: Sat, 22 Apr 2023 15:37:17 +0000 Subject: [PATCH] job! --- .config/example.yml | 16 +++--- .vscode/settings.json | 1 + packages/backend/src/config.ts | 2 + .../backend/src/core/AccountMoveService.ts | 6 +-- packages/backend/src/core/QueueModule.ts | 2 +- packages/backend/src/core/QueueService.ts | 9 +++- .../RelationshipQueueProcessorsService.ts | 2 +- packages/backend/test/e2e/move.ts | 53 ++++++++++--------- 8 files changed, 54 insertions(+), 37 deletions(-) diff --git a/.config/example.yml b/.config/example.yml index 57e2b56b78..8111b1992e 100644 --- a/.config/example.yml +++ b/.config/example.yml @@ -133,16 +133,20 @@ id: 'aid' #clusterLimit: 1 # Job concurrency per worker -# deliverJobConcurrency: 128 -# inboxJobConcurrency: 16 +#deliverJobConcurrency: 128 +#inboxJobConcurrency: 16 +#relashionshipJobConcurrency: 16 +# What's relashionshipJob?: +# Follow, unfollow, block and unblock(ings) while following-imports, etc. or account migrations. # Job rate limiter -# deliverJobPerSec: 128 -# inboxJobPerSec: 16 +#deliverJobPerSec: 128 +#inboxJobPerSec: 16 +#relashionshipJobPerSec: 64 # Job attempts -# deliverJobMaxAttempts: 12 -# inboxJobMaxAttempts: 8 +#deliverJobMaxAttempts: 12 +#inboxJobMaxAttempts: 8 # IP address family used for outgoing request (ipv4, ipv6 or dual) #outgoingAddressFamily: ipv4 diff --git a/.vscode/settings.json b/.vscode/settings.json index baffbe18ec..71fb02a59d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,5 +6,6 @@ "files.associations": { "*.test.ts": "typescript" }, + "jest.jestCommandLine": "pnpm run jest", "jest.autoRun": "off" } \ No newline at end of file diff --git a/packages/backend/src/config.ts b/packages/backend/src/config.ts index e4f7601fa9..bf8468a832 100644 --- a/packages/backend/src/config.ts +++ b/packages/backend/src/config.ts @@ -84,8 +84,10 @@ export type Source = { deliverJobConcurrency?: number; inboxJobConcurrency?: number; + relashionshipJobConcurrency?: number; deliverJobPerSec?: number; inboxJobPerSec?: number; + relashionshipJobPerSec?: number; deliverJobMaxAttempts?: number; inboxJobMaxAttempts?: number; diff --git a/packages/backend/src/core/AccountMoveService.ts b/packages/backend/src/core/AccountMoveService.ts index 77757e0f20..4d315f6308 100644 --- a/packages/backend/src/core/AccountMoveService.ts +++ b/packages/backend/src/core/AccountMoveService.ts @@ -96,14 +96,14 @@ export class AccountMoveService { const iObj = await this.userEntityService.pack(src.id, src, { detail: true, includeSecrets: true }); this.globalEventService.publishMainStream(src.id, 'meUpdated', iObj); - // Unfollow + // Unfollow after 24 hours const followings = await this.followingsRepository.findBy({ followerId: src.id, }); - this.queueService.createUnfollowJob(followings.map(following => ({ + this.queueService.createDelayedUnfollowJob(followings.map(following => ({ from: { id: src.id }, to: { id: following.followeeId }, - }))); + })), process.env.NODE_ENV === 'test' ? 10000 : 1000 * 60 * 60 * 24); await this.postMoveProcess(src, dst); diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index bac85d7a15..d4905a5f88 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -78,7 +78,7 @@ const $db: Provider = { const $relationship: Provider = { provide: 'queue:relationship', - useFactory: (config: Config) => q(config, 'relationship'), + useFactory: (config: Config) => q(config, 'relationship', config.relashionshipJobPerSec ?? 64), inject: [DI.config], }; diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 375ac49911..784f4b841c 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -258,6 +258,12 @@ export class QueueService { return this.relationshipQueue.addBulk(jobs); } + @bindThis + public createDelayedUnfollowJob(followings: { from: ThinUser, to: ThinUser, requestId?: string }[], delay: number) { + const jobs = followings.map(rel => this.generateRelationshipJobData('unfollow', rel, { delay })); + return this.relationshipQueue.addBulk(jobs); + } + @bindThis public createBlockJob(blockings: { from: ThinUser, to: ThinUser, silent?: boolean }[]) { const jobs = blockings.map(rel => this.generateRelationshipJobData('block', rel)); @@ -271,7 +277,7 @@ export class QueueService { } @bindThis - private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData): { + private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData, opts?: Bull.JobOptions): { name: string, data: RelationshipJobData, opts: Bull.JobOptions, @@ -287,6 +293,7 @@ export class QueueService { opts: { removeOnComplete: true, removeOnFail: true, + ...opts, }, }; } diff --git a/packages/backend/src/queue/RelationshipQueueProcessorsService.ts b/packages/backend/src/queue/RelationshipQueueProcessorsService.ts index af086fa4e7..736b4fa80d 100644 --- a/packages/backend/src/queue/RelationshipQueueProcessorsService.ts +++ b/packages/backend/src/queue/RelationshipQueueProcessorsService.ts @@ -17,7 +17,7 @@ export class RelationshipQueueProcessorsService { @bindThis public start(q: Bull.Queue): void { - const maxJobs = (this.config.deliverJobConcurrency ?? 128) / 4; // conservative? + const maxJobs = this.config.relashionshipJobConcurrency ?? 16; q.process('follow', maxJobs, (job) => this.relationshipProcessorService.processFollow(job)); q.process('unfollow', maxJobs, (job) => this.relationshipProcessorService.processUnfollow(job)); q.process('block', maxJobs, (job) => this.relationshipProcessorService.processBlock(job)); diff --git a/packages/backend/test/e2e/move.ts b/packages/backend/test/e2e/move.ts index 657934ba42..94f1ea4c7a 100644 --- a/packages/backend/test/e2e/move.ts +++ b/packages/backend/test/e2e/move.ts @@ -266,11 +266,12 @@ describe('Account Move', () => { await sleep(1000 * 3); // wait for jobs to finish + // Unfollow delayed? const aliceFollowings = await api('/users/following', { userId: alice.id, }, alice); assert.strictEqual(aliceFollowings.status, 200); - assert.strictEqual(aliceFollowings.body.length, 0); + assert.strictEqual(aliceFollowings.body.length, 3); const carolFollowings = await api('/users/following', { userId: carol.id, @@ -304,16 +305,35 @@ describe('Account Move', () => { assert.ok(eveLists.body[0].userIds.find((id: string) => id === bob.id)); }); - test('Unable to move if the destination account has already moved.', async () => { - await api('/i/move', { - moveToAccount: `@bob@${url.hostname}`, + test('A locked account automatically accept the follow request if it had already accepted the old account.', async () => { + await successfulApiCall({ + endpoint: '/following/create', + parameters: { + userId: frank.id, + }, + user: bob, + }); + const followers = await api('/users/followers', { + userId: frank.id, + }, frank); + + assert.strictEqual(followers.status, 200); + assert.strictEqual(followers.body.length, 2); + assert.strictEqual(followers.body[0].followerId, bob.id); + }); + + test('Unfollowed after 10 sec (24 hours in production).', async () => { + await sleep(1000 * 8); + + const following = await api('/users/following', { + userId: alice.id, }, alice); - const newAlice = await Users.findOneByOrFail({ id: alice.id }); - assert.strictEqual(newAlice.movedToUri, `${url.origin}/users/${bob.id}`); - assert.strictEqual(newAlice.alsoKnownAs?.length, 1); - assert.strictEqual(newAlice.alsoKnownAs[0], `${url.origin}/users/${bob.id}`); + assert.strictEqual(following.status, 200); + assert.strictEqual(following.body.length, 0); + }); + test('Unable to move if the destination account has already moved.', async () => { const res = await api('/i/move', { moveToAccount: `@alice@${url.hostname}`, }, bob); @@ -345,23 +365,6 @@ describe('Account Move', () => { assert.strictEqual(newEve.followersCount, 1); }); - test('A locked account automatically accept the follow request if it had already accepted the old account.', async () => { - await successfulApiCall({ - endpoint: '/following/create', - parameters: { - userId: frank.id, - }, - user: bob, - }); - const followers = await api('/users/followers', { - userId: frank.id, - }, frank); - - assert.strictEqual(followers.status, 200); - assert.strictEqual(followers.body.length, 2); - assert.strictEqual(followers.body[0].followerId, bob.id); - }); - test.each([ '/antennas/create', '/channels/create',