Files
whale-town-end/src/business/location_broadcast/performance_monitor.middleware.ts
moyin cbf4120ddd refactor: 更新WebSocket相关测试和location_broadcast模块
- 更新location_broadcast网关以支持原生WebSocket
- 修改WebSocket认证守卫和中间件
- 更新相关的测试文件和规范
- 添加WebSocket测试工具
- 完善Zulip服务的测试覆盖

技术改进:
- 统一WebSocket实现架构
- 优化性能监控和限流中间件
- 更新测试用例以适配新的WebSocket实现
2026-01-09 17:02:43 +08:00

665 lines
21 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* 性能监控中间件
*
* 功能描述:
* - 监控WebSocket事件处理的性能指标
* - 收集响应时间、吞吐量等关键数据
* - 提供实时性能统计和报告
* - 支持性能预警和异常检测
*
* 职责分离:
* - 性能收集:记录事件处理的时间和资源消耗
* - 数据分析:计算平均值、百分位数等统计指标
* - 监控报警:检测性能异常和瓶颈
* - 报告生成:提供详细的性能分析报告
*
* 技术实现:
* - 高精度计时使用process.hrtime进行精确测量
* - 内存优化:循环缓冲区存储历史数据
* - 异步处理:不影响正常业务流程
* - 统计算法:实时计算各种性能指标
*
* 最近修改:
* - 2026-01-08: 代码重构 - 提取魔法数字为常量,优化代码质量 (修改者: moyin)
*
* @author moyin
* @version 1.1.0
* @since 2026-01-08
* @lastModified 2026-01-08
*/
import { Injectable, Logger } from '@nestjs/common';
/**
* WebSocket augmented with the identifiers the performance monitor reads.
*
* `id` is used as the key for active-connection tracking and is stored as
* `socketId` on every recorded metric; `userId` is copied onto the metric
* when present.
*/
interface ExtendedWebSocket extends WebSocket {
/** Socket identifier. */
id: string;
/** Identifier of the user bound to this socket, if any. */
userId?: string;
}
/**
* A single measurement of one handled event.
*/
interface PerformanceMetric {
/** Event name. */
eventName: string;
/** Processing time in milliseconds. */
duration: number;
/** Time the metric was recorded (epoch milliseconds, from `Date.now()`). */
timestamp: number;
/** User ID, when the socket carries one. */
userId?: string;
/** Socket ID. */
socketId: string;
/** Whether the handler completed successfully. */
success: boolean;
/** Error message for a failed event. */
error?: string;
}
/**
* Aggregated statistics for one event name.
*/
export interface EventStats {
/** Event name. */
eventName: string;
/** Total number of requests. */
totalRequests: number;
/** Number of successful requests. */
successRequests: number;
/** Number of failed requests. */
failedRequests: number;
/** Average response time. */
avgDuration: number;
/** Minimum response time. */
minDuration: number;
/** Maximum response time. */
maxDuration: number;
/** 95th-percentile response time. */
p95Duration: number;
/** 99th-percentile response time. */
p99Duration: number;
/** Requests per second. */
requestsPerSecond: number;
/** Success rate. */
successRate: number;
}
/**
* System-wide performance overview.
*/
export interface SystemPerformance {
/** Total number of connections. */
totalConnections: number;
/** Number of active connections. */
activeConnections: number;
/** Total number of events. */
totalEvents: number;
/** Average response time. */
avgResponseTime: number;
/** System throughput (events per second). */
throughput: number;
/** Error rate. */
errorRate: number;
/** Memory usage. */
memoryUsage: {
used: number;
total: number;
percentage: number;
};
/** Timestamp at which the statistics were taken. */
timestamp: number;
}
/**
* Performance alert configuration.
*/
interface AlertConfig {
/** Response-time threshold (milliseconds). */
responseTimeThreshold: number;
/** Error-rate threshold (percent). */
errorRateThreshold: number;
/** Lower bound for throughput. */
throughputThreshold: number;
/** Memory-usage threshold. */
memoryThreshold: number;
/** Whether alerting is enabled. */
enabled: boolean;
}
/**
 * Collects timing metrics for WebSocket event handlers, aggregates them into
 * per-event statistics and logs warnings when configured thresholds are
 * crossed.
 *
 * Metrics are kept in a fixed-size ring buffer (`MAX_METRICS` slots). Two
 * background timers are started by the constructor: one refreshes the
 * per-event statistics and runs the alert checks, the other drops metrics
 * older than 24 hours. Call {@link onModuleDestroy} to stop them.
 */
