From 9daafca155682281f567c9b4da8f3af3564aa281 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Mon, 9 Dec 2024 19:04:06 -0500 Subject: [PATCH 1/8] fix rate limits under multi-node environments --- .../src/server/api/SkRateLimiterService.ts | 187 +++--- .../server/api/SkRateLimiterServiceTests.ts | 544 +++++++++++------- 2 files changed, 457 insertions(+), 274 deletions(-) diff --git a/packages/backend/src/server/api/SkRateLimiterService.ts b/packages/backend/src/server/api/SkRateLimiterService.ts index 6415ee905c..05166ed93c 100644 --- a/packages/backend/src/server/api/SkRateLimiterService.ts +++ b/packages/backend/src/server/api/SkRateLimiterService.ts @@ -5,16 +5,13 @@ import { Inject, Injectable } from '@nestjs/common'; import Redis from 'ioredis'; -import { LoggerService } from '@/core/LoggerService.js'; import { TimeService } from '@/core/TimeService.js'; import { EnvService } from '@/core/EnvService.js'; import { DI } from '@/di-symbols.js'; -import type Logger from '@/logger.js'; import { BucketRateLimit, LegacyRateLimit, LimitInfo, RateLimit, hasMinLimit, isLegacyRateLimit, Keyed } from '@/misc/rate-limit-utils.js'; @Injectable() export class SkRateLimiterService { - private readonly logger: Logger; private readonly disabled: boolean; constructor( @@ -24,14 +21,10 @@ export class SkRateLimiterService { @Inject(DI.redis) private readonly redisClient: Redis.Redis, - @Inject(LoggerService) - loggerService: LoggerService, - @Inject(EnvService) envService: EnvService, ) { - this.logger = loggerService.getLogger('limiter'); - this.disabled = envService.env.NODE_ENV !== 'production'; // TODO disable in TEST *only* + this.disabled = envService.env.NODE_ENV !== 'production'; } public async limit(limit: Keyed, actor: string, factor = 1): Promise { @@ -50,10 +43,25 @@ export class SkRateLimiterService { throw new Error(`Rate limit factor is zero or negative: ${factor}`); } - if (isLegacyRateLimit(limit)) { - return await this.limitLegacy(limit, actor, factor); - } else { - return await this.limitBucket(limit, actor, factor); + return await this.tryLimit(limit, actor, factor); + } + + private async tryLimit(limit: Keyed, actor: string, factor: number, retry = 1): Promise { + try { + if (isLegacyRateLimit(limit)) { + return await this.limitLegacy(limit, actor, factor); + } else { + return await this.limitBucket(limit, actor, factor); + } + } catch (err) { + // We may experience collision errors from optimistic locking. + // This is expected, so we should retry a few times before giving up. 
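+			// (ioredis signals such a conflict by resolving exec() with null, which executeRedis() surfaces as a TransactionError.)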
+ // https://redis.io/docs/latest/develop/interact/transactions/#optimistic-locking-using-check-and-set + if (err instanceof TransactionError && retry < 3) { + return await this.tryLimit(limit, actor, factor, retry + 1); + } + + throw err; } } @@ -94,36 +102,30 @@ export class SkRateLimiterService { if (limit.minInterval === 0) return null; if (limit.minInterval < 0) throw new Error(`Invalid rate limit ${limit.key}: minInterval is negative (${limit.minInterval})`); - const counter = await this.getLimitCounter(limit, actor, 'min'); const minInterval = Math.max(Math.ceil(limit.minInterval * factor), 0); + const expirationSec = Math.max(Math.ceil(minInterval / 1000), 1); - // Update expiration - if (counter.c > 0) { - const isCleared = this.timeService.now - counter.t >= minInterval; + // Check for window clear + const counter = await this.getLimitCounter(limit, actor, 'min'); + if (counter.counter > 0) { + const isCleared = this.timeService.now - counter.timestamp >= minInterval; if (isCleared) { - counter.c = 0; + counter.counter = 0; } } - const blocked = counter.c > 0; + // Increment the limit, then synchronize with redis + const blocked = counter.counter > 0; if (!blocked) { - counter.c++; - counter.t = this.timeService.now; + counter.counter++; + counter.timestamp = this.timeService.now; + await this.updateLimitCounter(limit, actor, 'min', expirationSec, counter); } // Calculate limit status - const resetMs = Math.max(Math.ceil(minInterval - (this.timeService.now - counter.t)), 0); + const resetMs = Math.max(minInterval - (this.timeService.now - counter.timestamp), 0); const resetSec = Math.ceil(resetMs / 1000); - const limitInfo: LimitInfo = { blocked, remaining: 0, resetSec, resetMs, fullResetSec: resetSec, fullResetMs: resetMs }; - - // Update the limit counter, but not if blocked - if (!blocked) { - // Don't await, or we will slow down the API. - this.setLimitCounter(limit, actor, counter, resetSec, 'min') - .catch(err => this.logger.error(`Failed to update limit ${limit.key}:min for ${actor}:`, err)); - } - - return limitInfo; + return { blocked, remaining: 0, resetSec, resetMs, fullResetSec: resetSec, fullResetMs: resetMs }; } private async limitBucket(limit: Keyed, actor: string, factor: number): Promise { @@ -131,68 +133,113 @@ export class SkRateLimiterService { if (limit.dripRate != null && limit.dripRate < 1) throw new Error(`Invalid rate limit ${limit.key}: dripRate is less than 1 (${limit.dripRate})`); if (limit.dripSize != null && limit.dripSize < 1) throw new Error(`Invalid rate limit ${limit.key}: dripSize is less than 1 (${limit.dripSize})`); - const counter = await this.getLimitCounter(limit, actor, 'bucket'); const bucketSize = Math.max(Math.ceil(limit.size / factor), 1); const dripRate = Math.ceil(limit.dripRate ?? 1000); const dripSize = Math.ceil(limit.dripSize ?? 
1); + const expirationSec = Math.max(Math.ceil(bucketSize / dripRate), 1); - // Update drips - if (counter.c > 0) { - const dripsSinceLastTick = Math.floor((this.timeService.now - counter.t) / dripRate) * dripSize; - counter.c = Math.max(counter.c - dripsSinceLastTick, 0); + // Simulate bucket drips + const counter = await this.getLimitCounter(limit, actor, 'bucket'); + if (counter.counter > 0) { + const dripsSinceLastTick = Math.floor((this.timeService.now - counter.timestamp) / dripRate) * dripSize; + counter.counter = Math.max(counter.counter - dripsSinceLastTick, 0); } - const blocked = counter.c >= bucketSize; + // Increment the limit, then synchronize with redis + const blocked = counter.counter >= bucketSize; if (!blocked) { - counter.c++; - counter.t = this.timeService.now; + counter.counter++; + counter.timestamp = this.timeService.now; + await this.updateLimitCounter(limit, actor, 'bucket', expirationSec, counter); } + // Calculate how much time is needed to free up a bucket slot + const overflow = Math.max((counter.counter + 1) - bucketSize, 0); + const dripsNeeded = Math.ceil(overflow / dripSize); + const timeNeeded = Math.max((dripRate * dripsNeeded) - (this.timeService.now - counter.timestamp), 0); + // Calculate limit status - const remaining = Math.max(bucketSize - counter.c, 0); - const resetMs = remaining > 0 ? 0 : Math.max(dripRate - (this.timeService.now - counter.t), 0); + const remaining = Math.max(bucketSize - counter.counter, 0); + const resetMs = timeNeeded; const resetSec = Math.ceil(resetMs / 1000); - const fullResetMs = Math.ceil(counter.c / dripSize) * dripRate; + const fullResetMs = Math.ceil(counter.counter / dripSize) * dripRate; const fullResetSec = Math.ceil(fullResetMs / 1000); - const limitInfo: LimitInfo = { blocked, remaining, resetSec, resetMs, fullResetSec, fullResetMs }; - - // Update the limit counter, but not if blocked - if (!blocked) { - // Don't await, or we will slow down the API. - this.setLimitCounter(limit, actor, counter, fullResetSec, 'bucket') - .catch(err => this.logger.error(`Failed to update limit ${limit.key} for ${actor}:`, err)); - } - - return limitInfo; + return { blocked, remaining, resetSec, resetMs, fullResetSec, fullResetMs }; } private async getLimitCounter(limit: Keyed, actor: string, subject: string): Promise { - const key = createLimitKey(limit, actor, subject); + const timestampKey = createLimitKey(limit, actor, subject, 't'); + const counterKey = createLimitKey(limit, actor, subject, 'c'); - const value = await this.redisClient.get(key); - if (value == null) { - return { t: 0, c: 0 }; + const [timestamp, counter] = await this.executeRedis( + [ + ['get', timestampKey], + ['get', counterKey], + ], + [ + timestampKey, + counterKey, + ], + ); + + return { + timestamp: timestamp ? parseInt(timestamp) : 0, + counter: counter ? 
parseInt(counter) : 0, + }; + } + + private async updateLimitCounter(limit: Keyed, actor: string, subject: string, expirationSec: number, counter: LimitCounter): Promise { + const timestampKey = createLimitKey(limit, actor, subject, 't'); + const counterKey = createLimitKey(limit, actor, subject, 'c'); + + await this.executeRedis( + [ + ['set', timestampKey, counter.timestamp.toString(), 'EX', expirationSec], + ['set', counterKey, counter.counter.toString(), 'EX', expirationSec], + ], + [ + timestampKey, + counterKey, + ], + ); + } + + private async executeRedis(batch: RedisBatch, watch: string[]): Promise> { + const results = await this.redisClient + .multi(batch) + .watch(watch) + .exec(); + + // Transaction error + if (!results) { + throw new TransactionError('Redis error: transaction conflict'); } - return JSON.parse(value); - } + // The entire call failed + if (results.length !== batch.length) { + throw new Error('Redis error: failed to execute batch'); + } - private async setLimitCounter(limit: Keyed, actor: string, counter: LimitCounter, expiration: number, subject: string): Promise { - const key = createLimitKey(limit, actor, subject); - const value = JSON.stringify(counter); - const expirationSec = Math.max(expiration, 1); - await this.redisClient.set(key, value, 'EX', expirationSec); + // A particular command failed + const errors = results.map(r => r[0]).filter(e => e != null); + if (errors.length > 0) { + throw new AggregateError(errors, `Redis error: failed to execute command(s): '${errors.join('\', \'')}'`); + } + + return results.map(r => r[1]) as RedisResults; } } -function createLimitKey(limit: Keyed, actor: string, subject: string): string { - return `rl_${actor}_${limit.key}_${subject}`; +type RedisBatch = [string, ...unknown[]][] & { length: Num }; +type RedisResults = (string | null)[] & { length: Num }; + +function createLimitKey(limit: Keyed, actor: string, subject: string, value: string): string { + return `rl_${actor}_${limit.key}_${subject}_${value}`; } -export interface LimitCounter { - /** Timestamp */ - t: number; +class TransactionError extends Error {} - /** Counter */ - c: number; +interface LimitCounter { + timestamp: number; + counter: number; } diff --git a/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts b/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts index dbf7795fc6..871c9afa64 100644 --- a/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts +++ b/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts @@ -3,25 +3,19 @@ * SPDX-License-Identifier: AGPL-3.0-only */ -import { KEYWORD } from 'color-convert/conversions.js'; -import { jest } from '@jest/globals'; import type Redis from 'ioredis'; -import { LimitCounter, SkRateLimiterService } from '@/server/api/SkRateLimiterService.js'; -import { LoggerService } from '@/core/LoggerService.js'; +import { SkRateLimiterService } from '@/server/api/SkRateLimiterService.js'; import { BucketRateLimit, Keyed, LegacyRateLimit } from '@/misc/rate-limit-utils.js'; /* eslint-disable @typescript-eslint/no-non-null-assertion */ -/* eslint-disable @typescript-eslint/no-unnecessary-condition */ describe(SkRateLimiterService, () => { let mockTimeService: { now: number, date: Date } = null!; - let mockRedisGet: ((key: string) => string | null) | undefined = undefined; - let mockRedisSet: ((args: unknown[]) => void) | undefined = undefined; + let mockRedis: Array<(command: [string, ...unknown[]]) => [Error | null, unknown] | null> = null!; + let mockRedisExec: 
(batch: [string, ...unknown[]][]) => Promise<[Error | null, unknown][] | null> = null!; let mockEnvironment: Record = null!; let serviceUnderTest: () => SkRateLimiterService = null!; - let loggedMessages: { level: string, data: unknown[] }[] = []; - beforeEach(() => { mockTimeService = { now: 0, @@ -30,16 +24,26 @@ describe(SkRateLimiterService, () => { }, }; - mockRedisGet = undefined; - mockRedisSet = undefined; + mockRedis = []; + mockRedisExec = (batch) => { + const results: [Error | null, unknown][] = batch.map(command => { + const handlerResults = mockRedis.map(handler => handler(command)); + const finalResult = handlerResults.findLast(result => result != null); + return finalResult ?? [new Error('test error: no handler'), null]; + }); + return Promise.resolve(results); + }; const mockRedisClient = { - get(key: string) { - if (mockRedisGet) return Promise.resolve(mockRedisGet(key)); - else return Promise.resolve(null); - }, - set(...args: unknown[]): Promise { - if (mockRedisSet) mockRedisSet(args); - return Promise.resolve(); + multi(batch: [string, ...unknown[]][]) { + return { + watch() { + return { + exec() { + return mockRedisExec(batch); + }, + }; + }, + }; }, } as unknown as Redis.Redis; @@ -49,89 +53,77 @@ describe(SkRateLimiterService, () => { env: mockEnvironment, }; - loggedMessages = []; - const mockLogService = { - getLogger() { - return { - createSubLogger(context: string, color?: KEYWORD) { - return mockLogService.getLogger(context, color); - }, - error(...data: unknown[]) { - loggedMessages.push({ level: 'error', data }); - }, - warn(...data: unknown[]) { - loggedMessages.push({ level: 'warn', data }); - }, - succ(...data: unknown[]) { - loggedMessages.push({ level: 'succ', data }); - }, - debug(...data: unknown[]) { - loggedMessages.push({ level: 'debug', data }); - }, - info(...data: unknown[]) { - loggedMessages.push({ level: 'info', data }); - }, - }; - }, - } as unknown as LoggerService; - let service: SkRateLimiterService | undefined = undefined; serviceUnderTest = () => { - return service ??= new SkRateLimiterService(mockTimeService, mockRedisClient, mockLogService, mockEnvService); + return service ??= new SkRateLimiterService(mockTimeService, mockRedisClient, mockEnvService); }; }); - function expectNoUnhandledErrors() { - const unhandledErrors = loggedMessages.filter(m => m.level === 'error'); - if (unhandledErrors.length > 0) { - throw new Error(`Test failed: got unhandled errors ${unhandledErrors.join('\n')}`); - } - } - describe('limit', () => { const actor = 'actor'; const key = 'test'; - let counter: LimitCounter | undefined = undefined; - let minCounter: LimitCounter | undefined = undefined; + let limitCounter: number | undefined = undefined; + let limitTimestamp: number | undefined = undefined; + let minCounter: number | undefined = undefined; + let minTimestamp: number | undefined = undefined; beforeEach(() => { - counter = undefined; + limitCounter = undefined; + limitTimestamp = undefined; minCounter = undefined; + minTimestamp = undefined; - mockRedisGet = (key: string) => { - if (key === 'rl_actor_test_bucket' && counter) { - return JSON.stringify(counter); + mockRedis.push(([command, ...args]) => { + if (command === 'set' && args[0] === 'rl_actor_test_bucket_t') { + limitTimestamp = parseInt(args[1] as string); + return [null, args[1]]; + } + if (command === 'get' && args[0] === 'rl_actor_test_bucket_t') { + return [null, limitTimestamp?.toString() ?? 
null]; + } + // if (command === 'incr' && args[0] === 'rl_actor_test_bucket_c') { + // limitCounter = (limitCounter ?? 0) + 1; + // return [null, null]; + // } + if (command === 'set' && args[0] === 'rl_actor_test_bucket_c') { + limitCounter = parseInt(args[1] as string); + return [null, args[1]]; + } + if (command === 'get' && args[0] === 'rl_actor_test_bucket_c') { + return [null, limitCounter?.toString() ?? null]; } - if (key === 'rl_actor_test_min' && minCounter) { - return JSON.stringify(minCounter); + if (command === 'set' && args[0] === 'rl_actor_test_min_t') { + minTimestamp = parseInt(args[1] as string); + return [null, args[1]]; } + if (command === 'get' && args[0] === 'rl_actor_test_min_t') { + return [null, minTimestamp?.toString() ?? null]; + } + // if (command === 'incr' && args[0] === 'rl_actor_test_min_c') { + // minCounter = (minCounter ?? 0) + 1; + // return [null, null]; + // } + if (command === 'set' && args[0] === 'rl_actor_test_min_c') { + minCounter = parseInt(args[1] as string); + return [null, args[1]]; + } + if (command === 'get' && args[0] === 'rl_actor_test_min_c') { + return [null, minCounter?.toString() ?? null]; + } + // if (command === 'expire') { + // return [null, null]; + // } return null; - }; - - mockRedisSet = (args: unknown[]) => { - const [key, value] = args; - - if (key === 'rl_actor_test_bucket') { - if (value == null) counter = undefined; - else if (typeof(value) === 'string') counter = JSON.parse(value); - else throw new Error('invalid redis call'); - } - - if (key === 'rl_actor_test_min') { - if (value == null) minCounter = undefined; - else if (typeof(value) === 'string') minCounter = JSON.parse(value); - else throw new Error('invalid redis call'); - } - }; + }); }); it('should bypass in non-production', async () => { mockEnvironment.NODE_ENV = 'test'; - const info = await serviceUnderTest().limit({ key: 'l', type: undefined, max: 0 }, 'actor'); + const info = await serviceUnderTest().limit({ key: 'l', type: undefined, max: 0 }, actor); expect(info.blocked).toBeFalsy(); expect(info.remaining).toBe(Number.MAX_SAFE_INTEGER); @@ -158,15 +150,10 @@ describe(SkRateLimiterService, () => { expect(info.blocked).toBeFalsy(); }); - it('should not error when allowed', async () => { - await serviceUnderTest().limit(limit, actor); - - expectNoUnhandledErrors(); - }); - it('should return correct info when allowed', async () => { limit.size = 2; - counter = { c: 1, t: 0 }; + limitCounter = 1; + limitTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -180,8 +167,7 @@ describe(SkRateLimiterService, () => { it('should increment counter when called', async () => { await serviceUnderTest().limit(limit, actor); - expect(counter).not.toBeUndefined(); - expect(counter?.c).toBe(1); + expect(limitCounter).toBe(1); }); it('should set timestamp when called', async () => { @@ -189,29 +175,28 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); - expect(counter).not.toBeUndefined(); - expect(counter?.t).toBe(1000); + expect(limitTimestamp).toBe(1000); }); it('should decrement counter when dripRate has passed', async () => { - counter = { c: 2, t: 0 }; + limitCounter = 2; + limitTimestamp = 0; mockTimeService.now = 2000; await serviceUnderTest().limit(limit, actor); - expect(counter).not.toBeUndefined(); - expect(counter?.c).toBe(1); // 2 (starting) - 2 (2x1 drip) + 1 (call) = 1 + expect(limitCounter).toBe(1); // 2 (starting) - 2 (2x1 drip) + 1 (call) = 1 }); it('should decrement counter by dripSize', async () => 
{ - counter = { c: 2, t: 0 }; + limitCounter = 2; + limitTimestamp = 0; limit.dripSize = 2; mockTimeService.now = 1000; await serviceUnderTest().limit(limit, actor); - expect(counter).not.toBeUndefined(); - expect(counter?.c).toBe(1); // 2 (starting) - 2 (1x2 drip) + 1 (call) = 1 + expect(limitCounter).toBe(1); // 2 (starting) - 2 (1x2 drip) + 1 (call) = 1 }); it('should maintain counter between calls over time', async () => { @@ -226,25 +211,13 @@ describe(SkRateLimiterService, () => { mockTimeService.now += 1000; // 2 - 1 = 1 await serviceUnderTest().limit(limit, actor); // 1 + 1 = 2 - expect(counter?.c).toBe(2); - expect(counter?.t).toBe(3000); - }); - - it('should log error and continue when update fails', async () => { - mockRedisSet = () => { - throw new Error('test error'); - }; - - await serviceUnderTest().limit(limit, actor); - - const matchingError = loggedMessages - .find(m => m.level === 'error' && m.data - .some(d => typeof(d) === 'string' && d.includes('Failed to update limit'))); - expect(matchingError).toBeTruthy(); + expect(limitCounter).toBe(2); + expect(limitTimestamp).toBe(3000); }); it('should block when bucket is filled', async () => { - counter = { c: 1, t: 0 }; + limitCounter = 1; + limitTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -252,7 +225,8 @@ describe(SkRateLimiterService, () => { }); it('should calculate correct info when blocked', async () => { - counter = { c: 1, t: 0 }; + limitCounter = 1; + limitTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -263,7 +237,8 @@ describe(SkRateLimiterService, () => { }); it('should allow when bucket is filled but should drip', async () => { - counter = { c: 1, t: 0 }; + limitCounter = 1; + limitTimestamp = 0; mockTimeService.now = 1000; const info = await serviceUnderTest().limit(limit, actor); @@ -272,7 +247,8 @@ describe(SkRateLimiterService, () => { }); it('should scale limit by factor', async () => { - counter = { c: 1, t: 0 }; + limitCounter = 1; + limitTimestamp = 0; const i1 = await serviceUnderTest().limit(limit, actor, 0.5); // 1 + 1 = 2 const i2 = await serviceUnderTest().limit(limit, actor, 0.5); // 2 + 1 = 3 @@ -281,23 +257,39 @@ describe(SkRateLimiterService, () => { expect(i2.blocked).toBeTruthy(); }); - it('should set key expiration', async () => { - const mock = jest.fn(mockRedisSet); - mockRedisSet = mock; + it('should set counter expiration', async () => { + const commands: unknown[][] = []; + mockRedis.push(command => { + commands.push(command); + return null; + }); await serviceUnderTest().limit(limit, actor); - expect(mock).toHaveBeenCalledWith(['rl_actor_test_bucket', '{"t":0,"c":1}', 'EX', 1]); + expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_c', '1', 'EX', 1]); + }); + + it('should set timestamp expiration', async () => { + const commands: unknown[][] = []; + mockRedis.push(command => { + commands.push(command); + return null; + }); + + await serviceUnderTest().limit(limit, actor); + + expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_t', '0', 'EX', 1]); }); it('should not increment when already blocked', async () => { - counter = { c: 1, t: 0 }; + limitCounter = 1; + limitTimestamp = 0; mockTimeService.now += 100; await serviceUnderTest().limit(limit, actor); - expect(counter?.c).toBe(1); - expect(counter?.t).toBe(0); + expect(limitCounter).toBe(1); + expect(limitTimestamp).toBe(0); }); it('should skip if factor is zero', async () => { @@ -384,6 +376,48 @@ describe(SkRateLimiterService, () => { await 
expect(promise).rejects.toThrow(/dripSize is less than 1/); }); + + it('should retry when redis conflicts', async () => { + let numCalls = 0; + const realMockRedisExec = mockRedisExec; + mockRedisExec = () => { + if (numCalls > 0) { + mockRedisExec = realMockRedisExec; + } + numCalls++; + return Promise.resolve(null); + }; + + await serviceUnderTest().limit(limit, actor); + + expect(numCalls).toBe(2); + }); + + it('should bail out after 3 tries', async () => { + let numCalls = 0; + mockRedisExec = () => { + numCalls++; + return Promise.resolve(null); + }; + + const promise = serviceUnderTest().limit(limit, actor); + + await expect(promise).rejects.toThrow(/transaction conflict/); + expect(numCalls).toBe(3); + }); + + it('should apply correction if extra calls slip through', async () => { + limitCounter = 2; + + const info = await serviceUnderTest().limit(limit, actor); + + expect(info.blocked).toBeTruthy(); + expect(info.remaining).toBe(0); + expect(info.resetMs).toBe(2000); + expect(info.resetSec).toBe(2); + expect(info.fullResetMs).toBe(2000); + expect(info.fullResetSec).toBe(2); + }); }); describe('with min interval', () => { @@ -403,12 +437,6 @@ describe(SkRateLimiterService, () => { expect(info.blocked).toBeFalsy(); }); - it('should not error when allowed', async () => { - await serviceUnderTest().limit(limit, actor); - - expectNoUnhandledErrors(); - }); - it('should calculate correct info when allowed', async () => { const info = await serviceUnderTest().limit(limit, actor); @@ -423,7 +451,7 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); expect(minCounter).not.toBeUndefined(); - expect(minCounter?.c).toBe(1); + expect(minCounter).toBe(1); }); it('should set timestamp when called', async () => { @@ -432,27 +460,29 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); expect(minCounter).not.toBeUndefined(); - expect(minCounter?.t).toBe(1000); + expect(minTimestamp).toBe(1000); }); it('should decrement counter when minInterval has passed', async () => { - minCounter = { c: 1, t: 0 }; + minCounter = 1; + minTimestamp = 0; mockTimeService.now = 1000; await serviceUnderTest().limit(limit, actor); expect(minCounter).not.toBeUndefined(); - expect(minCounter?.c).toBe(1); // 1 (starting) - 1 (interval) + 1 (call) = 1 + expect(minCounter).toBe(1); // 1 (starting) - 1 (interval) + 1 (call) = 1 }); it('should reset counter entirely', async () => { - minCounter = { c: 2, t: 0 }; + minCounter = 2; + minTimestamp = 0; mockTimeService.now = 1000; await serviceUnderTest().limit(limit, actor); expect(minCounter).not.toBeUndefined(); - expect(minCounter?.c).toBe(1); // 2 (starting) - 2 (interval) + 1 (call) = 1 + expect(minCounter).toBe(1); // 2 (starting) - 2 (interval) + 1 (call) = 1 }); it('should maintain counter between calls over time', async () => { @@ -465,25 +495,13 @@ describe(SkRateLimiterService, () => { mockTimeService.now += 1000; // 0 - 1 = 0 await serviceUnderTest().limit(limit, actor); // 0 + 1 = 1 - expect(minCounter?.c).toBe(1); - expect(minCounter?.t).toBe(3000); - }); - - it('should log error and continue when update fails', async () => { - mockRedisSet = () => { - throw new Error('test error'); - }; - - await serviceUnderTest().limit(limit, actor); - - const matchingError = loggedMessages - .find(m => m.level === 'error' && m.data - .some(d => typeof(d) === 'string' && d.includes('Failed to update limit'))); - expect(matchingError).toBeTruthy(); + expect(minCounter).toBe(1); + 
expect(minTimestamp).toBe(3000); }); it('should block when interval exceeded', async () => { - minCounter = { c: 1, t: 0 }; + minCounter = 1; + minTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -491,7 +509,8 @@ describe(SkRateLimiterService, () => { }); it('should calculate correct info when blocked', async () => { - minCounter = { c: 1, t: 0 }; + minCounter = 1; + minTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -502,7 +521,8 @@ describe(SkRateLimiterService, () => { }); it('should allow when bucket is filled but interval has passed', async () => { - minCounter = { c: 1, t: 0 }; + minCounter = 1; + minTimestamp = 0; mockTimeService.now = 1000; const info = await serviceUnderTest().limit(limit, actor); @@ -511,7 +531,8 @@ describe(SkRateLimiterService, () => { }); it('should scale interval by factor', async () => { - minCounter = { c: 1, t: 0 }; + minCounter = 1; + minTimestamp = 0; mockTimeService.now += 500; const info = await serviceUnderTest().limit(limit, actor, 0.5); @@ -519,23 +540,39 @@ describe(SkRateLimiterService, () => { expect(info.blocked).toBeFalsy(); }); - it('should set key expiration', async () => { - const mock = jest.fn(mockRedisSet); - mockRedisSet = mock; + it('should set counter expiration', async () => { + const commands: unknown[][] = []; + mockRedis.push(command => { + commands.push(command); + return null; + }); await serviceUnderTest().limit(limit, actor); - expect(mock).toHaveBeenCalledWith(['rl_actor_test_min', '{"t":0,"c":1}', 'EX', 1]); + expect(commands).toContainEqual(['set', 'rl_actor_test_min_c', '1', 'EX', 1]); + }); + + it('should set timestamp expiration', async () => { + const commands: unknown[][] = []; + mockRedis.push(command => { + commands.push(command); + return null; + }); + + await serviceUnderTest().limit(limit, actor); + + expect(commands).toContainEqual(['set', 'rl_actor_test_min_t', '0', 'EX', 1]); }); it('should not increment when already blocked', async () => { - minCounter = { c: 1, t: 0 }; + minCounter = 1; + minTimestamp = 0; mockTimeService.now += 100; await serviceUnderTest().limit(limit, actor); - expect(minCounter?.c).toBe(1); - expect(minCounter?.t).toBe(0); + expect(minCounter).toBe(1); + expect(minTimestamp).toBe(0); }); it('should skip if factor is zero', async () => { @@ -567,6 +604,19 @@ describe(SkRateLimiterService, () => { await expect(promise).rejects.toThrow(/minInterval is negative/); }); + + it('should not apply correction to extra calls', async () => { + minCounter = 2; + + const info = await serviceUnderTest().limit(limit, actor); + + expect(info.blocked).toBeTruthy(); + expect(info.remaining).toBe(0); + expect(info.resetMs).toBe(1000); + expect(info.resetSec).toBe(1); + expect(info.fullResetMs).toBe(1000); + expect(info.fullResetSec).toBe(1); + }); }); describe('with legacy limit', () => { @@ -587,16 +637,11 @@ describe(SkRateLimiterService, () => { expect(info.blocked).toBeFalsy(); }); - it('should not error when allowed', async () => { - await serviceUnderTest().limit(limit, actor); - - expectNoUnhandledErrors(); - }); - it('should infer dripRate from duration', async () => { limit.max = 10; limit.duration = 10000; - counter = { c: 10, t: 0 }; + limitCounter = 10; + limitTimestamp = 0; const i1 = await serviceUnderTest().limit(limit, actor); mockTimeService.now += 1000; @@ -619,7 +664,8 @@ describe(SkRateLimiterService, () => { it('should calculate correct info when allowed', async () => { limit.max = 10; limit.duration = 10000; - counter = { c: 10, t: 
0 }; + limitCounter = 10; + limitTimestamp = 0; mockTimeService.now += 2000; const info = await serviceUnderTest().limit(limit, actor); @@ -634,7 +680,8 @@ describe(SkRateLimiterService, () => { it('should calculate correct info when blocked', async () => { limit.max = 10; limit.duration = 10000; - counter = { c: 10, t: 0 }; + limitCounter = 10; + limitTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -646,7 +693,8 @@ describe(SkRateLimiterService, () => { }); it('should allow when bucket is filled but interval has passed', async () => { - counter = { c: 10, t: 0 }; + limitCounter = 10; + limitTimestamp = 0; mockTimeService.now = 1000; const info = await serviceUnderTest().limit(limit, actor); @@ -655,37 +703,55 @@ describe(SkRateLimiterService, () => { }); it('should scale limit by factor', async () => { - counter = { c: 10, t: 0 }; + limitCounter = 10; + limitTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor, 0.5); // 10 + 1 = 11 expect(info.blocked).toBeTruthy(); }); - it('should set key expiration', async () => { - const mock = jest.fn(mockRedisSet); - mockRedisSet = mock; + it('should set counter expiration', async () => { + const commands: unknown[][] = []; + mockRedis.push(command => { + commands.push(command); + return null; + }); await serviceUnderTest().limit(limit, actor); - expect(mock).toHaveBeenCalledWith(['rl_actor_test_bucket', '{"t":0,"c":1}', 'EX', 1]); + expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_c', '1', 'EX', 1]); + }); + + it('should set timestamp expiration', async () => { + const commands: unknown[][] = []; + mockRedis.push(command => { + commands.push(command); + return null; + }); + + await serviceUnderTest().limit(limit, actor); + + expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_t', '0', 'EX', 1]); }); it('should not increment when already blocked', async () => { - counter = { c: 1, t: 0 }; + limitCounter = 1; + limitTimestamp = 0; mockTimeService.now += 100; await serviceUnderTest().limit(limit, actor); - expect(counter?.c).toBe(1); - expect(counter?.t).toBe(0); + expect(limitCounter).toBe(1); + expect(limitTimestamp).toBe(0); }); it('should not allow dripRate to be lower than 0', async () => { // real-world case; taken from StreamingApiServerService limit.max = 4096; limit.duration = 2000; - counter = { c: 4096, t: 0 }; + limitCounter = 4096; + limitTimestamp = 0; const i1 = await serviceUnderTest().limit(limit, actor); mockTimeService.now = 1; @@ -723,6 +789,19 @@ describe(SkRateLimiterService, () => { await expect(promise).rejects.toThrow(/size is less than 1/); }); + + it('should apply correction if extra calls slip through', async () => { + limitCounter = 2; + + const info = await serviceUnderTest().limit(limit, actor); + + expect(info.blocked).toBeTruthy(); + expect(info.remaining).toBe(0); + expect(info.resetMs).toBe(2000); + expect(info.resetSec).toBe(2); + expect(info.fullResetMs).toBe(2000); + expect(info.fullResetSec).toBe(2); + }); }); describe('with legacy limit and min interval', () => { @@ -744,14 +823,9 @@ describe(SkRateLimiterService, () => { expect(info.blocked).toBeFalsy(); }); - it('should not error when allowed', async () => { - await serviceUnderTest().limit(limit, actor); - - expectNoUnhandledErrors(); - }); - it('should block when limit exceeded', async () => { - counter = { c: 5, t: 0 }; + limitCounter = 5; + limitTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -759,7 +833,8 @@ describe(SkRateLimiterService, () => { }); 
it('should block when minInterval exceeded', async () => { - minCounter = { c: 1, t: 0 }; + minCounter = 1; + minTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -767,7 +842,8 @@ describe(SkRateLimiterService, () => { }); it('should calculate correct info when allowed', async () => { - counter = { c: 1, t: 0 }; + limitCounter = 1; + limitTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -779,7 +855,8 @@ describe(SkRateLimiterService, () => { }); it('should calculate correct info when blocked by limit', async () => { - counter = { c: 5, t: 0 }; + limitCounter = 5; + limitTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -791,7 +868,8 @@ describe(SkRateLimiterService, () => { }); it('should calculate correct info when blocked by minInterval', async () => { - minCounter = { c: 1, t: 0 }; + minCounter = 1; + minTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -803,7 +881,8 @@ describe(SkRateLimiterService, () => { }); it('should allow when counter is filled but interval has passed', async () => { - counter = { c: 5, t: 0 }; + limitCounter = 5; + limitTimestamp = 0; mockTimeService.now = 1000; const info = await serviceUnderTest().limit(limit, actor); @@ -812,7 +891,8 @@ describe(SkRateLimiterService, () => { }); it('should allow when minCounter is filled but interval has passed', async () => { - minCounter = { c: 1, t: 0 }; + minCounter = 1; + minTimestamp = 0; mockTimeService.now = 1000; const info = await serviceUnderTest().limit(limit, actor); @@ -821,8 +901,10 @@ describe(SkRateLimiterService, () => { }); it('should scale limit and interval by factor', async () => { - counter = { c: 5, t: 0 }; - minCounter = { c: 1, t: 0 }; + limitCounter = 5; + limitTimestamp = 0; + minCounter = 1; + minTimestamp = 0; mockTimeService.now += 500; const info = await serviceUnderTest().limit(limit, actor, 0.5); @@ -830,27 +912,81 @@ describe(SkRateLimiterService, () => { expect(info.blocked).toBeFalsy(); }); - it('should set key expiration', async () => { - const mock = jest.fn(mockRedisSet); - mockRedisSet = mock; + it('should set bucket counter expiration', async () => { + const commands: unknown[][] = []; + mockRedis.push(command => { + commands.push(command); + return null; + }); await serviceUnderTest().limit(limit, actor); - expect(mock).toHaveBeenCalledWith(['rl_actor_test_bucket', '{"t":0,"c":1}', 'EX', 1]); - expect(mock).toHaveBeenCalledWith(['rl_actor_test_min', '{"t":0,"c":1}', 'EX', 1]); + expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_c', '1', 'EX', 1]); + }); + + it('should set bucket timestamp expiration', async () => { + const commands: unknown[][] = []; + mockRedis.push(command => { + commands.push(command); + return null; + }); + + await serviceUnderTest().limit(limit, actor); + + expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_t', '0', 'EX', 1]); + }); + + it('should set min counter expiration', async () => { + const commands: unknown[][] = []; + mockRedis.push(command => { + commands.push(command); + return null; + }); + + await serviceUnderTest().limit(limit, actor); + + expect(commands).toContainEqual(['set', 'rl_actor_test_min_c', '1', 'EX', 1]); + }); + + it('should set min timestamp expiration', async () => { + const commands: unknown[][] = []; + mockRedis.push(command => { + commands.push(command); + return null; + }); + + await serviceUnderTest().limit(limit, actor); + + expect(commands).toContainEqual(['set', 'rl_actor_test_min_t', '0', 'EX', 
1]); }); it('should not increment when already blocked', async () => { - counter = { c: 5, t: 0 }; - minCounter = { c: 1, t: 0 }; + limitCounter = 5; + limitTimestamp = 0; + minCounter = 1; + minTimestamp = 0; mockTimeService.now += 100; await serviceUnderTest().limit(limit, actor); - expect(counter?.c).toBe(5); - expect(counter?.t).toBe(0); - expect(minCounter?.c).toBe(1); - expect(minCounter?.t).toBe(0); + expect(limitCounter).toBe(5); + expect(limitTimestamp).toBe(0); + expect(minCounter).toBe(1); + expect(minTimestamp).toBe(0); + }); + + it('should apply correction if extra calls slip through', async () => { + limitCounter = 6; + minCounter = 6; + + const info = await serviceUnderTest().limit(limit, actor); + + expect(info.blocked).toBeTruthy(); + expect(info.remaining).toBe(0); + expect(info.resetMs).toBe(2000); + expect(info.resetSec).toBe(2); + expect(info.fullResetMs).toBe(6000); + expect(info.fullResetSec).toBe(6); }); }); }); From ead781900dcf98b9dace91aa4a5ec7b3cecf07e2 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Mon, 9 Dec 2024 19:04:59 -0500 Subject: [PATCH 2/8] enable rate limits for dev environment --- packages/backend/src/server/api/SkRateLimiterService.ts | 2 +- .../backend/test/unit/server/api/SkRateLimiterServiceTests.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/backend/src/server/api/SkRateLimiterService.ts b/packages/backend/src/server/api/SkRateLimiterService.ts index 05166ed93c..b11d1556ba 100644 --- a/packages/backend/src/server/api/SkRateLimiterService.ts +++ b/packages/backend/src/server/api/SkRateLimiterService.ts @@ -24,7 +24,7 @@ export class SkRateLimiterService { @Inject(EnvService) envService: EnvService, ) { - this.disabled = envService.env.NODE_ENV !== 'production'; + this.disabled = envService.env.NODE_ENV === 'test'; } public async limit(limit: Keyed, actor: string, factor = 1): Promise { diff --git a/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts b/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts index 871c9afa64..90030495ed 100644 --- a/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts +++ b/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts @@ -120,7 +120,7 @@ describe(SkRateLimiterService, () => { }); }); - it('should bypass in non-production', async () => { + it('should bypass in test environment', async () => { mockEnvironment.NODE_ENV = 'test'; const info = await serviceUnderTest().limit({ key: 'l', type: undefined, max: 0 }, actor); From 407b2423af31ecaf44035f66a180a0bbc40e3aaa Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Tue, 10 Dec 2024 19:01:35 -0500 Subject: [PATCH 3/8] fix redis transaction implementation --- packages/backend/src/core/CoreModule.ts | 6 + .../backend/src/core/RedisConnectionPool.ts | 103 ++++++ packages/backend/src/core/TimeoutService.ts | 76 ++++ packages/backend/src/misc/rate-limit-utils.ts | 19 +- .../src/server/api/SkRateLimiterService.ts | 238 ++++++------ .../server/api/SkRateLimiterServiceTests.ts | 346 +++++++----------- 6 files changed, 439 insertions(+), 349 deletions(-) create mode 100644 packages/backend/src/core/RedisConnectionPool.ts create mode 100644 packages/backend/src/core/TimeoutService.ts diff --git a/packages/backend/src/core/CoreModule.ts b/packages/backend/src/core/CoreModule.ts index b18db7f366..caf135ae4b 100644 --- a/packages/backend/src/core/CoreModule.ts +++ b/packages/backend/src/core/CoreModule.ts @@ -155,6 +155,8 @@ import { QueueModule } from './QueueModule.js'; import { QueueService 
} from './QueueService.js'; import { LoggerService } from './LoggerService.js'; import { SponsorsService } from './SponsorsService.js'; +import { RedisConnectionPool } from './RedisConnectionPool.js'; +import { TimeoutService } from './TimeoutService.js'; import type { Provider } from '@nestjs/common'; //#region 文字列ベースでのinjection用(循環参照対応のため) @@ -383,6 +385,8 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp ChannelFollowingService, RegistryApiService, ReversiService, + RedisConnectionPool, + TimeoutService, TimeService, EnvService, @@ -684,6 +688,8 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp ChannelFollowingService, RegistryApiService, ReversiService, + RedisConnectionPool, + TimeoutService, TimeService, EnvService, diff --git a/packages/backend/src/core/RedisConnectionPool.ts b/packages/backend/src/core/RedisConnectionPool.ts new file mode 100644 index 0000000000..7ebefdfcb3 --- /dev/null +++ b/packages/backend/src/core/RedisConnectionPool.ts @@ -0,0 +1,103 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; +import Redis, { RedisOptions } from 'ioredis'; +import { DI } from '@/di-symbols.js'; +import type { Config } from '@/config.js'; +import Logger from '@/logger.js'; +import { Timeout, TimeoutService } from '@/core/TimeoutService.js'; +import { LoggerService } from './LoggerService.js'; + +/** + * Target number of connections to keep open and ready for use. + * The pool may grow beyond this during bursty traffic, but it will always shrink back to this number. + * The pool may remain below this number is the server never experiences enough traffic to consume this many clients. + */ +export const poolSize = 16; + +/** + * How often to drop an idle connection from the pool. + * This will never shrink the pool below poolSize. + */ +export const poolShrinkInterval = 5 * 1000; // 5 seconds + +@Injectable() +export class RedisConnectionPool implements OnApplicationShutdown { + private readonly poolShrinkTimer: Timeout; + private readonly pool: Redis.Redis[] = []; + private readonly logger: Logger; + private readonly redisOptions: RedisOptions; + + constructor(@Inject(DI.config) config: Config, loggerService: LoggerService, timeoutService: TimeoutService) { + this.logger = loggerService.getLogger('redis-pool'); + this.poolShrinkTimer = timeoutService.setInterval(() => this.shrinkPool(), poolShrinkInterval); + this.redisOptions = { + ...config.redis, + + // Set lazyConnect so that we can await() the connection manually. + // This helps to avoid a "stampede" of new connections (which are processed in the background!) under bursty conditions. + lazyConnect: true, + enableOfflineQueue: false, + }; + } + + /** + * Gets a Redis connection from the pool, or creates a new connection if the pool is empty. + * The returned object MUST be returned with a call to free(), even in the case of exceptions! + * Use a try...finally block for safe handling. + */ + public async alloc(): Promise { + let redis = this.pool.pop(); + + // The pool may be empty if we're under heavy load and/or we haven't opened all connections. + // Just construct a new instance, which will eventually be added to the pool. + // Excess clients will be disposed eventually. 
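+		// (shrinkPool() runs on an interval and gradually trims the pool back down to poolSize, so temporary over-allocation is safe.)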
+ if (!redis) { + redis = new Redis.Redis(this.redisOptions); + await redis.connect(); + } + + return redis; + } + + /** + * Returns a Redis connection to the pool. + * The instance MUST not be used after returning! + * Use a try...finally block for safe handling. + */ + public async free(redis: Redis.Redis): Promise { + // https://redis.io/docs/latest/commands/reset/ + await redis.reset(); + + this.pool.push(redis); + } + + public async onApplicationShutdown(): Promise { + // Cancel timer, otherwise it will cause a memory leak + clearInterval(this.poolShrinkTimer); + + // Disconnect all remaining instances + while (this.pool.length > 0) { + await this.dropClient(); + } + } + + private async shrinkPool(): Promise { + this.logger.debug(`Pool size is ${this.pool.length}`); + if (this.pool.length > poolSize) { + await this.dropClient(); + } + } + + private async dropClient(): Promise { + try { + const redis = this.pool.pop(); + await redis?.quit(); + } catch (err) { + this.logger.warn(`Error disconnecting from redis: ${err}`, { err }); + } + } +} diff --git a/packages/backend/src/core/TimeoutService.ts b/packages/backend/src/core/TimeoutService.ts new file mode 100644 index 0000000000..093b9a7b04 --- /dev/null +++ b/packages/backend/src/core/TimeoutService.ts @@ -0,0 +1,76 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +/** + * Provides access to setTimeout, setInterval, and related functions. + * Used to support deterministic unit testing. + */ +export class TimeoutService { + /** + * Returns a promise that resolves after the specified timeout in milliseconds. + */ + public delay(timeout: number): Promise { + return new Promise(resolve => { + this.setTimeout(resolve, timeout); + }); + } + + /** + * Passthrough to node's setTimeout + */ + public setTimeout(handler: TimeoutHandler, timeout?: number): Timeout { + return setTimeout(() => handler(), timeout); + } + + /** + * Passthrough to node's setInterval + */ + public setInterval(handler: TimeoutHandler, timeout?: number): Timeout { + return setInterval(() => handler(), timeout); + } + + /** + * Passthrough to node's clearTimeout + */ + public clearTimeout(timeout: Timeout) { + clearTimeout(timeout); + } + + /** + * Passthrough to node's clearInterval + */ + public clearInterval(timeout: Timeout) { + clearInterval(timeout); + } +} + +/** + * Function to be called when a timer or interval elapses. + */ +export type TimeoutHandler = () => void; + +/** + * A fucked TS issue causes the DOM setTimeout to get merged with Node setTimeout, creating a "quantum method" that returns either "number" or "NodeJS.Timeout" depending on how it's called. + * This would be fine, except it always matches the *wrong type*! + * The result is this "impossible" scenario: + * + * ```typescript + * // Test evaluates to "false", because the method's return type is not equal to itself. + * type Test = ReturnType extends ReturnType ? true : false; + * + * // This is a compiler error, because the type is broken and triggers some internal TS bug. + * const timeout = setTimeout(handler); + * clearTimeout(timeout); // compiler error here, because even type inference doesn't work. + * + * // This fails to compile. + * function test(handler, timeout): ReturnType { + * return setTimeout(handler, timeout); + * } + * ``` + * + * The bug is marked as "wontfix" by TS devs, so we have to work around it ourselves. 
-_- + * By forcing the return type to *explicitly* include both types, we at least make it possible to work with the resulting token. + */ +export type Timeout = NodeJS.Timeout | number; diff --git a/packages/backend/src/misc/rate-limit-utils.ts b/packages/backend/src/misc/rate-limit-utils.ts index 9909bb97fa..cc13111390 100644 --- a/packages/backend/src/misc/rate-limit-utils.ts +++ b/packages/backend/src/misc/rate-limit-utils.ts @@ -117,12 +117,27 @@ export interface LimitInfo { fullResetMs: number; } +export const disabledLimitInfo: Readonly = Object.freeze({ + blocked: false, + remaining: Number.MAX_SAFE_INTEGER, + resetSec: 0, + resetMs: 0, + fullResetSec: 0, + fullResetMs: 0, +}); + export function isLegacyRateLimit(limit: RateLimit): limit is LegacyRateLimit { return limit.type === undefined; } -export function hasMinLimit(limit: LegacyRateLimit): limit is LegacyRateLimit & { minInterval: number } { - return !!limit.minInterval; +export type MaxLegacyLimit = LegacyRateLimit & { duration: number, max: number }; +export function hasMaxLimit(limit: LegacyRateLimit): limit is MaxLegacyLimit { + return limit.max != null && limit.duration != null; +} + +export type MinLegacyLimit = LegacyRateLimit & { minInterval: number }; +export function hasMinLimit(limit: LegacyRateLimit): limit is MinLegacyLimit { + return limit.minInterval != null; } export function sendRateLimitHeaders(reply: FastifyReply, info: LimitInfo): void { diff --git a/packages/backend/src/server/api/SkRateLimiterService.ts b/packages/backend/src/server/api/SkRateLimiterService.ts index b11d1556ba..71681aadc9 100644 --- a/packages/backend/src/server/api/SkRateLimiterService.ts +++ b/packages/backend/src/server/api/SkRateLimiterService.ts @@ -7,8 +7,9 @@ import { Inject, Injectable } from '@nestjs/common'; import Redis from 'ioredis'; import { TimeService } from '@/core/TimeService.js'; import { EnvService } from '@/core/EnvService.js'; -import { DI } from '@/di-symbols.js'; -import { BucketRateLimit, LegacyRateLimit, LimitInfo, RateLimit, hasMinLimit, isLegacyRateLimit, Keyed } from '@/misc/rate-limit-utils.js'; +import { BucketRateLimit, LegacyRateLimit, LimitInfo, RateLimit, hasMinLimit, isLegacyRateLimit, Keyed, hasMaxLimit, disabledLimitInfo, MaxLegacyLimit, MinLegacyLimit } from '@/misc/rate-limit-utils.js'; +import { RedisConnectionPool } from '@/core/RedisConnectionPool.js'; +import { TimeoutService } from '@/core/TimeoutService.js'; @Injectable() export class SkRateLimiterService { @@ -18,8 +19,11 @@ export class SkRateLimiterService { @Inject(TimeService) private readonly timeService: TimeService, - @Inject(DI.redis) - private readonly redisClient: Redis.Redis, + @Inject(TimeoutService) + private readonly timeoutService: TimeoutService, + + @Inject(RedisConnectionPool) + private readonly redisPool: RedisConnectionPool, @Inject(EnvService) envService: EnvService, @@ -29,117 +33,110 @@ export class SkRateLimiterService { public async limit(limit: Keyed, actor: string, factor = 1): Promise { if (this.disabled || factor === 0) { - return { - blocked: false, - remaining: Number.MAX_SAFE_INTEGER, - resetSec: 0, - resetMs: 0, - fullResetSec: 0, - fullResetMs: 0, - }; + return disabledLimitInfo; } if (factor < 0) { throw new Error(`Rate limit factor is zero or negative: ${factor}`); } - return await this.tryLimit(limit, actor, factor); + const redis = await this.redisPool.alloc(); + try { + return await this.tryLimit(redis, limit, actor, factor); + } finally { + await this.redisPool.free(redis); + } } - private async 
tryLimit(limit: Keyed, actor: string, factor: number, retry = 1): Promise { + private async tryLimit(redis: Redis.Redis, limit: Keyed, actor: string, factor: number, retry = 0): Promise { try { + if (retry > 0) { + // Real-world testing showed the need for backoff to "spread out" bursty traffic. + const backoff = Math.round(Math.pow(2, retry + Math.random())); + await this.timeoutService.delay(backoff); + } + if (isLegacyRateLimit(limit)) { - return await this.limitLegacy(limit, actor, factor); + return await this.limitLegacy(redis, limit, actor, factor); } else { - return await this.limitBucket(limit, actor, factor); + return await this.limitBucket(redis, limit, actor, factor); } } catch (err) { // We may experience collision errors from optimistic locking. // This is expected, so we should retry a few times before giving up. // https://redis.io/docs/latest/develop/interact/transactions/#optimistic-locking-using-check-and-set - if (err instanceof TransactionError && retry < 3) { - return await this.tryLimit(limit, actor, factor, retry + 1); + if (err instanceof ConflictError && retry < 4) { + // We can reuse the same connection to reduce pool contention, but we have to reset it first. + await redis.reset(); + return await this.tryLimit(redis, limit, actor, factor, retry + 1); } throw err; } } - private async limitLegacy(limit: Keyed, actor: string, factor: number): Promise { - const promises: Promise[] = []; - - // The "min" limit - if present - is handled directly. - if (hasMinLimit(limit)) { - promises.push( - this.limitMin(limit, actor, factor), - ); + private async limitLegacy(redis: Redis.Redis, limit: Keyed, actor: string, factor: number): Promise { + if (hasMaxLimit(limit)) { + return await this.limitMaxLegacy(redis, limit, actor, factor); + } else if (hasMinLimit(limit)) { + return await this.limitMinLegacy(redis, limit, actor, factor); + } else { + return disabledLimitInfo; } - - // Convert the "max" limit into a leaky bucket with 1 drip / second rate. - if (limit.max != null && limit.duration != null) { - promises.push( - this.limitBucket({ - type: 'bucket', - key: limit.key, - size: limit.max, - dripRate: Math.max(Math.round(limit.duration / limit.max), 1), - }, actor, factor), - ); - } - - const [lim1, lim2] = await Promise.all(promises); - return { - blocked: (lim1?.blocked || lim2?.blocked) ?? false, - remaining: Math.min(lim1?.remaining ?? Number.MAX_SAFE_INTEGER, lim2?.remaining ?? Number.MAX_SAFE_INTEGER), - resetSec: Math.max(lim1?.resetSec ?? 0, lim2?.resetSec ?? 0), - resetMs: Math.max(lim1?.resetMs ?? 0, lim2?.resetMs ?? 0), - fullResetSec: Math.max(lim1?.fullResetSec ?? 0, lim2?.fullResetSec ?? 0), - fullResetMs: Math.max(lim1?.fullResetMs ?? 0, lim2?.fullResetMs ?? 0), - }; } - private async limitMin(limit: Keyed & { minInterval: number }, actor: string, factor: number): Promise { - if (limit.minInterval === 0) return null; + private async limitMaxLegacy(redis: Redis.Redis, limit: Keyed, actor: string, factor: number): Promise { + if (limit.duration === 0) return disabledLimitInfo; + if (limit.duration < 0) throw new Error(`Invalid rate limit ${limit.key}: duration is negative (${limit.duration})`); + if (limit.max < 1) throw new Error(`Invalid rate limit ${limit.key}: max is less than 1 (${limit.max})`); + + // Derive initial dripRate from minInterval OR duration/max. + const initialDripRate = Math.max(limit.minInterval ?? 
Math.round(limit.duration / limit.max), 1); + + // Calculate dripSize to reach max at exactly duration + const dripSize = Math.max(Math.round(limit.max / (limit.duration / initialDripRate)), 1); + + // Calculate final dripRate from dripSize and duration/max + const dripRate = Math.max(Math.round(limit.duration / (limit.max / dripSize)), 1); + + const bucketLimit: Keyed = { + type: 'bucket', + key: limit.key, + size: limit.max, + dripRate, + dripSize, + }; + return await this.limitBucket(redis, bucketLimit, actor, factor); + } + + private async limitMinLegacy(redis: Redis.Redis, limit: Keyed, actor: string, factor: number): Promise { + if (limit.minInterval === 0) return disabledLimitInfo; if (limit.minInterval < 0) throw new Error(`Invalid rate limit ${limit.key}: minInterval is negative (${limit.minInterval})`); - const minInterval = Math.max(Math.ceil(limit.minInterval * factor), 0); - const expirationSec = Math.max(Math.ceil(minInterval / 1000), 1); - - // Check for window clear - const counter = await this.getLimitCounter(limit, actor, 'min'); - if (counter.counter > 0) { - const isCleared = this.timeService.now - counter.timestamp >= minInterval; - if (isCleared) { - counter.counter = 0; - } - } - - // Increment the limit, then synchronize with redis - const blocked = counter.counter > 0; - if (!blocked) { - counter.counter++; - counter.timestamp = this.timeService.now; - await this.updateLimitCounter(limit, actor, 'min', expirationSec, counter); - } - - // Calculate limit status - const resetMs = Math.max(minInterval - (this.timeService.now - counter.timestamp), 0); - const resetSec = Math.ceil(resetMs / 1000); - return { blocked, remaining: 0, resetSec, resetMs, fullResetSec: resetSec, fullResetMs: resetMs }; + const dripRate = Math.max(Math.round(limit.minInterval), 1); + const bucketLimit: Keyed = { + type: 'bucket', + key: limit.key, + size: 1, + dripRate, + dripSize: 1, + }; + return await this.limitBucket(redis, bucketLimit, actor, factor); } - private async limitBucket(limit: Keyed, actor: string, factor: number): Promise { + private async limitBucket(redis: Redis.Redis, limit: Keyed, actor: string, factor: number): Promise { if (limit.size < 1) throw new Error(`Invalid rate limit ${limit.key}: size is less than 1 (${limit.size})`); if (limit.dripRate != null && limit.dripRate < 1) throw new Error(`Invalid rate limit ${limit.key}: dripRate is less than 1 (${limit.dripRate})`); if (limit.dripSize != null && limit.dripSize < 1) throw new Error(`Invalid rate limit ${limit.key}: dripSize is less than 1 (${limit.dripSize})`); + const redisKey = createLimitKey(limit, actor); const bucketSize = Math.max(Math.ceil(limit.size / factor), 1); const dripRate = Math.ceil(limit.dripRate ?? 1000); const dripSize = Math.ceil(limit.dripSize ?? 
1); const expirationSec = Math.max(Math.ceil(bucketSize / dripRate), 1); // Simulate bucket drips - const counter = await this.getLimitCounter(limit, actor, 'bucket'); + const counter = await this.getLimitCounter(redis, redisKey); if (counter.counter > 0) { const dripsSinceLastTick = Math.floor((this.timeService.now - counter.timestamp) / dripRate) * dripSize; counter.counter = Math.max(counter.counter - dripsSinceLastTick, 0); @@ -150,7 +147,7 @@ export class SkRateLimiterService { if (!blocked) { counter.counter++; counter.timestamp = this.timeService.now; - await this.updateLimitCounter(limit, actor, 'bucket', expirationSec, counter); + await this.updateLimitCounter(redis, redisKey, expirationSec, counter); } // Calculate how much time is needed to free up a bucket slot @@ -167,60 +164,49 @@ export class SkRateLimiterService { return { blocked, remaining, resetSec, resetMs, fullResetSec, fullResetMs }; } - private async getLimitCounter(limit: Keyed, actor: string, subject: string): Promise { - const timestampKey = createLimitKey(limit, actor, subject, 't'); - const counterKey = createLimitKey(limit, actor, subject, 'c'); + private async getLimitCounter(redis: Redis.Redis, key: string): Promise { + const counter: LimitCounter = { counter: 0, timestamp: 0 }; - const [timestamp, counter] = await this.executeRedis( - [ - ['get', timestampKey], - ['get', counterKey], - ], - [ - timestampKey, - counterKey, - ], - ); + // Watch the key BEFORE reading it! + await redis.watch(key); + const data = await redis.get(key); - return { - timestamp: timestamp ? parseInt(timestamp) : 0, - counter: counter ? parseInt(counter) : 0, - }; - } - - private async updateLimitCounter(limit: Keyed, actor: string, subject: string, expirationSec: number, counter: LimitCounter): Promise { - const timestampKey = createLimitKey(limit, actor, subject, 't'); - const counterKey = createLimitKey(limit, actor, subject, 'c'); - - await this.executeRedis( - [ - ['set', timestampKey, counter.timestamp.toString(), 'EX', expirationSec], - ['set', counterKey, counter.counter.toString(), 'EX', expirationSec], - ], - [ - timestampKey, - counterKey, - ], - ); - } - - private async executeRedis(batch: RedisBatch, watch: string[]): Promise> { - const results = await this.redisClient - .multi(batch) - .watch(watch) - .exec(); - - // Transaction error - if (!results) { - throw new TransactionError('Redis error: transaction conflict'); + // Data may be missing or corrupt if the key doesn't exist. + // This is an expected edge case. 
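+		// (The stored value has the form '<counter>:<timestamp>'; anything else leaves the zeroed counter above untouched.)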
+ if (data) { + const parts = data.split(':'); + if (parts.length === 2) { + counter.counter = parseInt(parts[0]); + counter.timestamp = parseInt(parts[1]); + } } - // The entire call failed + return counter; + } + + private async updateLimitCounter(redis: Redis.Redis, key: string, expirationSec: number, counter: LimitCounter): Promise { + const data = `${counter.counter}:${counter.timestamp}`; + + await this.executeRedisMulti( + redis, + [['set', key, data, 'EX', expirationSec]], + ); + } + + private async executeRedisMulti(redis: Redis.Redis, batch: RedisBatch): Promise> { + const results = await redis.multi(batch).exec(); + + // Transaction conflict (retryable) + if (!results) { + throw new ConflictError('Redis error: transaction conflict'); + } + + // Transaction failed (fatal) if (results.length !== batch.length) { throw new Error('Redis error: failed to execute batch'); } - // A particular command failed + // Command failed (fatal) const errors = results.map(r => r[0]).filter(e => e != null); if (errors.length > 0) { throw new AggregateError(errors, `Redis error: failed to execute command(s): '${errors.join('\', \'')}'`); @@ -233,11 +219,11 @@ export class SkRateLimiterService { type RedisBatch = [string, ...unknown[]][] & { length: Num }; type RedisResults = (string | null)[] & { length: Num }; -function createLimitKey(limit: Keyed, actor: string, subject: string, value: string): string { - return `rl_${actor}_${limit.key}_${subject}_${value}`; +function createLimitKey(limit: Keyed, actor: string): string { + return `rl_${actor}_${limit.key}`; } -class TransactionError extends Error {} +class ConflictError extends Error {} interface LimitCounter { timestamp: number; diff --git a/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts b/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts index 90030495ed..deb6b9f80e 100644 --- a/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts +++ b/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts @@ -6,6 +6,8 @@ import type Redis from 'ioredis'; import { SkRateLimiterService } from '@/server/api/SkRateLimiterService.js'; import { BucketRateLimit, Keyed, LegacyRateLimit } from '@/misc/rate-limit-utils.js'; +import { RedisConnectionPool } from '@/core/RedisConnectionPool.js'; +import { Timeout, TimeoutHandler, TimeoutService } from '@/core/TimeoutService.js'; /* eslint-disable @typescript-eslint/no-non-null-assertion */ @@ -24,28 +26,50 @@ describe(SkRateLimiterService, () => { }, }; + function callMockRedis(command: [string, ...unknown[]]) { + const handlerResults = mockRedis.map(handler => handler(command)); + const finalResult = handlerResults.findLast(result => result != null); + return finalResult ?? [null, null]; + } + + // I apologize to anyone who tries to read this later 🥲 mockRedis = []; mockRedisExec = (batch) => { const results: [Error | null, unknown][] = batch.map(command => { - const handlerResults = mockRedis.map(handler => handler(command)); - const finalResult = handlerResults.findLast(result => result != null); - return finalResult ?? [new Error('test error: no handler'), null]; + return callMockRedis(command); }); return Promise.resolve(results); }; const mockRedisClient = { + watch(...args: unknown[]) { + const result = callMockRedis(['watch', ...args]); + return Promise.resolve(result[0] ?? result[1]); + }, + get(...args: unknown[]) { + const result = callMockRedis(['get', ...args]); + return Promise.resolve(result[0] ?? 
result[1]); + }, + set(...args: unknown[]) { + const result = callMockRedis(['set', ...args]); + return Promise.resolve(result[0] ?? result[1]); + }, multi(batch: [string, ...unknown[]][]) { return { - watch() { - return { - exec() { - return mockRedisExec(batch); - }, - }; + exec() { + return mockRedisExec(batch); }, }; }, + reset() { + return Promise.resolve(); + }, } as unknown as Redis.Redis; + const mockRedisPool = { + alloc() { + return Promise.resolve(mockRedisClient); + }, + free() {}, + } as unknown as RedisConnectionPool; mockEnvironment = Object.create(process.env); mockEnvironment.NODE_ENV = 'production'; @@ -53,9 +77,22 @@ describe(SkRateLimiterService, () => { env: mockEnvironment, }; + const mockTimeoutService = new class extends TimeoutService { + setTimeout(handler: TimeoutHandler): Timeout { + handler(); + return 0; + } + setInterval(handler: TimeoutHandler): Timeout { + handler(); + return 0; + } + clearTimeout() {} + clearInterval() {} + }; + let service: SkRateLimiterService | undefined = undefined; serviceUnderTest = () => { - return service ??= new SkRateLimiterService(mockTimeService, mockRedisClient, mockEnvService); + return service ??= new SkRateLimiterService(mockTimeService, mockTimeoutService, mockRedisPool, mockEnvService); }; }); @@ -65,56 +102,22 @@ describe(SkRateLimiterService, () => { let limitCounter: number | undefined = undefined; let limitTimestamp: number | undefined = undefined; - let minCounter: number | undefined = undefined; - let minTimestamp: number | undefined = undefined; beforeEach(() => { limitCounter = undefined; limitTimestamp = undefined; - minCounter = undefined; - minTimestamp = undefined; mockRedis.push(([command, ...args]) => { - if (command === 'set' && args[0] === 'rl_actor_test_bucket_t') { - limitTimestamp = parseInt(args[1] as string); + if (command === 'set' && args[0] === 'rl_actor_test') { + const parts = (args[1] as string).split(':'); + limitCounter = parseInt(parts[0] as string); + limitTimestamp = parseInt(parts[1] as string); return [null, args[1]]; } - if (command === 'get' && args[0] === 'rl_actor_test_bucket_t') { - return [null, limitTimestamp?.toString() ?? null]; + if (command === 'get' && args[0] === 'rl_actor_test') { + const data = `${limitCounter ?? 0}:${limitTimestamp ?? 0}`; + return [null, data]; } - // if (command === 'incr' && args[0] === 'rl_actor_test_bucket_c') { - // limitCounter = (limitCounter ?? 0) + 1; - // return [null, null]; - // } - if (command === 'set' && args[0] === 'rl_actor_test_bucket_c') { - limitCounter = parseInt(args[1] as string); - return [null, args[1]]; - } - if (command === 'get' && args[0] === 'rl_actor_test_bucket_c') { - return [null, limitCounter?.toString() ?? null]; - } - - if (command === 'set' && args[0] === 'rl_actor_test_min_t') { - minTimestamp = parseInt(args[1] as string); - return [null, args[1]]; - } - if (command === 'get' && args[0] === 'rl_actor_test_min_t') { - return [null, minTimestamp?.toString() ?? null]; - } - // if (command === 'incr' && args[0] === 'rl_actor_test_min_c') { - // minCounter = (minCounter ?? 0) + 1; - // return [null, null]; - // } - if (command === 'set' && args[0] === 'rl_actor_test_min_c') { - minCounter = parseInt(args[1] as string); - return [null, args[1]]; - } - if (command === 'get' && args[0] === 'rl_actor_test_min_c') { - return [null, minCounter?.toString() ?? 
null]; - } - // if (command === 'expire') { - // return [null, null]; - // } return null; }); @@ -266,19 +269,7 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); - expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_c', '1', 'EX', 1]); - }); - - it('should set timestamp expiration', async () => { - const commands: unknown[][] = []; - mockRedis.push(command => { - commands.push(command); - return null; - }); - - await serviceUnderTest().limit(limit, actor); - - expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_t', '0', 'EX', 1]); + expect(commands).toContainEqual(['set', 'rl_actor_test', '1:0', 'EX', 1]); }); it('should not increment when already blocked', async () => { @@ -379,12 +370,12 @@ describe(SkRateLimiterService, () => { it('should retry when redis conflicts', async () => { let numCalls = 0; - const realMockRedisExec = mockRedisExec; + const originalExec = mockRedisExec; mockRedisExec = () => { - if (numCalls > 0) { - mockRedisExec = realMockRedisExec; - } numCalls++; + if (numCalls > 1) { + mockRedisExec = originalExec; + } return Promise.resolve(null); }; @@ -393,7 +384,7 @@ describe(SkRateLimiterService, () => { expect(numCalls).toBe(2); }); - it('should bail out after 3 tries', async () => { + it('should bail out after 5 tries', async () => { let numCalls = 0; mockRedisExec = () => { numCalls++; @@ -403,7 +394,7 @@ describe(SkRateLimiterService, () => { const promise = serviceUnderTest().limit(limit, actor); await expect(promise).rejects.toThrow(/transaction conflict/); - expect(numCalls).toBe(3); + expect(numCalls).toBe(5); }); it('should apply correction if extra calls slip through', async () => { @@ -450,8 +441,8 @@ describe(SkRateLimiterService, () => { it('should increment counter when called', async () => { await serviceUnderTest().limit(limit, actor); - expect(minCounter).not.toBeUndefined(); - expect(minCounter).toBe(1); + expect(limitCounter).not.toBeUndefined(); + expect(limitCounter).toBe(1); }); it('should set timestamp when called', async () => { @@ -459,30 +450,19 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); - expect(minCounter).not.toBeUndefined(); - expect(minTimestamp).toBe(1000); + expect(limitCounter).not.toBeUndefined(); + expect(limitTimestamp).toBe(1000); }); it('should decrement counter when minInterval has passed', async () => { - minCounter = 1; - minTimestamp = 0; + limitCounter = 1; + limitTimestamp = 0; mockTimeService.now = 1000; await serviceUnderTest().limit(limit, actor); - expect(minCounter).not.toBeUndefined(); - expect(minCounter).toBe(1); // 1 (starting) - 1 (interval) + 1 (call) = 1 - }); - - it('should reset counter entirely', async () => { - minCounter = 2; - minTimestamp = 0; - mockTimeService.now = 1000; - - await serviceUnderTest().limit(limit, actor); - - expect(minCounter).not.toBeUndefined(); - expect(minCounter).toBe(1); // 2 (starting) - 2 (interval) + 1 (call) = 1 + expect(limitCounter).not.toBeUndefined(); + expect(limitCounter).toBe(1); // 1 (starting) - 1 (interval) + 1 (call) = 1 }); it('should maintain counter between calls over time', async () => { @@ -495,13 +475,13 @@ describe(SkRateLimiterService, () => { mockTimeService.now += 1000; // 0 - 1 = 0 await serviceUnderTest().limit(limit, actor); // 0 + 1 = 1 - expect(minCounter).toBe(1); - expect(minTimestamp).toBe(3000); + expect(limitCounter).toBe(1); + expect(limitTimestamp).toBe(3000); }); it('should block when interval exceeded', async () => { - minCounter = 1; - 
minTimestamp = 0; + limitCounter = 1; + limitTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -509,8 +489,8 @@ describe(SkRateLimiterService, () => { }); it('should calculate correct info when blocked', async () => { - minCounter = 1; - minTimestamp = 0; + limitCounter = 1; + limitTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -521,8 +501,8 @@ describe(SkRateLimiterService, () => { }); it('should allow when bucket is filled but interval has passed', async () => { - minCounter = 1; - minTimestamp = 0; + limitCounter = 1; + limitTimestamp = 0; mockTimeService.now = 1000; const info = await serviceUnderTest().limit(limit, actor); @@ -531,8 +511,8 @@ describe(SkRateLimiterService, () => { }); it('should scale interval by factor', async () => { - minCounter = 1; - minTimestamp = 0; + limitCounter = 1; + limitTimestamp = 0; mockTimeService.now += 500; const info = await serviceUnderTest().limit(limit, actor, 0.5); @@ -549,30 +529,18 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); - expect(commands).toContainEqual(['set', 'rl_actor_test_min_c', '1', 'EX', 1]); - }); - - it('should set timestamp expiration', async () => { - const commands: unknown[][] = []; - mockRedis.push(command => { - commands.push(command); - return null; - }); - - await serviceUnderTest().limit(limit, actor); - - expect(commands).toContainEqual(['set', 'rl_actor_test_min_t', '0', 'EX', 1]); + expect(commands).toContainEqual(['set', 'rl_actor_test', '1:0', 'EX', 1]); }); it('should not increment when already blocked', async () => { - minCounter = 1; - minTimestamp = 0; + limitCounter = 1; + limitTimestamp = 0; mockTimeService.now += 100; await serviceUnderTest().limit(limit, actor); - expect(minCounter).toBe(1); - expect(minTimestamp).toBe(0); + expect(limitCounter).toBe(1); + expect(limitTimestamp).toBe(0); }); it('should skip if factor is zero', async () => { @@ -605,17 +573,17 @@ describe(SkRateLimiterService, () => { await expect(promise).rejects.toThrow(/minInterval is negative/); }); - it('should not apply correction to extra calls', async () => { - minCounter = 2; + it('should apply correction if extra calls slip through', async () => { + limitCounter = 2; const info = await serviceUnderTest().limit(limit, actor); expect(info.blocked).toBeTruthy(); expect(info.remaining).toBe(0); - expect(info.resetMs).toBe(1000); - expect(info.resetSec).toBe(1); - expect(info.fullResetMs).toBe(1000); - expect(info.fullResetSec).toBe(1); + expect(info.resetMs).toBe(2000); + expect(info.resetSec).toBe(2); + expect(info.fullResetMs).toBe(2000); + expect(info.fullResetSec).toBe(2); }); }); @@ -720,19 +688,7 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); - expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_c', '1', 'EX', 1]); - }); - - it('should set timestamp expiration', async () => { - const commands: unknown[][] = []; - mockRedis.push(command => { - commands.push(command); - return null; - }); - - await serviceUnderTest().limit(limit, actor); - - expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_t', '0', 'EX', 1]); + expect(commands).toContainEqual(['set', 'rl_actor_test', '1:0', 'EX', 1]); }); it('should not increment when already blocked', async () => { @@ -774,12 +730,21 @@ describe(SkRateLimiterService, () => { await expect(promise).rejects.toThrow(/factor is zero or negative/); }); + it('should skip if duration is zero', async () => { + limit.duration = 0; + + const info 
= await serviceUnderTest().limit(limit, actor); + + expect(info.blocked).toBeFalsy(); + expect(info.remaining).toBe(Number.MAX_SAFE_INTEGER); + }); + it('should throw if max is zero', async () => { limit.max = 0; const promise = serviceUnderTest().limit(limit, actor); - await expect(promise).rejects.toThrow(/size is less than 1/); + await expect(promise).rejects.toThrow(/max is less than 1/); }); it('should throw if max is negative', async () => { @@ -787,7 +752,7 @@ describe(SkRateLimiterService, () => { const promise = serviceUnderTest().limit(limit, actor); - await expect(promise).rejects.toThrow(/size is less than 1/); + await expect(promise).rejects.toThrow(/max is less than 1/); }); it('should apply correction if extra calls slip through', async () => { @@ -811,7 +776,7 @@ describe(SkRateLimiterService, () => { limit = { type: undefined, key, - max: 5, + max: 10, duration: 5000, minInterval: 1000, }; @@ -824,7 +789,7 @@ describe(SkRateLimiterService, () => { }); it('should block when limit exceeded', async () => { - limitCounter = 5; + limitCounter = 10; limitTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -832,30 +797,8 @@ describe(SkRateLimiterService, () => { expect(info.blocked).toBeTruthy(); }); - it('should block when minInterval exceeded', async () => { - minCounter = 1; - minTimestamp = 0; - - const info = await serviceUnderTest().limit(limit, actor); - - expect(info.blocked).toBeTruthy(); - }); - it('should calculate correct info when allowed', async () => { - limitCounter = 1; - limitTimestamp = 0; - - const info = await serviceUnderTest().limit(limit, actor); - - expect(info.remaining).toBe(0); - expect(info.resetSec).toBe(1); - expect(info.resetMs).toBe(1000); - expect(info.fullResetSec).toBe(2); - expect(info.fullResetMs).toBe(2000); - }); - - it('should calculate correct info when blocked by limit', async () => { - limitCounter = 5; + limitCounter = 9; limitTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); @@ -867,17 +810,17 @@ describe(SkRateLimiterService, () => { expect(info.fullResetMs).toBe(5000); }); - it('should calculate correct info when blocked by minInterval', async () => { - minCounter = 1; - minTimestamp = 0; + it('should calculate correct info when blocked', async () => { + limitCounter = 10; + limitTimestamp = 0; const info = await serviceUnderTest().limit(limit, actor); expect(info.remaining).toBe(0); expect(info.resetSec).toBe(1); expect(info.resetMs).toBe(1000); - expect(info.fullResetSec).toBe(1); - expect(info.fullResetMs).toBe(1000); + expect(info.fullResetSec).toBe(5); + expect(info.fullResetMs).toBe(5000); }); it('should allow when counter is filled but interval has passed', async () => { @@ -890,21 +833,23 @@ describe(SkRateLimiterService, () => { expect(info.blocked).toBeFalsy(); }); - it('should allow when minCounter is filled but interval has passed', async () => { - minCounter = 1; - minTimestamp = 0; - mockTimeService.now = 1000; + it('should drip according to minInterval', async () => { + limitCounter = 10; + limitTimestamp = 0; + mockTimeService.now += 1000; - const info = await serviceUnderTest().limit(limit, actor); + const i1 = await serviceUnderTest().limit(limit, actor); + const i2 = await serviceUnderTest().limit(limit, actor); + const i3 = await serviceUnderTest().limit(limit, actor); - expect(info.blocked).toBeFalsy(); + expect(i1.blocked).toBeFalsy(); + expect(i2.blocked).toBeFalsy(); + expect(i3.blocked).toBeTruthy(); }); it('should scale limit and interval by factor', async () 
=> { limitCounter = 5; limitTimestamp = 0; - minCounter = 1; - minTimestamp = 0; mockTimeService.now += 500; const info = await serviceUnderTest().limit(limit, actor, 0.5); @@ -912,7 +857,7 @@ describe(SkRateLimiterService, () => { expect(info.blocked).toBeFalsy(); }); - it('should set bucket counter expiration', async () => { + it('should set counter expiration', async () => { const commands: unknown[][] = []; mockRedis.push(command => { commands.push(command); @@ -921,63 +866,22 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); - expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_c', '1', 'EX', 1]); - }); - - it('should set bucket timestamp expiration', async () => { - const commands: unknown[][] = []; - mockRedis.push(command => { - commands.push(command); - return null; - }); - - await serviceUnderTest().limit(limit, actor); - - expect(commands).toContainEqual(['set', 'rl_actor_test_bucket_t', '0', 'EX', 1]); - }); - - it('should set min counter expiration', async () => { - const commands: unknown[][] = []; - mockRedis.push(command => { - commands.push(command); - return null; - }); - - await serviceUnderTest().limit(limit, actor); - - expect(commands).toContainEqual(['set', 'rl_actor_test_min_c', '1', 'EX', 1]); - }); - - it('should set min timestamp expiration', async () => { - const commands: unknown[][] = []; - mockRedis.push(command => { - commands.push(command); - return null; - }); - - await serviceUnderTest().limit(limit, actor); - - expect(commands).toContainEqual(['set', 'rl_actor_test_min_t', '0', 'EX', 1]); + expect(commands).toContainEqual(['set', 'rl_actor_test', '1:0', 'EX', 1]); }); it('should not increment when already blocked', async () => { - limitCounter = 5; + limitCounter = 10; limitTimestamp = 0; - minCounter = 1; - minTimestamp = 0; mockTimeService.now += 100; await serviceUnderTest().limit(limit, actor); - expect(limitCounter).toBe(5); + expect(limitCounter).toBe(10); expect(limitTimestamp).toBe(0); - expect(minCounter).toBe(1); - expect(minTimestamp).toBe(0); }); it('should apply correction if extra calls slip through', async () => { - limitCounter = 6; - minCounter = 6; + limitCounter = 12; const info = await serviceUnderTest().limit(limit, actor); From 0ea9d6ec5d4f037b37a98603f8942404530f2802 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 11 Dec 2024 09:10:11 -0500 Subject: [PATCH 4/8] use atomic variant of Leaky Bucket for safe concurrent rate limits --- packages/backend/src/core/CoreModule.ts | 6 - .../backend/src/core/RedisConnectionPool.ts | 103 --------- packages/backend/src/core/TimeoutService.ts | 76 ------- .../src/server/SkRateLimiterService.md | 143 ++++++++++++ .../src/server/api/SkRateLimiterService.ts | 212 ++++++++++-------- .../server/api/SkRateLimiterServiceTests.ts | 182 +++++++++------ 6 files changed, 378 insertions(+), 344 deletions(-) delete mode 100644 packages/backend/src/core/RedisConnectionPool.ts delete mode 100644 packages/backend/src/core/TimeoutService.ts create mode 100644 packages/backend/src/server/SkRateLimiterService.md diff --git a/packages/backend/src/core/CoreModule.ts b/packages/backend/src/core/CoreModule.ts index caf135ae4b..b18db7f366 100644 --- a/packages/backend/src/core/CoreModule.ts +++ b/packages/backend/src/core/CoreModule.ts @@ -155,8 +155,6 @@ import { QueueModule } from './QueueModule.js'; import { QueueService } from './QueueService.js'; import { LoggerService } from './LoggerService.js'; import { SponsorsService } from './SponsorsService.js'; -import { 
RedisConnectionPool } from './RedisConnectionPool.js'; -import { TimeoutService } from './TimeoutService.js'; import type { Provider } from '@nestjs/common'; //#region 文字列ベースでのinjection用(循環参照対応のため) @@ -385,8 +383,6 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp ChannelFollowingService, RegistryApiService, ReversiService, - RedisConnectionPool, - TimeoutService, TimeService, EnvService, @@ -688,8 +684,6 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp ChannelFollowingService, RegistryApiService, ReversiService, - RedisConnectionPool, - TimeoutService, TimeService, EnvService, diff --git a/packages/backend/src/core/RedisConnectionPool.ts b/packages/backend/src/core/RedisConnectionPool.ts deleted file mode 100644 index 7ebefdfcb3..0000000000 --- a/packages/backend/src/core/RedisConnectionPool.ts +++ /dev/null @@ -1,103 +0,0 @@ -/* - * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors - * SPDX-License-Identifier: AGPL-3.0-only - */ - -import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; -import Redis, { RedisOptions } from 'ioredis'; -import { DI } from '@/di-symbols.js'; -import type { Config } from '@/config.js'; -import Logger from '@/logger.js'; -import { Timeout, TimeoutService } from '@/core/TimeoutService.js'; -import { LoggerService } from './LoggerService.js'; - -/** - * Target number of connections to keep open and ready for use. - * The pool may grow beyond this during bursty traffic, but it will always shrink back to this number. - * The pool may remain below this number is the server never experiences enough traffic to consume this many clients. - */ -export const poolSize = 16; - -/** - * How often to drop an idle connection from the pool. - * This will never shrink the pool below poolSize. - */ -export const poolShrinkInterval = 5 * 1000; // 5 seconds - -@Injectable() -export class RedisConnectionPool implements OnApplicationShutdown { - private readonly poolShrinkTimer: Timeout; - private readonly pool: Redis.Redis[] = []; - private readonly logger: Logger; - private readonly redisOptions: RedisOptions; - - constructor(@Inject(DI.config) config: Config, loggerService: LoggerService, timeoutService: TimeoutService) { - this.logger = loggerService.getLogger('redis-pool'); - this.poolShrinkTimer = timeoutService.setInterval(() => this.shrinkPool(), poolShrinkInterval); - this.redisOptions = { - ...config.redis, - - // Set lazyConnect so that we can await() the connection manually. - // This helps to avoid a "stampede" of new connections (which are processed in the background!) under bursty conditions. - lazyConnect: true, - enableOfflineQueue: false, - }; - } - - /** - * Gets a Redis connection from the pool, or creates a new connection if the pool is empty. - * The returned object MUST be returned with a call to free(), even in the case of exceptions! - * Use a try...finally block for safe handling. - */ - public async alloc(): Promise { - let redis = this.pool.pop(); - - // The pool may be empty if we're under heavy load and/or we haven't opened all connections. - // Just construct a new instance, which will eventually be added to the pool. - // Excess clients will be disposed eventually. - if (!redis) { - redis = new Redis.Redis(this.redisOptions); - await redis.connect(); - } - - return redis; - } - - /** - * Returns a Redis connection to the pool. - * The instance MUST not be used after returning! - * Use a try...finally block for safe handling. 
- */ - public async free(redis: Redis.Redis): Promise { - // https://redis.io/docs/latest/commands/reset/ - await redis.reset(); - - this.pool.push(redis); - } - - public async onApplicationShutdown(): Promise { - // Cancel timer, otherwise it will cause a memory leak - clearInterval(this.poolShrinkTimer); - - // Disconnect all remaining instances - while (this.pool.length > 0) { - await this.dropClient(); - } - } - - private async shrinkPool(): Promise { - this.logger.debug(`Pool size is ${this.pool.length}`); - if (this.pool.length > poolSize) { - await this.dropClient(); - } - } - - private async dropClient(): Promise { - try { - const redis = this.pool.pop(); - await redis?.quit(); - } catch (err) { - this.logger.warn(`Error disconnecting from redis: ${err}`, { err }); - } - } -} diff --git a/packages/backend/src/core/TimeoutService.ts b/packages/backend/src/core/TimeoutService.ts deleted file mode 100644 index 093b9a7b04..0000000000 --- a/packages/backend/src/core/TimeoutService.ts +++ /dev/null @@ -1,76 +0,0 @@ -/* - * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors - * SPDX-License-Identifier: AGPL-3.0-only - */ - -/** - * Provides access to setTimeout, setInterval, and related functions. - * Used to support deterministic unit testing. - */ -export class TimeoutService { - /** - * Returns a promise that resolves after the specified timeout in milliseconds. - */ - public delay(timeout: number): Promise { - return new Promise(resolve => { - this.setTimeout(resolve, timeout); - }); - } - - /** - * Passthrough to node's setTimeout - */ - public setTimeout(handler: TimeoutHandler, timeout?: number): Timeout { - return setTimeout(() => handler(), timeout); - } - - /** - * Passthrough to node's setInterval - */ - public setInterval(handler: TimeoutHandler, timeout?: number): Timeout { - return setInterval(() => handler(), timeout); - } - - /** - * Passthrough to node's clearTimeout - */ - public clearTimeout(timeout: Timeout) { - clearTimeout(timeout); - } - - /** - * Passthrough to node's clearInterval - */ - public clearInterval(timeout: Timeout) { - clearInterval(timeout); - } -} - -/** - * Function to be called when a timer or interval elapses. - */ -export type TimeoutHandler = () => void; - -/** - * A fucked TS issue causes the DOM setTimeout to get merged with Node setTimeout, creating a "quantum method" that returns either "number" or "NodeJS.Timeout" depending on how it's called. - * This would be fine, except it always matches the *wrong type*! - * The result is this "impossible" scenario: - * - * ```typescript - * // Test evaluates to "false", because the method's return type is not equal to itself. - * type Test = ReturnType extends ReturnType ? true : false; - * - * // This is a compiler error, because the type is broken and triggers some internal TS bug. - * const timeout = setTimeout(handler); - * clearTimeout(timeout); // compiler error here, because even type inference doesn't work. - * - * // This fails to compile. - * function test(handler, timeout): ReturnType { - * return setTimeout(handler, timeout); - * } - * ``` - * - * The bug is marked as "wontfix" by TS devs, so we have to work around it ourselves. -_- - * By forcing the return type to *explicitly* include both types, we at least make it possible to work with the resulting token. 
- */
-export type Timeout = NodeJS.Timeout | number;
diff --git a/packages/backend/src/server/SkRateLimiterService.md b/packages/backend/src/server/SkRateLimiterService.md
new file mode 100644
index 0000000000..c2752f5027
--- /dev/null
+++ b/packages/backend/src/server/SkRateLimiterService.md
@@ -0,0 +1,143 @@
+# SkRateLimiterService - Leaky Bucket Rate Limit Implementation
+
+SkRateLimiterService replaces Misskey's RateLimiterService for all use cases.
+It offers a simplified API, detailed metrics, and support for Rate Limit headers.
+The prime feature is an implementation of Leaky Bucket - a flexible rate limiting scheme that better supports bursty request patterns common with human interaction.
+
+## Compatibility
+
+The API is backwards-compatible with existing limit definitions, but it's preferred to use the new BucketRateLimit interface.
+Legacy limits will be "translated" into a bucket limit in a way that attempts to respect max, duration, and minInterval (if present).
+SkRateLimiterService is quite not plug-and-play compatible with existing call sites, because it no longer throws when a limit is exceeded.
+Instead, the returned LimitInfo object will have "blocked" set to true.
+Callers are responsible for checking this property and taking any desired action, such as rejecting a request or returning limit details.
+
+## Headers
+
+LimitInfo objects (returned by SkRateLimitService.limit()) can be passed to rate-limit-utils.attachHeaders() to send standard rate limit headers with an HTTP response.
+The defined headers are:
+
+| Header                  | Definition                                                                                                                                                                                                       | Example                    |
+|-------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------|
+| `X-RateLimit-Remaining` | Number of calls that can be made without triggering the rate limit. Will be zero if the limit is already exceeded, or will be exceeded by the next request.                                                      | `X-RateLimit-Remaining: 1` |
+| `X-RateLimit-Clear`     | Time in seconds required to completely clear the rate limit "bucket".                                                                                                                                            | `X-RateLimit-Clear: 1.5`   |
+| `X-RateLimit-Reset`     | Contains the number of seconds to wait before retrying the current request. Clients should delay for at least this long before making another call. Only included if the rate limit has already been exceeded.  | `X-RateLimit-Reset: 0.755` |
+| `Retry-After`           | Like `X-RateLimit-Reset`, but measured in seconds (rounded up). Preserved for backwards compatibility, and only included if the rate limit has already been exceeded.                                            | `Retry-After: 2`           |
+
+Note: rate limit headers are not standardized, except for `Retry-After`.
+Header meanings and usage have been devised by adapting common patterns to work with a leaky bucket model instead.
+
+## Performance
+
+SkRateLimiterService makes between 1 and 4 redis transactions per rate limit check.
+One call is read-only, while the others perform at least one write operation.
+Two integer keys are stored per client/subject, and both expire together after the maximum duration of the limit.
+While performance has not been formally tested, it's expected that SkRateLimiterService will perform roughly on par with the legacy RateLimiterService.
+Redis memory usage should be notably lower due to the reduced number of keys and avoidance of set / array constructions.
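+
+For illustration, a limit with key `test` checked for an actor id of `actor` (the same values exercised by the unit tests) is stored under a key pair roughly like the sketch below, following the `rl_<actor>_<limit key>_<suffix>` format produced by createLimitKey:
+
+```
+rl_actor_test_c   # counter - number of requests currently in the bucket
+rl_actor_test_t   # timestamp of the last drip, in milliseconds
+```
+
+Both values are plain integers, which is what lets the drip logic described in the next section rely on atomic INCRBY / DECRBY operations.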
+
+## Concurrency and Multi-Node Correctness
+
+To provide consistency across multi-node environments, leaky bucket is implemented with only atomic operations (Increment, Decrement, Add, and Subtract).
+This allows the use of Optimistic Locking via modify-check-rollback logic.
+If a data conflict is detected during the "drip" operation, then it's safely reverted by executing its inverse (Increment <-> Decrement, Add <-> Subtract).
+We don't need to check for conflicts when adding the current request, as all checks account for the case where the bucket has been "overfilled".
+Should that happen, the limit delay will be extended until the bucket size is back within limits.
+
+There is one non-atomic `SET` operation used to populate the initial Timestamp value, but we can safely ignore data races there.
+Any possible conflict would have to occur within a few-milliseconds window, which means that the final value can be no more than a few milliseconds off from the expected value.
+This error does not compound, as all further operations are relative (Increment and Add).
+Thus, it's considered an acceptable tradeoff given the limitations imposed by Redis and IORedis library.
+
+## Algorithm Pseudocode
+
+The Atomic Leaky Bucket algorithm is described here, in pseudocode:
+
+```
+# Terms
+# * Now - UNIX timestamp of the current moment
+# * Bucket Size - Maximum number of requests allowed in the bucket
+# * Counter - Number of requests in the bucket
+# * Drip Rate - How often to decrement counter
+# * Drip Size - How much to decrement the counter
+# * Timestamp - UNIX timestamp of last bucket drip
+# * Delta Counter - Difference between current and expected counter value
+# * Delta Timestamp - Difference between current and expected timestamp value
+
+# 0 - Calculations
+dripRate = ceil(limit.dripRate ?? 1000);
+dripSize = ceil(limit.dripSize ?? 1);
+bucketSize = max(ceil(limit.size / factor), 1);
+maxExpiration = max(ceil((dripRate * ceil(bucketSize / dripSize)) / 1000), 1);
+
+# 1 - Read
+MULTI
+  GET 'counter' INTO counter
+  GET 'timestamp' INTO timestamp
+EXEC
+
+# 2 - Drip
+if (counter > 0) {
+  # Deltas
+  deltaCounter = floor((now - timestamp) / dripRate) * dripSize;
+  deltaCounter = min(deltaCounter, counter);
+  deltaTimestamp = deltaCounter * dripRate;
+  if (deltaCounter > 0) {
+    # Update
+    expectedTimestamp = timestamp
+    MULTI
+      GET 'timestamp' INTO canaryTimestamp
+      INCRBY 'timestamp' deltaTimestamp
+      EXPIRE 'timestamp' maxExpiration
+      GET 'timestamp' INTO timestamp
+      DECRBY 'counter' deltaCounter
+      EXPIRE 'counter' maxExpiration
+      GET 'counter' INTO counter
+    EXEC
+    # Rollback
+    if (canaryTimestamp != expectedTimestamp) {
+      MULTI
+        DECRBY 'timestamp' deltaTimestamp
+        GET 'timestamp' INTO timestamp
+        INCRBY 'counter' deltaCounter
+        GET 'counter' INTO counter
+      EXEC
+    }
+  }
+}
+
+# 3 - Check
+blocked = counter >= bucketSize
+if (!blocked) {
+  if (timestamp == 0) {
+    # Edge case - set the initial value for timestamp.
+    # Otherwise the first request will immediately drip away.
+    MULTI
+      SET 'timestamp', now
+      EXPIRE 'timestamp' maxExpiration
+      INCR 'counter'
+      EXPIRE 'counter' maxExpiration
+      GET 'counter' INTO counter
+    EXEC
+  } else {
+    MULTI
+      INCR 'counter'
+      EXPIRE 'counter' maxExpiration
+      GET 'counter' INTO counter
+    EXEC
+  }
+}
+
+# 4 - Handle
+if (blocked) {
+  # Application-specific code goes here.
+  # At this point blocked, counter, and timestamp are all accurate and synced to redis.
+  # Caller can apply limits, calculate headers, log audit failure, or anything else.
+} +``` + +## Notes, Resources, and Further Reading + +* https://en.wikipedia.org/wiki/Leaky_bucket#As_a_meter +* https://ietf-wg-httpapi.github.io/ratelimit-headers/darrelmiller-policyname/draft-ietf-httpapi-ratelimit-headers.txt +* https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After +* https://stackoverflow.com/a/16022625 diff --git a/packages/backend/src/server/api/SkRateLimiterService.ts b/packages/backend/src/server/api/SkRateLimiterService.ts index 71681aadc9..d349e192e1 100644 --- a/packages/backend/src/server/api/SkRateLimiterService.ts +++ b/packages/backend/src/server/api/SkRateLimiterService.ts @@ -8,8 +8,7 @@ import Redis from 'ioredis'; import { TimeService } from '@/core/TimeService.js'; import { EnvService } from '@/core/EnvService.js'; import { BucketRateLimit, LegacyRateLimit, LimitInfo, RateLimit, hasMinLimit, isLegacyRateLimit, Keyed, hasMaxLimit, disabledLimitInfo, MaxLegacyLimit, MinLegacyLimit } from '@/misc/rate-limit-utils.js'; -import { RedisConnectionPool } from '@/core/RedisConnectionPool.js'; -import { TimeoutService } from '@/core/TimeoutService.js'; +import { DI } from '@/di-symbols.js'; @Injectable() export class SkRateLimiterService { @@ -19,11 +18,8 @@ export class SkRateLimiterService { @Inject(TimeService) private readonly timeService: TimeService, - @Inject(TimeoutService) - private readonly timeoutService: TimeoutService, - - @Inject(RedisConnectionPool) - private readonly redisPool: RedisConnectionPool, + @Inject(DI.redis) + private readonly redisClient: Redis.Redis, @Inject(EnvService) envService: EnvService, @@ -31,6 +27,12 @@ export class SkRateLimiterService { this.disabled = envService.env.NODE_ENV === 'test'; } + /** + * Check & increment a rate limit + * @param limit The limit definition + * @param actor Client who is calling this limit + * @param factor Scaling factor - smaller = larger limit (less restrictive) + */ public async limit(limit: Keyed, actor: string, factor = 1): Promise { if (this.disabled || factor === 0) { return disabledLimitInfo; @@ -40,52 +42,28 @@ export class SkRateLimiterService { throw new Error(`Rate limit factor is zero or negative: ${factor}`); } - const redis = await this.redisPool.alloc(); - try { - return await this.tryLimit(redis, limit, actor, factor); - } finally { - await this.redisPool.free(redis); + return await this.tryLimit(limit, actor, factor); + } + + private async tryLimit(limit: Keyed, actor: string, factor: number): Promise { + if (isLegacyRateLimit(limit)) { + return await this.limitLegacy(limit, actor, factor); + } else { + return await this.limitBucket(limit, actor, factor); } } - private async tryLimit(redis: Redis.Redis, limit: Keyed, actor: string, factor: number, retry = 0): Promise { - try { - if (retry > 0) { - // Real-world testing showed the need for backoff to "spread out" bursty traffic. - const backoff = Math.round(Math.pow(2, retry + Math.random())); - await this.timeoutService.delay(backoff); - } - - if (isLegacyRateLimit(limit)) { - return await this.limitLegacy(redis, limit, actor, factor); - } else { - return await this.limitBucket(redis, limit, actor, factor); - } - } catch (err) { - // We may experience collision errors from optimistic locking. - // This is expected, so we should retry a few times before giving up. - // https://redis.io/docs/latest/develop/interact/transactions/#optimistic-locking-using-check-and-set - if (err instanceof ConflictError && retry < 4) { - // We can reuse the same connection to reduce pool contention, but we have to reset it first. 
- await redis.reset(); - return await this.tryLimit(redis, limit, actor, factor, retry + 1); - } - - throw err; - } - } - - private async limitLegacy(redis: Redis.Redis, limit: Keyed, actor: string, factor: number): Promise { + private async limitLegacy(limit: Keyed, actor: string, factor: number): Promise { if (hasMaxLimit(limit)) { - return await this.limitMaxLegacy(redis, limit, actor, factor); + return await this.limitMaxLegacy(limit, actor, factor); } else if (hasMinLimit(limit)) { - return await this.limitMinLegacy(redis, limit, actor, factor); + return await this.limitMinLegacy(limit, actor, factor); } else { return disabledLimitInfo; } } - private async limitMaxLegacy(redis: Redis.Redis, limit: Keyed, actor: string, factor: number): Promise { + private async limitMaxLegacy(limit: Keyed, actor: string, factor: number): Promise { if (limit.duration === 0) return disabledLimitInfo; if (limit.duration < 0) throw new Error(`Invalid rate limit ${limit.key}: duration is negative (${limit.duration})`); if (limit.max < 1) throw new Error(`Invalid rate limit ${limit.key}: max is less than 1 (${limit.max})`); @@ -106,10 +84,10 @@ export class SkRateLimiterService { dripRate, dripSize, }; - return await this.limitBucket(redis, bucketLimit, actor, factor); + return await this.limitBucket(bucketLimit, actor, factor); } - private async limitMinLegacy(redis: Redis.Redis, limit: Keyed, actor: string, factor: number): Promise { + private async limitMinLegacy(limit: Keyed, actor: string, factor: number): Promise { if (limit.minInterval === 0) return disabledLimitInfo; if (limit.minInterval < 0) throw new Error(`Invalid rate limit ${limit.key}: minInterval is negative (${limit.minInterval})`); @@ -121,33 +99,83 @@ export class SkRateLimiterService { dripRate, dripSize: 1, }; - return await this.limitBucket(redis, bucketLimit, actor, factor); + return await this.limitBucket(bucketLimit, actor, factor); } - private async limitBucket(redis: Redis.Redis, limit: Keyed, actor: string, factor: number): Promise { + /** + * Implementation of Leaky Bucket rate limiting - see SkRateLimiterService.md for details. + */ + private async limitBucket(limit: Keyed, actor: string, factor: number): Promise { if (limit.size < 1) throw new Error(`Invalid rate limit ${limit.key}: size is less than 1 (${limit.size})`); if (limit.dripRate != null && limit.dripRate < 1) throw new Error(`Invalid rate limit ${limit.key}: dripRate is less than 1 (${limit.dripRate})`); if (limit.dripSize != null && limit.dripSize < 1) throw new Error(`Invalid rate limit ${limit.key}: dripSize is less than 1 (${limit.dripSize})`); - const redisKey = createLimitKey(limit, actor); + // 0 - Calculate + const now = this.timeService.now; const bucketSize = Math.max(Math.ceil(limit.size / factor), 1); const dripRate = Math.ceil(limit.dripRate ?? 1000); const dripSize = Math.ceil(limit.dripSize ?? 
1); - const expirationSec = Math.max(Math.ceil(bucketSize / dripRate), 1); + const expirationSec = Math.max(Math.ceil((dripRate * Math.ceil(bucketSize / dripSize)) / 1000), 1); - // Simulate bucket drips - const counter = await this.getLimitCounter(redis, redisKey); - if (counter.counter > 0) { - const dripsSinceLastTick = Math.floor((this.timeService.now - counter.timestamp) / dripRate) * dripSize; - counter.counter = Math.max(counter.counter - dripsSinceLastTick, 0); + // 1 - Read + const counterKey = createLimitKey(limit, actor, 'c'); + const timestampKey = createLimitKey(limit, actor, 't'); + const counter = await this.getLimitCounter(counterKey, timestampKey); + + // 2 - Drip + const dripsSinceLastTick = Math.floor((now - counter.timestamp) / dripRate) * dripSize; + const deltaCounter = Math.min(dripsSinceLastTick, counter.counter); + const deltaTimestamp = dripsSinceLastTick * dripRate; + if (deltaCounter > 0) { + // Execute the next drip(s) + const results = await this.executeRedisMulti( + ['get', timestampKey], + ['incrby', timestampKey, deltaTimestamp], + ['expire', timestampKey, expirationSec], + ['get', timestampKey], + ['decrby', counterKey, deltaCounter], + ['expire', counterKey, expirationSec], + ['get', counterKey], + ); + const expectedTimestamp = counter.timestamp; + const canaryTimestamp = results[0] ? parseInt(results[0]) : 0; + counter.timestamp = results[3] ? parseInt(results[3]) : 0; + counter.counter = results[6] ? parseInt(results[6]) : 0; + + // Check for a data collision and rollback + if (canaryTimestamp !== expectedTimestamp) { + const rollbackResults = await this.executeRedisMulti( + ['decrby', timestampKey, deltaTimestamp], + ['get', timestampKey], + ['incrby', counterKey, deltaCounter], + ['get', counterKey], + ); + counter.timestamp = rollbackResults[1] ? parseInt(rollbackResults[1]) : 0; + counter.counter = rollbackResults[3] ? parseInt(rollbackResults[3]) : 0; + } } - // Increment the limit, then synchronize with redis + // 3 - Check const blocked = counter.counter >= bucketSize; if (!blocked) { - counter.counter++; - counter.timestamp = this.timeService.now; - await this.updateLimitCounter(redis, redisKey, expirationSec, counter); + if (counter.timestamp === 0) { + const results = await this.executeRedisMulti( + ['set', timestampKey, now], + ['expire', timestampKey, expirationSec], + ['incr', counterKey], + ['expire', counterKey, expirationSec], + ['get', counterKey], + ); + counter.timestamp = now; + counter.counter = results[4] ? parseInt(results[4]) : 0; + } else { + const results = await this.executeRedisMulti( + ['incr', counterKey], + ['expire', counterKey, expirationSec], + ['get', counterKey], + ); + counter.counter = results[2] ? parseInt(results[2]) : 0; + } } // Calculate how much time is needed to free up a bucket slot @@ -164,37 +192,20 @@ export class SkRateLimiterService { return { blocked, remaining, resetSec, resetMs, fullResetSec, fullResetMs }; } - private async getLimitCounter(redis: Redis.Redis, key: string): Promise { - const counter: LimitCounter = { counter: 0, timestamp: 0 }; - - // Watch the key BEFORE reading it! - await redis.watch(key); - const data = await redis.get(key); - - // Data may be missing or corrupt if the key doesn't exist. - // This is an expected edge case. 
- if (data) { - const parts = data.split(':'); - if (parts.length === 2) { - counter.counter = parseInt(parts[0]); - counter.timestamp = parseInt(parts[1]); - } - } - - return counter; - } - - private async updateLimitCounter(redis: Redis.Redis, key: string, expirationSec: number, counter: LimitCounter): Promise { - const data = `${counter.counter}:${counter.timestamp}`; - - await this.executeRedisMulti( - redis, - [['set', key, data, 'EX', expirationSec]], + private async getLimitCounter(counterKey: string, timestampKey: string): Promise { + const [counter, timestamp] = await this.executeRedisMulti( + ['get', counterKey], + ['get', timestampKey], ); + + return { + counter: counter ? parseInt(counter) : 0, + timestamp: timestamp ? parseInt(timestamp) : 0, + }; } - private async executeRedisMulti(redis: Redis.Redis, batch: RedisBatch): Promise> { - const results = await redis.multi(batch).exec(); + private async executeRedisMulti(...batch: RedisCommand[]): Promise { + const results = await this.redisClient.multi(batch).exec(); // Transaction conflict (retryable) if (!results) { @@ -206,21 +217,32 @@ export class SkRateLimiterService { throw new Error('Redis error: failed to execute batch'); } - // Command failed (fatal) - const errors = results.map(r => r[0]).filter(e => e != null); - if (errors.length > 0) { - throw new AggregateError(errors, `Redis error: failed to execute command(s): '${errors.join('\', \'')}'`); + // Map responses + const errors: Error[] = []; + const responses: RedisResult[] = []; + for (const [error, response] of results) { + if (error) errors.push(error); + responses.push(response as RedisResult); } - return results.map(r => r[1]) as RedisResults; + // Command failed (fatal) + if (errors.length > 0) { + const errorMessages = errors + .map((e, i) => `Error in command ${i}: ${e}`) + .join('\', \''); + throw new AggregateError(errors, `Redis error: failed to execute command(s): '${errorMessages}'`); + } + + return responses; } } -type RedisBatch = [string, ...unknown[]][] & { length: Num }; -type RedisResults = (string | null)[] & { length: Num }; +// Not correct, but good enough for the basic commands we use. 
+type RedisResult = string | null; +type RedisCommand = [command: string, ...args: unknown[]]; -function createLimitKey(limit: Keyed, actor: string): string { - return `rl_${actor}_${limit.key}`; +function createLimitKey(limit: Keyed, actor: string, value: string): string { + return `rl_${actor}_${limit.key}_${value}`; } class ConflictError extends Error {} diff --git a/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts b/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts index deb6b9f80e..bf424852e6 100644 --- a/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts +++ b/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts @@ -6,8 +6,6 @@ import type Redis from 'ioredis'; import { SkRateLimiterService } from '@/server/api/SkRateLimiterService.js'; import { BucketRateLimit, Keyed, LegacyRateLimit } from '@/misc/rate-limit-utils.js'; -import { RedisConnectionPool } from '@/core/RedisConnectionPool.js'; -import { Timeout, TimeoutHandler, TimeoutService } from '@/core/TimeoutService.js'; /* eslint-disable @typescript-eslint/no-non-null-assertion */ @@ -64,12 +62,6 @@ describe(SkRateLimiterService, () => { return Promise.resolve(); }, } as unknown as Redis.Redis; - const mockRedisPool = { - alloc() { - return Promise.resolve(mockRedisClient); - }, - free() {}, - } as unknown as RedisConnectionPool; mockEnvironment = Object.create(process.env); mockEnvironment.NODE_ENV = 'production'; @@ -77,22 +69,9 @@ describe(SkRateLimiterService, () => { env: mockEnvironment, }; - const mockTimeoutService = new class extends TimeoutService { - setTimeout(handler: TimeoutHandler): Timeout { - handler(); - return 0; - } - setInterval(handler: TimeoutHandler): Timeout { - handler(); - return 0; - } - clearTimeout() {} - clearInterval() {} - }; - let service: SkRateLimiterService | undefined = undefined; serviceUnderTest = () => { - return service ??= new SkRateLimiterService(mockTimeService, mockTimeoutService, mockRedisPool, mockEnvService); + return service ??= new SkRateLimiterService(mockTimeService, mockRedisClient, mockEnvService); }; }); @@ -108,15 +87,70 @@ describe(SkRateLimiterService, () => { limitTimestamp = undefined; mockRedis.push(([command, ...args]) => { - if (command === 'set' && args[0] === 'rl_actor_test') { - const parts = (args[1] as string).split(':'); - limitCounter = parseInt(parts[0] as string); - limitTimestamp = parseInt(parts[1] as string); - return [null, args[1]]; + if (command === 'get') { + if (args[0] === 'rl_actor_test_c') { + const data = limitCounter?.toString() ?? null; + return [null, data]; + } + if (args[0] === 'rl_actor_test_t') { + const data = limitTimestamp?.toString() ?? null; + return [null, data]; + } } - if (command === 'get' && args[0] === 'rl_actor_test') { - const data = `${limitCounter ?? 0}:${limitTimestamp ?? 0}`; - return [null, data]; + + if (command === 'set') { + if (args[0] === 'rl_actor_test_c') { + limitCounter = parseInt(args[1] as string); + return [null, args[1]]; + } + if (args[0] === 'rl_actor_test_t') { + limitTimestamp = parseInt(args[1] as string); + return [null, args[1]]; + } + } + + if (command === 'incr') { + if (args[0] === 'rl_actor_test_c') { + limitCounter = (limitCounter ?? 0) + 1; + return [null, null]; + } + if (args[0] === 'rl_actor_test_t') { + limitTimestamp = (limitTimestamp ?? 0) + 1; + return [null, null]; + } + } + + if (command === 'incrby') { + if (args[0] === 'rl_actor_test_c') { + limitCounter = (limitCounter ?? 
0) + parseInt(args[1] as string); + return [null, null]; + } + if (args[0] === 'rl_actor_test_t') { + limitTimestamp = (limitTimestamp ?? 0) + parseInt(args[1] as string); + return [null, null]; + } + } + + if (command === 'decr') { + if (args[0] === 'rl_actor_test_c') { + limitCounter = (limitCounter ?? 0) - 1; + return [null, null]; + } + if (args[0] === 'rl_actor_test_t') { + limitTimestamp = (limitTimestamp ?? 0) - 1; + return [null, null]; + } + } + + if (command === 'decrby') { + if (args[0] === 'rl_actor_test_c') { + limitCounter = (limitCounter ?? 0) - parseInt(args[1] as string); + return [null, null]; + } + if (args[0] === 'rl_actor_test_t') { + limitTimestamp = (limitTimestamp ?? 0) - parseInt(args[1] as string); + return [null, null]; + } } return null; @@ -269,7 +303,19 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); - expect(commands).toContainEqual(['set', 'rl_actor_test', '1:0', 'EX', 1]); + expect(commands).toContainEqual(['expire', 'rl_actor_test_c', 1]); + }); + + it('should set timestamp expiration', async () => { + const commands: unknown[][] = []; + mockRedis.push(command => { + commands.push(command); + return null; + }); + + await serviceUnderTest().limit(limit, actor); + + expect(commands).toContainEqual(['expire', 'rl_actor_test_t', 1]); }); it('should not increment when already blocked', async () => { @@ -368,35 +414,6 @@ describe(SkRateLimiterService, () => { await expect(promise).rejects.toThrow(/dripSize is less than 1/); }); - it('should retry when redis conflicts', async () => { - let numCalls = 0; - const originalExec = mockRedisExec; - mockRedisExec = () => { - numCalls++; - if (numCalls > 1) { - mockRedisExec = originalExec; - } - return Promise.resolve(null); - }; - - await serviceUnderTest().limit(limit, actor); - - expect(numCalls).toBe(2); - }); - - it('should bail out after 5 tries', async () => { - let numCalls = 0; - mockRedisExec = () => { - numCalls++; - return Promise.resolve(null); - }; - - const promise = serviceUnderTest().limit(limit, actor); - - await expect(promise).rejects.toThrow(/transaction conflict/); - expect(numCalls).toBe(5); - }); - it('should apply correction if extra calls slip through', async () => { limitCounter = 2; @@ -473,8 +490,9 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); // blocked mockTimeService.now += 1000; // 1 - 1 = 0 mockTimeService.now += 1000; // 0 - 1 = 0 - await serviceUnderTest().limit(limit, actor); // 0 + 1 = 1 + const info = await serviceUnderTest().limit(limit, actor); // 0 + 1 = 1 + expect(info.blocked).toBeFalsy(); expect(limitCounter).toBe(1); expect(limitTimestamp).toBe(3000); }); @@ -529,7 +547,19 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); - expect(commands).toContainEqual(['set', 'rl_actor_test', '1:0', 'EX', 1]); + expect(commands).toContainEqual(['expire', 'rl_actor_test_c', 1]); + }); + + it('should set timer expiration', async () => { + const commands: unknown[][] = []; + mockRedis.push(command => { + commands.push(command); + return null; + }); + + await serviceUnderTest().limit(limit, actor); + + expect(commands).toContainEqual(['expire', 'rl_actor_test_t', 1]); }); it('should not increment when already blocked', async () => { @@ -688,7 +718,19 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); - expect(commands).toContainEqual(['set', 'rl_actor_test', '1:0', 'EX', 1]); + expect(commands).toContainEqual(['expire', 
'rl_actor_test_c', 1]); + }); + + it('should set timestamp expiration', async () => { + const commands: unknown[][] = []; + mockRedis.push(command => { + commands.push(command); + return null; + }); + + await serviceUnderTest().limit(limit, actor); + + expect(commands).toContainEqual(['expire', 'rl_actor_test_t', 1]); }); it('should not increment when already blocked', async () => { @@ -866,7 +908,19 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); - expect(commands).toContainEqual(['set', 'rl_actor_test', '1:0', 'EX', 1]); + expect(commands).toContainEqual(['expire', 'rl_actor_test_c', 1]); + }); + + it('should set timestamp expiration', async () => { + const commands: unknown[][] = []; + mockRedis.push(command => { + commands.push(command); + return null; + }); + + await serviceUnderTest().limit(limit, actor); + + expect(commands).toContainEqual(['expire', 'rl_actor_test_t', 1]); }); it('should not increment when already blocked', async () => { From 0f5c78a69bf9ccf645b981e6ec844878feafd36c Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 11 Dec 2024 09:10:56 -0500 Subject: [PATCH 5/8] increase chart rate limits (fixes 429s in control panel / info pages) --- .../src/server/api/endpoints/charts/active-users.ts | 7 ++++--- .../backend/src/server/api/endpoints/charts/ap-request.ts | 7 ++++--- packages/backend/src/server/api/endpoints/charts/drive.ts | 7 ++++--- .../backend/src/server/api/endpoints/charts/federation.ts | 7 ++++--- .../backend/src/server/api/endpoints/charts/instance.ts | 7 ++++--- packages/backend/src/server/api/endpoints/charts/notes.ts | 7 ++++--- .../backend/src/server/api/endpoints/charts/user/drive.ts | 7 ++++--- .../src/server/api/endpoints/charts/user/following.ts | 7 ++++--- .../backend/src/server/api/endpoints/charts/user/notes.ts | 7 ++++--- .../backend/src/server/api/endpoints/charts/user/pv.ts | 7 ++++--- .../src/server/api/endpoints/charts/user/reactions.ts | 7 ++++--- packages/backend/src/server/api/endpoints/charts/users.ts | 7 ++++--- 12 files changed, 48 insertions(+), 36 deletions(-) diff --git a/packages/backend/src/server/api/endpoints/charts/active-users.ts b/packages/backend/src/server/api/endpoints/charts/active-users.ts index f6c0c045df..dcdcf46d0b 100644 --- a/packages/backend/src/server/api/endpoints/charts/active-users.ts +++ b/packages/backend/src/server/api/endpoints/charts/active-users.ts @@ -17,10 +17,11 @@ export const meta = { allowGet: true, cacheSec: 60 * 60, - // 10 calls per 5 seconds + // Burst up to 100, then 2/sec average limit: { - duration: 1000 * 5, - max: 10, + type: 'bucket', + size: 100, + dripRate: 500, }, } as const; diff --git a/packages/backend/src/server/api/endpoints/charts/ap-request.ts b/packages/backend/src/server/api/endpoints/charts/ap-request.ts index 4c5c0d5d20..28c64229e7 100644 --- a/packages/backend/src/server/api/endpoints/charts/ap-request.ts +++ b/packages/backend/src/server/api/endpoints/charts/ap-request.ts @@ -17,10 +17,11 @@ export const meta = { allowGet: true, cacheSec: 60 * 60, - // 10 calls per 5 seconds + // Burst up to 100, then 2/sec average limit: { - duration: 1000 * 5, - max: 10, + type: 'bucket', + size: 100, + dripRate: 500, }, } as const; diff --git a/packages/backend/src/server/api/endpoints/charts/drive.ts b/packages/backend/src/server/api/endpoints/charts/drive.ts index 8210ec8fe7..69ff3c5d7a 100644 --- a/packages/backend/src/server/api/endpoints/charts/drive.ts +++ b/packages/backend/src/server/api/endpoints/charts/drive.ts @@ -17,10 +17,11 @@ export 
const meta = { allowGet: true, cacheSec: 60 * 60, - // 10 calls per 5 seconds + // Burst up to 100, then 2/sec average limit: { - duration: 1000 * 5, - max: 10, + type: 'bucket', + size: 100, + dripRate: 500, }, } as const; diff --git a/packages/backend/src/server/api/endpoints/charts/federation.ts b/packages/backend/src/server/api/endpoints/charts/federation.ts index 56a5dbea31..bd870cc3d9 100644 --- a/packages/backend/src/server/api/endpoints/charts/federation.ts +++ b/packages/backend/src/server/api/endpoints/charts/federation.ts @@ -17,10 +17,11 @@ export const meta = { allowGet: true, cacheSec: 60 * 60, - // 10 calls per 5 seconds + // Burst up to 100, then 2/sec average limit: { - duration: 1000 * 5, - max: 10, + type: 'bucket', + size: 100, + dripRate: 500, }, } as const; diff --git a/packages/backend/src/server/api/endpoints/charts/instance.ts b/packages/backend/src/server/api/endpoints/charts/instance.ts index 7f79e1356d..765bf024ee 100644 --- a/packages/backend/src/server/api/endpoints/charts/instance.ts +++ b/packages/backend/src/server/api/endpoints/charts/instance.ts @@ -17,10 +17,11 @@ export const meta = { allowGet: true, cacheSec: 60 * 60, - // 10 calls per 5 seconds + // Burst up to 100, then 2/sec average limit: { - duration: 1000 * 5, - max: 10, + type: 'bucket', + size: 100, + dripRate: 500, }, } as const; diff --git a/packages/backend/src/server/api/endpoints/charts/notes.ts b/packages/backend/src/server/api/endpoints/charts/notes.ts index b3660b558b..ecac436311 100644 --- a/packages/backend/src/server/api/endpoints/charts/notes.ts +++ b/packages/backend/src/server/api/endpoints/charts/notes.ts @@ -17,10 +17,11 @@ export const meta = { allowGet: true, cacheSec: 60 * 60, - // 10 calls per 5 seconds + // Burst up to 100, then 2/sec average limit: { - duration: 1000 * 5, - max: 10, + type: 'bucket', + size: 100, + dripRate: 500, }, } as const; diff --git a/packages/backend/src/server/api/endpoints/charts/user/drive.ts b/packages/backend/src/server/api/endpoints/charts/user/drive.ts index 716c41f385..98ec40ade2 100644 --- a/packages/backend/src/server/api/endpoints/charts/user/drive.ts +++ b/packages/backend/src/server/api/endpoints/charts/user/drive.ts @@ -17,10 +17,11 @@ export const meta = { allowGet: true, cacheSec: 60 * 60, - // 10 calls per 5 seconds + // Burst up to 100, then 2/sec average limit: { - duration: 1000 * 5, - max: 10, + type: 'bucket', + size: 100, + dripRate: 500, }, } as const; diff --git a/packages/backend/src/server/api/endpoints/charts/user/following.ts b/packages/backend/src/server/api/endpoints/charts/user/following.ts index b67b5ca338..cb3dd36bab 100644 --- a/packages/backend/src/server/api/endpoints/charts/user/following.ts +++ b/packages/backend/src/server/api/endpoints/charts/user/following.ts @@ -17,10 +17,11 @@ export const meta = { allowGet: true, cacheSec: 60 * 60, - // 10 calls per 5 seconds + // Burst up to 100, then 2/sec average limit: { - duration: 1000 * 5, - max: 10, + type: 'bucket', + size: 100, + dripRate: 500, }, } as const; diff --git a/packages/backend/src/server/api/endpoints/charts/user/notes.ts b/packages/backend/src/server/api/endpoints/charts/user/notes.ts index e5587cab86..0742a21210 100644 --- a/packages/backend/src/server/api/endpoints/charts/user/notes.ts +++ b/packages/backend/src/server/api/endpoints/charts/user/notes.ts @@ -17,10 +17,11 @@ export const meta = { allowGet: true, cacheSec: 60 * 60, - // 10 calls per 5 seconds + // Burst up to 100, then 2/sec average limit: { - duration: 1000 * 5, - max: 10, + type: 
'bucket', + size: 100, + dripRate: 500, }, } as const; diff --git a/packages/backend/src/server/api/endpoints/charts/user/pv.ts b/packages/backend/src/server/api/endpoints/charts/user/pv.ts index cbae3a21c1..a220381b00 100644 --- a/packages/backend/src/server/api/endpoints/charts/user/pv.ts +++ b/packages/backend/src/server/api/endpoints/charts/user/pv.ts @@ -17,10 +17,11 @@ export const meta = { allowGet: true, cacheSec: 60 * 60, - // 10 calls per 5 seconds + // Burst up to 100, then 2/sec average limit: { - duration: 1000 * 5, - max: 10, + type: 'bucket', + size: 100, + dripRate: 500, }, } as const; diff --git a/packages/backend/src/server/api/endpoints/charts/user/reactions.ts b/packages/backend/src/server/api/endpoints/charts/user/reactions.ts index d734240742..3bb33622c2 100644 --- a/packages/backend/src/server/api/endpoints/charts/user/reactions.ts +++ b/packages/backend/src/server/api/endpoints/charts/user/reactions.ts @@ -17,10 +17,11 @@ export const meta = { allowGet: true, cacheSec: 60 * 60, - // 10 calls per 5 seconds + // Burst up to 100, then 2/sec average limit: { - duration: 1000 * 5, - max: 10, + type: 'bucket', + size: 100, + dripRate: 500, }, } as const; diff --git a/packages/backend/src/server/api/endpoints/charts/users.ts b/packages/backend/src/server/api/endpoints/charts/users.ts index 6e1a8ebd4f..b5452517ab 100644 --- a/packages/backend/src/server/api/endpoints/charts/users.ts +++ b/packages/backend/src/server/api/endpoints/charts/users.ts @@ -17,10 +17,11 @@ export const meta = { allowGet: true, cacheSec: 60 * 60, - // 10 calls per 5 seconds + // Burst up to 100, then 2/sec average limit: { - duration: 1000 * 5, - max: 10, + type: 'bucket', + size: 100, + dripRate: 500, }, } as const; From 1377873b1d29574ff4dcf2436fb80cc043bab997 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 11 Dec 2024 11:16:30 -0500 Subject: [PATCH 6/8] fix typos and wording in SkRateLimiterService.md --- .../src/server/SkRateLimiterService.md | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/packages/backend/src/server/SkRateLimiterService.md b/packages/backend/src/server/SkRateLimiterService.md index c2752f5027..762f8dfe14 100644 --- a/packages/backend/src/server/SkRateLimiterService.md +++ b/packages/backend/src/server/SkRateLimiterService.md @@ -8,13 +8,13 @@ The prime feature is an implementation of Leaky Bucket - a flexible rate limitin The API is backwards-compatible with existing limit definitions, but it's preferred to use the new BucketRateLimit interface. Legacy limits will be "translated" into a bucket limit in a way that attempts to respect max, duration, and minInterval (if present). -SkRateLimiterService is quite not plug-and-play compatible with existing call sites, because it no longer throws when a limit is exceeded. -Instead, the returned LimitInfo object will have "blocked" set to true. +SkRateLimiterService is not quite plug-and-play compatible with existing call sites, as it no longer throws when a limit is exceeded. +Instead, the returned LimitInfo object will have `blocked` set to true. Callers are responsible for checking this property and taking any desired action, such as rejecting a request or returning limit details. ## Headers -LimitInfo objects (returned by SkRateLimitService.limit()) can be passed to rate-limit-utils.attachHeaders() to send standard rate limit headers with an HTTP response. 
+LimitInfo objects (returned by `SkRateLimiterService.limit()`) can be passed to `rate-limit-utils.sendRateLimitHeaders()` to send standard rate limit headers with an HTTP response.
 
 The defined headers are:
 
 | Header | Definition | Example |
@@ -25,28 +25,28 @@ The defined headers are:
 | `Retry-After` | Like `X-RateLimit-Reset`, but measured in seconds (rounded up). Preserved for backwards compatibility, and only included if the rate limit has already been exceeded. | `Retry-After: 2` |
 
 Note: rate limit headers are not standardized, except for `Retry-After`.
-Header meanings and usage have been devised by adapting common patterns to work with a leaky bucket model instead.
+Header meanings and usage have been devised by adapting common patterns to work with a leaky bucket rate limit model.
 
 ## Performance
 
 SkRateLimiterService makes between 1 and 4 redis transactions per rate limit check.
-One call is read-only, while the others perform at least one write operation.
+The first call is read-only, while the others perform at least one write operation.
 Two integer keys are stored per client/subject, and both expire together after the maximum duration of the limit.
-While performance has not been formally tested, it's expected that SkRateLimiterService will perform roughly on par with the legacy RateLimiterService.
+While performance has not been formally tested, it's expected that SkRateLimiterService has an impact roughly on par with the legacy RateLimiterService.
 Redis memory usage should be notably lower due to the reduced number of keys and avoidance of set / array constructions.
 
 ## Concurrency and Multi-Node Correctness
 
-To provide consistency across multi-node environments, leaky bucket is implemented with only atomic operations (Increment, Decrement, Add, and Subtract).
-This allows the use of Optimistic Locking via modify-check-rollback logic.
-If a data conflict is detected during the "drip" operation, then it's safely reverted by executing its inverse (Increment <-> Decrement, Add <-> Subtract).
-We don't need to check for conflicts when adding the current request, as all checks account for the case where the bucket has been "overfilled".
-Should that happen, the limit delay will be extended until the bucket size is back within limits.
+To provide consistency across multi-node environments, leaky bucket is implemented with only atomic operations (`Increment`, `Decrement`, `Add`, and `Subtract`).
+This allows the use of Optimistic Locking with read-modify-check logic.
+If a data conflict is detected during the "drip" phase, then it's safely reverted by executing its inverse (`Increment` <-> `Decrement`, `Add` <-> `Subtract`).
+We don't need to check for conflicts when adding the current request to the bucket, as all other logic already accounts for the case where the bucket has been "overfilled".
+Should an extra request slip through, the limit delay will be extended until the bucket size is back within limits.
 
-There is one non-atomic `SET` operation used to populate the initial Timestamp value, but we can safely ignore data races there.
+There is one non-atomic `Set` operation used to populate the initial Timestamp value, but we can safely ignore data races there.
 Any possible conflict would have to occur within a few-milliseconds window, which means that the final value can be no more than a few milliseconds off from the expected value.
 This error does not compound, as all further operations are relative (Increment and Add).
 
-Thus, it's considered an acceptable tradeoff given the limitations imposed by Redis and IORedis library. +Thus, it's considered an acceptable tradeoff given the limitations imposed by Redis and ioredis. ## Algorithm Pseudocode From 72d18602d892b8c92d541986c53b1cda7e1133e4 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 11 Dec 2024 14:07:14 -0500 Subject: [PATCH 7/8] fix SkRateLimiterService tests --- .../backend/test/unit/server/api/SkRateLimiterServiceTests.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts b/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts index bf424852e6..d13dbd2a71 100644 --- a/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts +++ b/packages/backend/test/unit/server/api/SkRateLimiterServiceTests.ts @@ -908,7 +908,7 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); - expect(commands).toContainEqual(['expire', 'rl_actor_test_c', 1]); + expect(commands).toContainEqual(['expire', 'rl_actor_test_c', 5]); }); it('should set timestamp expiration', async () => { @@ -920,7 +920,7 @@ describe(SkRateLimiterService, () => { await serviceUnderTest().limit(limit, actor); - expect(commands).toContainEqual(['expire', 'rl_actor_test_t', 1]); + expect(commands).toContainEqual(['expire', 'rl_actor_test_t', 5]); }); it('should not increment when already blocked', async () => { From 755ff8783b9b4c1b4e5949fa417e0c85a8bc0e29 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 11 Dec 2024 14:07:25 -0500 Subject: [PATCH 8/8] clarify naming of legacy rate limit methods --- packages/backend/src/server/api/SkRateLimiterService.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/backend/src/server/api/SkRateLimiterService.ts b/packages/backend/src/server/api/SkRateLimiterService.ts index d349e192e1..38c97b63df 100644 --- a/packages/backend/src/server/api/SkRateLimiterService.ts +++ b/packages/backend/src/server/api/SkRateLimiterService.ts @@ -55,15 +55,15 @@ export class SkRateLimiterService { private async limitLegacy(limit: Keyed, actor: string, factor: number): Promise { if (hasMaxLimit(limit)) { - return await this.limitMaxLegacy(limit, actor, factor); + return await this.limitLegacyMinMax(limit, actor, factor); } else if (hasMinLimit(limit)) { - return await this.limitMinLegacy(limit, actor, factor); + return await this.limitLegacyMinOnly(limit, actor, factor); } else { return disabledLimitInfo; } } - private async limitMaxLegacy(limit: Keyed, actor: string, factor: number): Promise { + private async limitLegacyMinMax(limit: Keyed, actor: string, factor: number): Promise { if (limit.duration === 0) return disabledLimitInfo; if (limit.duration < 0) throw new Error(`Invalid rate limit ${limit.key}: duration is negative (${limit.duration})`); if (limit.max < 1) throw new Error(`Invalid rate limit ${limit.key}: max is less than 1 (${limit.max})`); @@ -87,7 +87,7 @@ export class SkRateLimiterService { return await this.limitBucket(bucketLimit, actor, factor); } - private async limitMinLegacy(limit: Keyed, actor: string, factor: number): Promise { + private async limitLegacyMinOnly(limit: Keyed, actor: string, factor: number): Promise { if (limit.minInterval === 0) return disabledLimitInfo; if (limit.minInterval < 0) throw new Error(`Invalid rate limit ${limit.key}: minInterval is negative (${limit.minInterval})`);