/** * 系统监控服务 * * 功能描述: * - 记录连接、API调用、消息转发日志 * - 实现操作确认机制 * - 系统资源监控和告警 * * 主要方法: * - logConnection(): 记录连接日志 * - logApiCall(): 记录API调用日志 * - logMessageForward(): 记录消息转发日志 * - confirmOperation(): 操作确认 * - checkSystemHealth(): 系统健康检查 * - sendAlert(): 发送告警 * * 使用场景: * - WebSocket连接管理监控 * - Zulip API调用监控 * - 消息转发性能监控 * - 系统资源告警 * * 依赖模块: * - AppLoggerService: 日志记录服务 * - ConfigService: 配置服务 * * @author angjustinl * @version 1.0.0 * @since 2025-12-25 */ import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { EventEmitter } from 'events'; /** * 连接事件类型 */ export enum ConnectionEventType { CONNECTED = 'connected', DISCONNECTED = 'disconnected', ERROR = 'error', TIMEOUT = 'timeout', } /** * API调用结果类型 */ export enum ApiCallResult { SUCCESS = 'success', FAILURE = 'failure', TIMEOUT = 'timeout', RATE_LIMITED = 'rate_limited', } /** * 告警级别 */ export enum AlertLevel { INFO = 'info', WARNING = 'warning', ERROR = 'error', CRITICAL = 'critical', } /** * 连接日志接口 */ export interface ConnectionLog { socketId: string; userId?: string; eventType: ConnectionEventType; timestamp: Date; duration?: number; error?: string; metadata?: Record; } /** * API调用日志接口 */ export interface ApiCallLog { operation: string; userId: string; result: ApiCallResult; responseTime: number; timestamp: Date; statusCode?: number; error?: string; metadata?: Record; } /** * 消息转发日志接口 */ export interface MessageForwardLog { messageId?: number; fromUserId: string; toUserIds: string[]; stream: string; topic: string; direction: 'upstream' | 'downstream'; success: boolean; latency: number; timestamp: Date; error?: string; } /** * 操作确认接口 */ export interface OperationConfirmation { operationId: string; operation: string; userId: string; success: boolean; timestamp: Date; details?: Record; } /** * 系统健康状态接口 */ export interface SystemHealthStatus { status: 'healthy' | 'degraded' | 'unhealthy'; components: { websocket: ComponentHealth; zulipApi: ComponentHealth; redis: ComponentHealth; memory: ComponentHealth; }; timestamp: Date; } /** * 组件健康状态接口 */ export interface ComponentHealth { status: 'healthy' | 'degraded' | 'unhealthy'; latency?: number; errorRate?: number; details?: Record; } /** * 告警接口 */ export interface Alert { id: string; level: AlertLevel; title: string; message: string; component: string; timestamp: Date; metadata?: Record; } /** * 监控统计接口 */ export interface MonitoringStats { connections: { total: number; active: number; errors: number; }; apiCalls: { total: number; success: number; failures: number; avgResponseTime: number; }; messages: { upstream: number; downstream: number; errors: number; avgLatency: number; }; alerts: { total: number; byLevel: Record; }; } /** * 监控服务类 * * 职责: * - 监控Zulip集成系统的运行状态 * - 收集和统计系统性能指标 * - 提供健康检查和告警功能 * - 生成系统监控报告 * * 主要方法: * - recordConnection(): 记录连接统计 * - recordApiCall(): 记录API调用统计 * - recordMessage(): 记录消息统计 * - triggerAlert(): 触发告警 * - getSystemStats(): 获取系统统计信息 * - performHealthCheck(): 执行健康检查 * * 使用场景: * - 系统性能监控和统计 * - 异常情况的告警通知 * - 系统健康状态检查 * - 运维数据的收集和分析 */ @Injectable() export class MonitoringService extends EventEmitter implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(MonitoringService.name); // 统计数据 private connectionStats = { total: 0, active: 0, errors: 0 }; private apiCallStats = { total: 0, success: 0, failures: 0, totalResponseTime: 0 }; private messageStats = { upstream: 0, downstream: 0, errors: 0, totalLatency: 0 }; private alertStats: Record = { [AlertLevel.INFO]: 0, [AlertLevel.WARNING]: 0, [AlertLevel.ERROR]: 0, [AlertLevel.CRITICAL]: 0, }; // 最近的日志记录(用于分析) private recentApiCalls: ApiCallLog[] = []; private recentAlerts: Alert[] = []; private readonly maxRecentLogs = 100; // 健康检查间隔 private healthCheckInterval: NodeJS.Timeout | null = null; private readonly healthCheckIntervalMs: number; // 告警阈值 private readonly errorRateThreshold: number; private readonly responseTimeThreshold: number; private readonly memoryThreshold: number; constructor( private readonly configService: ConfigService, ) { super(); // 从配置读取阈值 this.healthCheckIntervalMs = this.configService.get('MONITORING_HEALTH_CHECK_INTERVAL', 60000); this.errorRateThreshold = this.configService.get('MONITORING_ERROR_RATE_THRESHOLD', 0.1); this.responseTimeThreshold = this.configService.get('MONITORING_RESPONSE_TIME_THRESHOLD', 5000); this.memoryThreshold = this.configService.get('MONITORING_MEMORY_THRESHOLD', 0.9); this.logger.log('MonitoringService初始化完成'); } /** * 模块初始化时启动健康检查 */ onModuleInit(): void { this.startHealthCheck(); } /** * 模块销毁时清理资源 */ onModuleDestroy(): void { if (this.healthCheckInterval) { clearInterval(this.healthCheckInterval); this.healthCheckInterval = null; } } /** * 记录连接日志 * * 功能描述: * 记录WebSocket连接建立、断开和异常日志 * * @param log 连接日志 */ logConnection(log: ConnectionLog): void { // 更新统计 switch (log.eventType) { case ConnectionEventType.CONNECTED: this.connectionStats.total++; this.connectionStats.active++; break; case ConnectionEventType.DISCONNECTED: this.connectionStats.active = Math.max(0, this.connectionStats.active - 1); break; case ConnectionEventType.ERROR: case ConnectionEventType.TIMEOUT: this.connectionStats.errors++; break; } // 记录日志 if (log.eventType === ConnectionEventType.ERROR) { this.logger.warn(`WebSocket连接事件: ${log.eventType}`, { operation: 'logConnection', socketId: log.socketId, userId: log.userId, eventType: log.eventType, duration: log.duration, error: log.error, ...log.metadata, timestamp: log.timestamp.toISOString(), }); } else { this.logger.log(`WebSocket连接事件: ${log.eventType}`); } // 发出事件 this.emit('connection_event', log); } /** * 记录API调用日志 * * 功能描述: * 记录Zulip API调用的响应时间和结果 * * @param log API调用日志 */ logApiCall(log: ApiCallLog): void { // 更新统计 this.apiCallStats.total++; this.apiCallStats.totalResponseTime += log.responseTime; if (log.result === ApiCallResult.SUCCESS) { this.apiCallStats.success++; } else { this.apiCallStats.failures++; } // 保存最近的调用记录 this.recentApiCalls.push(log); if (this.recentApiCalls.length > this.maxRecentLogs) { this.recentApiCalls.shift(); } // 记录日志 if (log.result === ApiCallResult.SUCCESS) { this.logger.log(`Zulip API调用: ${log.operation}`); } else { this.logger.warn(`Zulip API调用: ${log.operation}`, { operation: 'logApiCall', apiOperation: log.operation, userId: log.userId, result: log.result, responseTime: log.responseTime, statusCode: log.statusCode, error: log.error, ...log.metadata, timestamp: log.timestamp.toISOString(), }); } // 检查是否需要告警 if (log.responseTime > this.responseTimeThreshold) { this.sendAlert({ id: `api-slow-${Date.now()}`, level: AlertLevel.WARNING, title: 'API响应时间过长', message: `API调用 ${log.operation} 响应时间 ${log.responseTime}ms 超过阈值 ${this.responseTimeThreshold}ms`, component: 'zulip-api', timestamp: new Date(), metadata: { operation: log.operation, responseTime: log.responseTime }, }); } // 发出事件 this.emit('api_call', log); } /** * 记录消息转发日志 * * 功能描述: * 记录消息转发的成功率和延迟 * * @param log 消息转发日志 */ logMessageForward(log: MessageForwardLog): void { // 更新统计 if (log.direction === 'upstream') { this.messageStats.upstream++; } else { this.messageStats.downstream++; } if (!log.success) { this.messageStats.errors++; } this.messageStats.totalLatency += log.latency; // 记录日志 if (log.success) { this.logger.log(`消息转发: ${log.direction}`); } else { this.logger.warn(`消息转发: ${log.direction}`, { operation: 'logMessageForward', messageId: log.messageId, fromUserId: log.fromUserId, toUserCount: log.toUserIds.length, stream: log.stream, topic: log.topic, direction: log.direction, success: log.success, latency: log.latency, error: log.error, timestamp: log.timestamp.toISOString(), }); } // 发出事件 this.emit('message_forward', log); } /** * 操作确认 * * 功能描述: * 记录操作确认信息,用于审计和追踪 * * @param confirmation 操作确认信息 */ confirmOperation(confirmation: OperationConfirmation): void { this.logger.log(`操作确认: ${confirmation.operation}`); // 发出事件 this.emit('operation_confirmed', confirmation); } /** * 检查系统健康状态 * * 功能描述: * 检查各组件的健康状态,返回综合健康报告 * * @returns Promise 系统健康状态 */ async checkSystemHealth(): Promise { const components = { websocket: this.checkWebSocketHealth(), zulipApi: this.checkZulipApiHealth(), redis: await this.checkRedisHealth(), memory: this.checkMemoryHealth(), }; // 确定整体状态 const componentStatuses = Object.values(components).map(c => c.status); let overallStatus: 'healthy' | 'degraded' | 'unhealthy' = 'healthy'; if (componentStatuses.includes('unhealthy')) { overallStatus = 'unhealthy'; } else if (componentStatuses.includes('degraded')) { overallStatus = 'degraded'; } const healthStatus: SystemHealthStatus = { status: overallStatus, components, timestamp: new Date(), }; this.logger.debug('系统健康检查完成', { operation: 'checkSystemHealth', status: overallStatus, components: Object.fromEntries( Object.entries(components).map(([k, v]) => [k, v.status]) ), timestamp: new Date().toISOString(), }); // 如果状态不健康,发送告警 if (overallStatus !== 'healthy') { this.sendAlert({ id: `health-${Date.now()}`, level: overallStatus === 'unhealthy' ? AlertLevel.CRITICAL : AlertLevel.WARNING, title: '系统健康状态异常', message: `系统状态: ${overallStatus}`, component: 'system', timestamp: new Date(), metadata: { components }, }); } return healthStatus; } /** * 发送告警 * * 功能描述: * 发送系统告警通知 * * @param alert 告警信息 */ sendAlert(alert: Alert): void { // 更新统计 this.alertStats[alert.level]++; // 保存最近的告警 this.recentAlerts.push(alert); if (this.recentAlerts.length > this.maxRecentLogs) { this.recentAlerts.shift(); } // 根据级别选择日志方法 if (alert.level === AlertLevel.CRITICAL || alert.level === AlertLevel.ERROR) { this.logger.error(`系统告警: ${alert.title}`, { operation: 'sendAlert', alertId: alert.id, level: alert.level, title: alert.title, message: alert.message, component: alert.component, ...alert.metadata, timestamp: alert.timestamp.toISOString(), }); } else if (alert.level === AlertLevel.WARNING) { this.logger.warn(`系统告警: ${alert.title}`, { operation: 'sendAlert', alertId: alert.id, level: alert.level, title: alert.title, message: alert.message, component: alert.component, ...alert.metadata, timestamp: alert.timestamp.toISOString(), }); } else { this.logger.log(`系统告警: ${alert.title}`); } // 发出事件 this.emit('alert', alert); } /** * 获取监控统计信息 * * @returns MonitoringStats 监控统计 */ getStats(): MonitoringStats { const totalApiCalls = this.apiCallStats.total || 1; const totalMessages = this.messageStats.upstream + this.messageStats.downstream || 1; return { connections: { ...this.connectionStats }, apiCalls: { total: this.apiCallStats.total, success: this.apiCallStats.success, failures: this.apiCallStats.failures, avgResponseTime: this.apiCallStats.totalResponseTime / totalApiCalls, }, messages: { upstream: this.messageStats.upstream, downstream: this.messageStats.downstream, errors: this.messageStats.errors, avgLatency: this.messageStats.totalLatency / totalMessages, }, alerts: { total: Object.values(this.alertStats).reduce((a, b) => a + b, 0), byLevel: { ...this.alertStats }, }, }; } /** * 获取最近的告警 * * @param limit 返回数量限制 * @returns Alert[] 最近的告警列表 */ getRecentAlerts(limit: number = 10): Alert[] { return this.recentAlerts.slice(-limit); } /** * 重置统计数据 */ resetStats(): void { this.connectionStats = { total: 0, active: 0, errors: 0 }; this.apiCallStats = { total: 0, success: 0, failures: 0, totalResponseTime: 0 }; this.messageStats = { upstream: 0, downstream: 0, errors: 0, totalLatency: 0 }; this.alertStats = { [AlertLevel.INFO]: 0, [AlertLevel.WARNING]: 0, [AlertLevel.ERROR]: 0, [AlertLevel.CRITICAL]: 0, }; this.recentApiCalls = []; this.recentAlerts = []; this.logger.log('监控统计数据已重置'); } /** * 启动健康检查 * @private */ private startHealthCheck(): void { this.healthCheckInterval = setInterval(async () => { await this.checkSystemHealth(); }, this.healthCheckIntervalMs); this.logger.log('健康检查已启动'); } /** * 检查WebSocket健康状态 * @private */ private checkWebSocketHealth(): ComponentHealth { const errorRate = this.connectionStats.total > 0 ? this.connectionStats.errors / this.connectionStats.total : 0; let status: 'healthy' | 'degraded' | 'unhealthy' = 'healthy'; if (errorRate > this.errorRateThreshold * 2) { status = 'unhealthy'; } else if (errorRate > this.errorRateThreshold) { status = 'degraded'; } return { status, errorRate, details: { activeConnections: this.connectionStats.active, totalConnections: this.connectionStats.total, errors: this.connectionStats.errors, }, }; } /** * 检查Zulip API健康状态 * @private */ private checkZulipApiHealth(): ComponentHealth { const totalCalls = this.apiCallStats.total || 1; const errorRate = this.apiCallStats.failures / totalCalls; const avgResponseTime = this.apiCallStats.totalResponseTime / totalCalls; let status: 'healthy' | 'degraded' | 'unhealthy' = 'healthy'; if (errorRate > this.errorRateThreshold * 2 || avgResponseTime > this.responseTimeThreshold * 2) { status = 'unhealthy'; } else if (errorRate > this.errorRateThreshold || avgResponseTime > this.responseTimeThreshold) { status = 'degraded'; } return { status, latency: avgResponseTime, errorRate, details: { totalCalls: this.apiCallStats.total, successCalls: this.apiCallStats.success, failedCalls: this.apiCallStats.failures, }, }; } /** * 检查Redis健康状态 * @private */ private async checkRedisHealth(): Promise { // 简单的健康检查,实际应该ping Redis return { status: 'healthy', details: { note: 'Redis健康检查需要实际连接测试', }, }; } /** * 检查内存健康状态 * @private */ private checkMemoryHealth(): ComponentHealth { const memUsage = process.memoryUsage(); const heapUsedRatio = memUsage.heapUsed / memUsage.heapTotal; let status: 'healthy' | 'degraded' | 'unhealthy' = 'healthy'; if (heapUsedRatio > this.memoryThreshold) { status = 'unhealthy'; } else if (heapUsedRatio > this.memoryThreshold * 0.8) { status = 'degraded'; } return { status, details: { heapUsed: memUsage.heapUsed, heapTotal: memUsage.heapTotal, heapUsedRatio, rss: memUsage.rss, external: memUsage.external, }, }; } }