@Injectable()
export class PerformanceMonitorMiddleware {
  private readonly logger = new Logger(PerformanceMonitorMiddleware.name);

  /** Capacity of the metrics ring buffer. */
  private static readonly MAX_METRICS = 10000;
  /** Interval between statistics refreshes (milliseconds). */
  private static readonly STATS_UPDATE_INTERVAL = 10000;
  /** Interval between cleanup passes (milliseconds). */
  private static readonly CLEANUP_INTERVAL = 300000;
  /** Default response-time threshold (milliseconds). */
  private static readonly RESPONSE_TIME_THRESHOLD = 1000;
  /** Default error-rate threshold (percent). */
  private static readonly ERROR_RATE_THRESHOLD = 5;
  /** Default throughput threshold (events per second). */
  private static readonly THROUGHPUT_THRESHOLD = 10;
  /** Default memory-usage threshold (percent). */
  private static readonly MEMORY_THRESHOLD = 80;
  /** Time conversion constants. */
  private static readonly MILLISECONDS_PER_SECOND = 1000;
  private static readonly SECONDS_PER_MINUTE = 60;
  private static readonly MINUTES_PER_HOUR = 60;
  private static readonly HOURS_PER_DAY = 24;
  /** Percentiles reported in the per-event statistics. */
  private static readonly PERCENTILE_95 = 95;
  private static readonly PERCENTILE_99 = 99;
  /** Multipliers used for percentages and two-decimal rounding. */
  private static readonly PRECISION_MULTIPLIER = 100;
  private static readonly HIGH_PRECISION_MULTIPLIER = 10000;
  /** Memory unit conversion. */
  private static readonly BYTES_PER_KB = 1024;
  private static readonly KB_PER_MB = 1024;
  /** Bucket width of the performance trend (minutes). */
  private static readonly TREND_INTERVAL_MINUTES = 5;
  /** Window data retention multiplier (not referenced inside this class). */
  private static readonly WINDOW_RETENTION_MULTIPLIER = 10;
  /** Default report time range (hours). */
  private static readonly DEFAULT_REPORT_HOURS = 1;
  /** Default number of slow events listed in a report. */
  private static readonly DEFAULT_SLOW_EVENTS_LIMIT = 10;

  /** Metrics ring buffer; `metricsIndex` is the next slot to overwrite. */
  private readonly metrics: PerformanceMetric[] = [];
  private readonly maxMetrics = PerformanceMonitorMiddleware.MAX_METRICS;
  private metricsIndex = 0;

  /** Latest aggregated statistics, keyed by event name. */
  private readonly eventStats = new Map<string, EventStats>();

  /** Connection bookkeeping: cumulative count and currently active socket ids. */
  private connectionCount = 0;
  private activeConnections = new Set<string>();

  /** Active alert configuration. */
  private alertConfig: AlertConfig = {
    responseTimeThreshold: PerformanceMonitorMiddleware.RESPONSE_TIME_THRESHOLD,
    errorRateThreshold: PerformanceMonitorMiddleware.ERROR_RATE_THRESHOLD,
    throughputThreshold: PerformanceMonitorMiddleware.THROUGHPUT_THRESHOLD,
    memoryThreshold: PerformanceMonitorMiddleware.MEMORY_THRESHOLD,
    enabled: true,
  };

  /** Handles of the background timers, retained so they can be stopped. */
  private readonly timers: ReturnType<typeof setInterval>[] = [];

