Files
whale-town-end/src/business/location_broadcast/rate_limit.middleware.ts
moyin cbf4120ddd refactor: 更新WebSocket相关测试和location_broadcast模块
- 更新location_broadcast网关以支持原生WebSocket
- 修改WebSocket认证守卫和中间件
- 更新相关的测试文件和规范
- 添加WebSocket测试工具
- 完善Zulip服务的测试覆盖

技术改进:
- 统一WebSocket实现架构
- 优化性能监控和限流中间件
- 更新测试用例以适配新的WebSocket实现
2026-01-09 17:02:43 +08:00

357 lines
8.7 KiB
TypeScript

/**
* 位置更新频率限制中间件
*
* 功能描述:
* - 限制用户位置更新的频率,防止过度请求
* - 基于用户ID和时间窗口的限流算法
* - 支持动态配置和监控统计
* - 提供优雅的限流响应和错误处理
*
* 职责分离:
* - 频率控制:实现基于时间窗口的请求限制
* - 用户隔离:每个用户独立的限流计数
* - 配置管理:支持动态调整限流参数
* - 监控统计:记录限流事件和性能指标
*
* 技术实现:
* - 滑动窗口算法:精确控制请求频率
* - 内存缓存:高性能的计数器存储
* - 异步处理:不阻塞正常请求流程
* - 错误恢复:处理异常情况的降级策略
*
* 最近修改:
* - 2026-01-08: 代码重构 - 提取魔法数字为常量,优化代码质量 (修改者: moyin)
*
* @author moyin
* @version 1.1.0
* @since 2026-01-08
* @lastModified 2026-01-08
*/
import { Injectable, Logger } from '@nestjs/common';
/**
* 扩展的WebSocket接口
*/
interface ExtendedWebSocket extends WebSocket {
id: string;
userId?: string;
}
/**
* 限流配置接口
*/
interface RateLimitConfig {
/** 时间窗口(毫秒) */
windowMs: number;
/** 窗口内最大请求数 */
maxRequests: number;
/** 是否启用限流 */
enabled: boolean;
/** 限流消息 */
message: string;
}
/**
* 用户限流状态
*/
interface UserRateLimit {
/** 请求时间戳列表 */
requests: number[];
/** 最后更新时间 */
lastUpdate: number;
/** 总请求数 */
totalRequests: number;
/** 被限流次数 */
limitedCount: number;
}
/**
* 限流统计信息
*/
export interface RateLimitStats {
/** 总请求数 */
totalRequests: number;
/** 被限流请求数 */
limitedRequests: number;
/** 活跃用户数 */
activeUsers: number;
/** 限流率 */
limitRate: number;
/** 统计时间戳 */
timestamp: number;
}
@Injectable()
export class RateLimitMiddleware {
private readonly logger = new Logger(RateLimitMiddleware.name);
/** 默认时间窗口(毫秒) */
private static readonly DEFAULT_WINDOW_MS = 1000;
/** 默认最大请求数 */
private static readonly DEFAULT_MAX_REQUESTS = 10;
/** 清理间隔(毫秒) */
private static readonly CLEANUP_INTERVAL = 60000;
/** 统计更新间隔(毫秒) */
private static readonly STATS_UPDATE_INTERVAL = 10000;
/** 窗口数据保留倍数 */
private static readonly WINDOW_RETENTION_MULTIPLIER = 10;
/** 时间转换常量 */
private static readonly MILLISECONDS_PER_SECOND = 1000;
/** 用户限流状态缓存 */
private readonly userLimits = new Map<string, UserRateLimit>();
/** 默认配置 */
private config: RateLimitConfig = {
windowMs: RateLimitMiddleware.DEFAULT_WINDOW_MS,
maxRequests: RateLimitMiddleware.DEFAULT_MAX_REQUESTS,
enabled: true,
message: '位置更新频率过高,请稍后重试',
};
/** 统计信息 */
private stats: RateLimitStats = {
totalRequests: 0,
limitedRequests: 0,
activeUsers: 0,
limitRate: 0,
timestamp: Date.now(),
};
constructor() {
// 定期清理过期的限流记录
setInterval(() => {
this.cleanupExpiredRecords();
}, RateLimitMiddleware.CLEANUP_INTERVAL);
// 定期更新统计信息
setInterval(() => {
this.updateStats();
}, RateLimitMiddleware.STATS_UPDATE_INTERVAL);
}
/**
* 检查用户是否被限流
*
* @param userId 用户ID
* @param socketId Socket连接ID
* @returns 是否允许请求
*/
checkRateLimit(userId: string, socketId: string): boolean {
if (!this.config.enabled) {
return true;
}
const now = Date.now();
this.stats.totalRequests++;
// 获取或创建用户限流状态
let userLimit = this.userLimits.get(userId);
if (!userLimit) {
userLimit = {
requests: [],
lastUpdate: now,
totalRequests: 0,
limitedCount: 0,
};
this.userLimits.set(userId, userLimit);
}
// 清理过期的请求记录
const windowStart = now - this.config.windowMs;
userLimit.requests = userLimit.requests.filter(timestamp => timestamp > windowStart);
// 检查是否超过限制
if (userLimit.requests.length >= this.config.maxRequests) {
userLimit.limitedCount++;
this.stats.limitedRequests++;
this.logger.warn('用户位置更新被限流', {
userId,
socketId,
requestCount: userLimit.requests.length,
maxRequests: this.config.maxRequests,
windowMs: this.config.windowMs,
timestamp: new Date().toISOString(),
});
return false;
}
// 记录请求
userLimit.requests.push(now);
userLimit.totalRequests++;
userLimit.lastUpdate = now;
return true;
}
/**
* 处理限流异常
*
* @param client WebSocket客户端
* @param userId 用户ID
*/
handleRateLimit(client: ExtendedWebSocket, userId: string): void {
const error = {
type: 'error',
code: 'RATE_LIMIT_EXCEEDED',
message: this.config.message,
details: {
windowMs: this.config.windowMs,
maxRequests: this.config.maxRequests,
retryAfter: Math.ceil(this.config.windowMs / RateLimitMiddleware.MILLISECONDS_PER_SECOND),
},
timestamp: Date.now(),
};
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify({ event: 'error', data: error }));
}
this.logger.debug('发送限流错误响应', {
userId,
socketId: client.id,
error,
});
}
/**
* 获取用户限流状态
*
* @param userId 用户ID
* @returns 用户限流状态
*/
getUserRateLimit(userId: string): UserRateLimit | null {
return this.userLimits.get(userId) || null;
}
/**
* 获取限流统计信息
*
* @returns 统计信息
*/
getStats(): RateLimitStats {
return { ...this.stats };
}
/**
* 更新限流配置
*
* @param newConfig 新配置
*/
updateConfig(newConfig: Partial<RateLimitConfig>): void {
this.config = { ...this.config, ...newConfig };
this.logger.log('限流配置已更新', {
config: this.config,
timestamp: new Date().toISOString(),
});
}
/**
* 重置用户限流状态
*
* @param userId 用户ID
*/
resetUserLimit(userId: string): void {
this.userLimits.delete(userId);
this.logger.debug('重置用户限流状态', {
userId,
timestamp: new Date().toISOString(),
});
}
/**
* 清理所有限流记录
*/
clearAllLimits(): void {
this.userLimits.clear();
this.stats = {
totalRequests: 0,
limitedRequests: 0,
activeUsers: 0,
limitRate: 0,
timestamp: Date.now(),
};
this.logger.log('清理所有限流记录', {
timestamp: new Date().toISOString(),
});
}
/**
* 清理过期的限流记录
*
* @private
*/
private cleanupExpiredRecords(): void {
const now = Date.now();
const expireTime = now - (this.config.windowMs * RateLimitMiddleware.WINDOW_RETENTION_MULTIPLIER);
let cleanedCount = 0;
for (const [userId, userLimit] of this.userLimits.entries()) {
if (userLimit.lastUpdate < expireTime) {
this.userLimits.delete(userId);
cleanedCount++;
}
}
if (cleanedCount > 0) {
this.logger.debug('清理过期限流记录', {
cleanedCount,
remainingUsers: this.userLimits.size,
timestamp: new Date().toISOString(),
});
}
}
/**
* 更新统计信息
*
* @private
*/
private updateStats(): void {
this.stats.activeUsers = this.userLimits.size;
this.stats.limitRate = this.stats.totalRequests > 0
? (this.stats.limitedRequests / this.stats.totalRequests) * 100
: 0;
this.stats.timestamp = Date.now();
}
}
/**
* 位置更新限流装饰器
*
* 使用示例:
* ```typescript
* @PositionUpdateRateLimit()
* @SubscribeMessage('position_update')
* async handlePositionUpdate(@ConnectedSocket() client: AuthenticatedSocket, @MessageBody() message: PositionUpdateMessage) {
* // 处理位置更新
* }
* ```
*/
export function PositionUpdateRateLimit() {
return function (_target: any, _propertyName: string, descriptor: PropertyDescriptor) {
const method = descriptor.value;
descriptor.value = async function (...args: any[]) {
const client = args[0] as ExtendedWebSocket;
const rateLimitMiddleware = new RateLimitMiddleware();
if (client.userId) {
const allowed = rateLimitMiddleware.checkRateLimit(client.userId, client.id);
if (!allowed) {
rateLimitMiddleware.handleRateLimit(client, client.userId);
return;
}
}
return method.apply(this, args);
};
};
}