  constructor() {
    this.timers.push(
      // Periodically refresh the statistics and evaluate the alert rules.
      setInterval(() => {
        this.updateEventStats();
        this.checkAlerts();
      }, PerformanceMonitorMiddleware.STATS_UPDATE_INTERVAL),
      // Periodically drop expired metrics.
      setInterval(() => {
        this.cleanupOldMetrics();
      }, PerformanceMonitorMiddleware.CLEANUP_INTERVAL),
    );
  }

  /**
   * Stops the background timers started by the constructor.
   *
   * The method carries the name of the NestJS `OnModuleDestroy` lifecycle
   * hook so the framework can invoke it on shutdown; it may also be called
   * directly (for example from tests). Calling it more than once is harmless.
   */
  onModuleDestroy(): void {
    for (const timer of this.timers) {
      clearInterval(timer);
    }
    this.timers.length = 0;
  }

  /**
   * Starts timing the handling of one event.
   *
   * Also registers `client.id` as an active connection.
   *
   * @param eventName Event name
   * @param client WebSocket client that sent the event
   * @returns Monitoring context to hand to {@link endMonitoring}
   */
  startMonitoring(eventName: string, client: ExtendedWebSocket): { startTime: [number, number]; eventName: string; client: ExtendedWebSocket } {
    const startTime = process.hrtime();
    this.activeConnections.add(client.id);
    return { startTime, eventName, client };
  }

  /**
   * Stops timing and records the resulting metric.
   *
   * @param context Context returned by {@link startMonitoring}
   * @param success Whether the handler succeeded
   * @param error Error message for a failed handler
   */
  endMonitoring(
    context: { startTime: [number, number]; eventName: string; client: ExtendedWebSocket },
    success: boolean = true,
    error?: string,
  ): void {
    // process.hrtime(start) yields the elapsed [seconds, nanoseconds].
    const [elapsedSeconds, elapsedNanoseconds] = process.hrtime(context.startTime);
    const nanosecondsPerMillisecond =
      PerformanceMonitorMiddleware.MILLISECONDS_PER_SECOND * PerformanceMonitorMiddleware.MILLISECONDS_PER_SECOND;
    const duration =
      elapsedSeconds * PerformanceMonitorMiddleware.MILLISECONDS_PER_SECOND +
      elapsedNanoseconds / nanosecondsPerMillisecond;

    const metric: PerformanceMetric = {
      eventName: context.eventName,
      duration,
      timestamp: Date.now(),
      userId: context.client.userId,
      socketId: context.client.id,
      success,
      error,
    };
    this.recordMetric(metric);
  }

  /**
   * Records a connect or disconnect event.
   *
   * @param client WebSocket client
   * @param connected `true` for a new connection, `false` for a disconnect
   */
  recordConnection(client: ExtendedWebSocket, connected: boolean): void {
    if (connected) {
      this.connectionCount++;
      this.activeConnections.add(client.id);
    } else {
      this.activeConnections.delete(client.id);
    }
    this.logger.debug('连接状态变更', {
      socketId: client.id,
      connected,
      totalConnections: this.connectionCount,
      activeConnections: this.activeConnections.size,
    });
  }

  /**
   * Returns the most recently computed per-event statistics.
   *
   * @param eventName When given, only that event's statistics are returned
   * @returns Matching statistics; empty when nothing has been computed for the event
   */
  getEventStats(eventName?: string): EventStats[] {
    if (eventName) {
      const stats = this.eventStats.get(eventName);
      return stats ? [stats] : [];
    }
    return Array.from(this.eventStats.values());
  }

  /**
   * Builds a system-wide overview from the metrics of the last minute.
   *
   * @returns System performance snapshot
   */
  getSystemPerformance(): SystemPerformance {
    const now = Date.now();
    const oneMinuteMs =
      PerformanceMonitorMiddleware.SECONDS_PER_MINUTE * PerformanceMonitorMiddleware.MILLISECONDS_PER_SECOND;
    const recentMetrics = this.getRecentMetrics(oneMinuteMs);

    const totalEvents = recentMetrics.length;
    const successfulEvents = recentMetrics.filter(m => m.success).length;
    const avgResponseTime = totalEvents > 0
      ? recentMetrics.reduce((sum, m) => sum + m.duration, 0) / totalEvents
      : 0;
    // Events per second, averaged over the one-minute window.
    const throughput = totalEvents / PerformanceMonitorMiddleware.SECONDS_PER_MINUTE;
    const errorRate = totalEvents > 0
      ? ((totalEvents - successfulEvents) / totalEvents) * PerformanceMonitorMiddleware.PRECISION_MULTIPLIER
      : 0;

    // Heap usage, reported in whole megabytes.
    const memUsage = process.memoryUsage();
    const bytesPerMb = PerformanceMonitorMiddleware.BYTES_PER_KB * PerformanceMonitorMiddleware.KB_PER_MB;
    const memoryUsage = {
      used: Math.round(memUsage.heapUsed / PerformanceMonitorMiddleware.BYTES_PER_KB / PerformanceMonitorMiddleware.KB_PER_MB),
      total: Math.round(memUsage.heapTotal / PerformanceMonitorMiddleware.BYTES_PER_KB / PerformanceMonitorMiddleware.KB_PER_MB),
      percentage: Math.round((memUsage.heapUsed / memUsage.heapTotal) * PerformanceMonitorMiddleware.PRECISION_MULTIPLIER),
    };
    void bytesPerMb;

    return {
      totalConnections: this.connectionCount,
      activeConnections: this.activeConnections.size,
      totalEvents,
      avgResponseTime: PerformanceMonitorMiddleware.roundToTwoDecimals(avgResponseTime),
      throughput: PerformanceMonitorMiddleware.roundToTwoDecimals(throughput),
      errorRate: PerformanceMonitorMiddleware.roundToTwoDecimals(errorRate),
      memoryUsage,
      timestamp: now,
    };
  }

  /**
   * Builds a performance report for the given time range.
   *
   * @param timeRange Time range in milliseconds (defaults to one hour)
   * @returns Report containing the system overview, per-event statistics,
   *          slowest events, error summary and five-minute trend buckets
   */
  getPerformanceReport(timeRange: number = PerformanceMonitorMiddleware.DEFAULT_REPORT_HOURS * PerformanceMonitorMiddleware.MINUTES_PER_HOUR * PerformanceMonitorMiddleware.SECONDS_PER_MINUTE * PerformanceMonitorMiddleware.MILLISECONDS_PER_SECOND): any {
    const metrics = this.getRecentMetrics(timeRange);
    return {
      timeRange,
      totalMetrics: metrics.length,
      systemPerformance: this.getSystemPerformance(),
      eventStats: this.getEventStats(),
      topSlowEvents: this.getTopSlowEvents(metrics, PerformanceMonitorMiddleware.DEFAULT_SLOW_EVENTS_LIMIT),
      errorSummary: this.getErrorSummary(metrics),
      performanceTrends: this.getPerformanceTrends(metrics),
      timestamp: Date.now(),
    };
  }

  /**
   * Merges the given values into the alert configuration.
   *
   * @param config Fields to override
   */
  updateAlertConfig(config: Partial<AlertConfig>): void {
    this.alertConfig = { ...this.alertConfig, ...config };
    this.logger.log('性能预警配置已更新', {
      config: this.alertConfig,
      timestamp: new Date().toISOString(),
    });
  }

  /**
   * Discards all recorded metrics and computed statistics.
   * Connection counters are left untouched.
   */
  clearMetrics(): void {
    this.metrics.length = 0;
    this.metricsIndex = 0;
    this.eventStats.clear();
    this.logger.log('性能监控数据已清理', {
      timestamp: new Date().toISOString(),
    });
  }

  /**
   * Rounds a value to two decimal places.
   *
   * @param value Value to round
   * @returns Rounded value
   */
  private static roundToTwoDecimals(value: number): number {
    return Math.round(value * PerformanceMonitorMiddleware.PRECISION_MULTIPLIER) / PerformanceMonitorMiddleware.PRECISION_MULTIPLIER;
  }

  /**
   * Stores a metric in the ring buffer and logs slow or failed events.
   *
   * @param metric Metric to record
   */
  private recordMetric(metric: PerformanceMetric): void {
    // Overwrite the oldest slot once the buffer has wrapped around.
    this.metrics[this.metricsIndex] = metric;
    this.metricsIndex = (this.metricsIndex + 1) % this.maxMetrics;

    // Slow request
    if (metric.duration > this.alertConfig.responseTimeThreshold) {
      this.logger.warn('检测到慢请求', {
        eventName: metric.eventName,
        duration: metric.duration,
        userId: metric.userId,
        socketId: metric.socketId,
        threshold: this.alertConfig.responseTimeThreshold,
      });
    }

    // Failed request
    if (!metric.success) {
      this.logger.error('事件处理失败', {
        eventName: metric.eventName,
        error: metric.error,
        userId: metric.userId,
        socketId: metric.socketId,
        duration: metric.duration,
      });
    }
  }

  /**
   * Recomputes the statistics of every event seen during the last minute.
   *
   * Events without any metric in that window keep their previously computed
   * entry.
   */
  private updateEventStats(): void {
    const oneMinuteMs =
      PerformanceMonitorMiddleware.SECONDS_PER_MINUTE * PerformanceMonitorMiddleware.MILLISECONDS_PER_SECOND;
    const eventGroups = this.groupMetricsByEvent(this.getRecentMetrics(oneMinuteMs));

    for (const [eventName, metrics] of eventGroups.entries()) {
      // Ascending order is required by getPercentile and the min/max lookups.
      const durations = metrics.map(m => m.duration).sort((a, b) => a - b);
      const successCount = metrics.filter(m => m.success).length;
      const totalDuration = durations.reduce((sum, d) => sum + d, 0);

      const stats: EventStats = {
        eventName,
        totalRequests: metrics.length,
        successRequests: successCount,
        failedRequests: metrics.length - successCount,
        avgDuration: PerformanceMonitorMiddleware.roundToTwoDecimals(totalDuration / durations.length),
        minDuration: durations[0] || 0,
        maxDuration: durations[durations.length - 1] || 0,
        p95Duration: this.getPercentile(durations, PerformanceMonitorMiddleware.PERCENTILE_95),
        p99Duration: this.getPercentile(durations, PerformanceMonitorMiddleware.PERCENTILE_99),
        requestsPerSecond: PerformanceMonitorMiddleware.roundToTwoDecimals(metrics.length / PerformanceMonitorMiddleware.SECONDS_PER_MINUTE),
        // ratio * 10000 / 100 gives a percentage with two decimals.
        successRate: Math.round((successCount / metrics.length) * PerformanceMonitorMiddleware.HIGH_PRECISION_MULTIPLIER) / PerformanceMonitorMiddleware.PRECISION_MULTIPLIER,
      };
      this.eventStats.set(eventName, stats);
    }
  }

  /**
   * Compares the current system overview with the alert thresholds and logs
   * a warning for each one that is violated. Does nothing when alerting is
   * disabled.
   *
   * NOTE(review): the throughput rule also fires while the service is idle,
   * because zero events in the last minute is below any positive threshold.
   */
  private checkAlerts(): void {
    if (!this.alertConfig.enabled) {
      return;
    }
    const systemPerf = this.getSystemPerformance();

    // Response time
    if (systemPerf.avgResponseTime > this.alertConfig.responseTimeThreshold) {
      this.logger.warn('响应时间过高预警', {
        current: systemPerf.avgResponseTime,
        threshold: this.alertConfig.responseTimeThreshold,
        timestamp: new Date().toISOString(),
      });
    }

    // Error rate
    if (systemPerf.errorRate > this.alertConfig.errorRateThreshold) {
      this.logger.warn('错误率过高预警', {
        current: systemPerf.errorRate,
        threshold: this.alertConfig.errorRateThreshold,
        timestamp: new Date().toISOString(),
      });
    }

    // Throughput
    if (systemPerf.throughput < this.alertConfig.throughputThreshold) {
      this.logger.warn('吞吐量过低预警', {
        current: systemPerf.throughput,
        threshold: this.alertConfig.throughputThreshold,
        timestamp: new Date().toISOString(),
      });
    }

    // Memory usage
    if (systemPerf.memoryUsage.percentage > this.alertConfig.memoryThreshold) {
      this.logger.warn('内存使用率过高预警', {
        current: systemPerf.memoryUsage.percentage,
        threshold: this.alertConfig.memoryThreshold,
        used: systemPerf.memoryUsage.used,
        total: systemPerf.memoryUsage.total,
        timestamp: new Date().toISOString(),
      });
    }
  }

  /**
   * Returns the metrics recorded within the last `timeRange` milliseconds.
   *
   * The result is a new array in buffer order; empty buffer slots are skipped.
   *
   * @param timeRange Time range in milliseconds
   * @returns Matching metrics
   */
  private getRecentMetrics(timeRange: number): PerformanceMetric[] {
    const cutoff = Date.now() - timeRange;
    return this.metrics.filter(metric => metric && metric.timestamp > cutoff);
  }

  /**
   * Groups metrics by event name, preserving their order inside each group.
   *
   * @param metrics Metrics to group
   * @returns Map from event name to its metrics
   */
  private groupMetricsByEvent(metrics: PerformanceMetric[]): Map<string, PerformanceMetric[]> {
    const groups = new Map<string, PerformanceMetric[]>();
    for (const metric of metrics) {
      const group = groups.get(metric.eventName);
      if (group) {
        group.push(metric);
      } else {
        groups.set(metric.eventName, [metric]);
      }
    }
    return groups;
  }

  /**
   * Returns a nearest-rank percentile, rounded to two decimals.
   *
   * @param values Values sorted in ascending order
   * @param percentile Percentile in the range 0-100
   * @returns Percentile value, or 0 for an empty array
   */
  private getPercentile(values: number[], percentile: number): number {
    if (values.length === 0) return 0;
    const index = Math.ceil((percentile / PerformanceMonitorMiddleware.PRECISION_MULTIPLIER) * values.length) - 1;
    return PerformanceMonitorMiddleware.roundToTwoDecimals(values[Math.max(0, index)]);
  }

  /**
   * Returns the slowest metrics, slowest first.
   *
   * Sorts a copy: `Array.prototype.sort` works in place, and the caller
   * passes the same array on to the error summary and trend calculation,
   * which rely on its original (chronological) order.
   *
   * @param metrics Metrics to inspect (not modified)
   * @param limit Maximum number of entries returned
   * @returns Up to `limit` metrics ordered by descending duration
   */
  private getTopSlowEvents(metrics: PerformanceMetric[], limit: number): PerformanceMetric[] {
    return [...metrics]
      .sort((a, b) => b.duration - a.duration)
      .slice(0, limit);
  }

  /**
   * Summarises the failed metrics.
   *
   * @param metrics Metrics to inspect
   * @returns Error count, error rate in percent, and a count per error message
   *          (failures without a message are grouped under `Unknown Error`)
   */
  private getErrorSummary(metrics: PerformanceMetric[]): any {
    const errors = metrics.filter(m => !m.success);
    const errorGroups = new Map<string, number>();
    for (const failed of errors) {
      const key = failed.error || 'Unknown Error';
      errorGroups.set(key, (errorGroups.get(key) || 0) + 1);
    }
    return {
      totalErrors: errors.length,
      errorRate: metrics.length > 0 ? (errors.length / metrics.length) * PerformanceMonitorMiddleware.PRECISION_MULTIPLIER : 0,
      errorTypes: Array.from(errorGroups.entries()).map(([error, count]) => ({ error, count })),
    };
  }

  /**
   * Buckets metrics into five-minute intervals.
   *
   * Buckets appear in the order in which their first metric occurs in
   * `metrics`.
   *
   * @param metrics Metrics to bucket
   * @returns One entry per bucket with its start timestamp, average duration,
   *          request count and error count
   */
  private getPerformanceTrends(metrics: PerformanceMetric[]): any {
    const intervals = new Map<number, PerformanceMetric[]>();
    const intervalSize = PerformanceMonitorMiddleware.TREND_INTERVAL_MINUTES * PerformanceMonitorMiddleware.SECONDS_PER_MINUTE * PerformanceMonitorMiddleware.MILLISECONDS_PER_SECOND;

    for (const metric of metrics) {
      // Align the timestamp to the start of its bucket.
      const bucketStart = Math.floor(metric.timestamp / intervalSize) * intervalSize;
      const bucket = intervals.get(bucketStart);
      if (bucket) {
        bucket.push(metric);
      } else {
        intervals.set(bucketStart, [metric]);
      }
    }

    return Array.from(intervals.entries()).map(([interval, intervalMetrics]) => ({
      timestamp: interval,
      avgDuration: intervalMetrics.reduce((sum, m) => sum + m.duration, 0) / intervalMetrics.length,
      requestCount: intervalMetrics.length,
      errorCount: intervalMetrics.filter(m => !m.success).length,
    }));
  }

  /**
   * Removes metrics older than 24 hours from the ring buffer.
   *
   * Expired slots are deleted rather than spliced so the remaining entries
   * keep their buffer positions.
   */
  private cleanupOldMetrics(): void {
    const retentionMs = PerformanceMonitorMiddleware.HOURS_PER_DAY * PerformanceMonitorMiddleware.MINUTES_PER_HOUR * PerformanceMonitorMiddleware.SECONDS_PER_MINUTE * PerformanceMonitorMiddleware.MILLISECONDS_PER_SECOND;
    const cutoff = Date.now() - retentionMs;

    let cleanedCount = 0;
    for (let i = 0; i < this.metrics.length; i++) {
      const metric = this.metrics[i];
      if (metric && metric.timestamp < cutoff) {
        delete this.metrics[i];
        cleanedCount++;
      }
    }

    if (cleanedCount > 0) {
      this.logger.debug('清理过期性能指标', {
        cleanedCount,
        remainingCount: this.metrics.filter(m => m).length,
        timestamp: new Date().toISOString(),
      });
    }
  }
}
/**
 * Monitor instance shared by every method wrapped with `PerformanceMonitor`.
 * Created on first use so that decorating a method has no side effect.
 */
let sharedPerformanceMonitor: PerformanceMonitorMiddleware | undefined;

/**
 * Returns the shared monitor, creating it on the first call.
 *
 * Constructing a `PerformanceMonitorMiddleware` starts background interval
 * timers, so the decorator must not create one per handled event.
 */
function getSharedPerformanceMonitor(): PerformanceMonitorMiddleware {
  if (!sharedPerformanceMonitor) {
    sharedPerformanceMonitor = new PerformanceMonitorMiddleware();
  }
  return sharedPerformanceMonitor;
}

/**
 * Method decorator that times the decorated handler and records the outcome.
 *
 * The first argument passed to the handler is treated as the WebSocket
 * client. The wrapped method always returns a promise; errors thrown by the
 * handler are recorded as failures and re-thrown unchanged.
 *
 * Usage example:
 * ```typescript
 * @PerformanceMonitor('position_update')
 * @SubscribeMessage('position_update')
 * async handlePositionUpdate(@ConnectedSocket() client: AuthenticatedSocket, @MessageBody() message: PositionUpdateMessage) {
 *   // handle the position update
 * }
 * ```
 *
 * @param eventName Name to record the metric under; defaults to the method name
 * @returns Method decorator
 */
export function PerformanceMonitor(eventName?: string) {
  return function (_target: any, propertyName: string, descriptor: PropertyDescriptor) {
    const method = descriptor.value;
    const finalEventName = eventName || propertyName;

    descriptor.value = async function (this: unknown, ...args: any[]) {
      const client = args[0] as ExtendedWebSocket;
      const performanceMonitor = getSharedPerformanceMonitor();
      const context = performanceMonitor.startMonitoring(finalEventName, client);
      try {
        const result = await method.apply(this, args);
        performanceMonitor.endMonitoring(context, true);
        return result;
      } catch (error) {
        performanceMonitor.endMonitoring(context, false, error instanceof Error ? error.message : String(error));
        throw error;
      }
    };
  };
}