refactor:重构Zulip模块按业务功能模块化架构

- 将技术实现服务从business层迁移到core层
- 创建src/core/zulip/核心服务模块,包含API客户端、连接池等技术服务
- 保留src/business/zulip/业务逻辑,专注游戏相关的业务规则
- 通过依赖注入实现业务层与核心层的解耦
- 更新模块导入关系,确保架构分层清晰

重构后的架构符合单一职责原则,提高了代码的可维护性和可测试性
This commit is contained in:
moyin
2025-12-31 15:44:36 +08:00
parent 5140bd1a54
commit 2d10131838
36 changed files with 2773 additions and 125 deletions

View File

@@ -0,0 +1,705 @@
/**
* 系统监控服务
*
* 功能描述:
* - 记录连接、API调用、消息转发日志
* - 实现操作确认机制
* - 系统资源监控和告警
*
* 主要方法:
* - logConnection(): 记录连接日志
* - logApiCall(): 记录API调用日志
* - logMessageForward(): 记录消息转发日志
* - confirmOperation(): 操作确认
* - checkSystemHealth(): 系统健康检查
* - sendAlert(): 发送告警
*
* 使用场景:
* - WebSocket连接管理监控
* - Zulip API调用监控
* - 消息转发性能监控
* - 系统资源告警
*
* 依赖模块:
* - Logger (@nestjs/common): 日志记录服务
* - ConfigService: 配置服务
*
* @author angjustinl
* @version 1.0.0
* @since 2025-12-25
*/
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { EventEmitter } from 'events';
/**
* Kinds of connection events that can be reported through a ConnectionLog.
*
* MonitoringService.logConnection() updates its counters based on this value:
* CONNECTED bumps both the total and the active count, DISCONNECTED lowers the
* active count (never below zero), and ERROR / TIMEOUT bump the error count.
*/
export enum ConnectionEventType {
// A connection was established.
CONNECTED = 'connected',
// A connection was closed.
DISCONNECTED = 'disconnected',
// A connection failed; counted as an error.
ERROR = 'error',
// A connection timed out; counted as an error as well.
TIMEOUT = 'timeout',
}
/**
* Outcome of an API call as reported in an ApiCallLog.
*
* MonitoringService.logApiCall() counts SUCCESS as a success; every other
* value is counted as a failure.
*/
export enum ApiCallResult {
SUCCESS = 'success',
FAILURE = 'failure',
TIMEOUT = 'timeout',
RATE_LIMITED = 'rate_limited',
}
/**
* Severity of an Alert.
*
* MonitoringService.sendAlert() picks the logger method from this value:
* ERROR and CRITICAL go to logger.error, WARNING goes to logger.warn, and
* anything else goes to logger.log.
*/
export enum AlertLevel {
INFO = 'info',
WARNING = 'warning',
ERROR = 'error',
CRITICAL = 'critical',
}
/**
* A single connection event passed to MonitoringService.logConnection().
*/
export interface ConnectionLog {
// Identifier of the socket the event belongs to.
socketId: string;
// User associated with the socket, when one is known.
userId?: string;
// What happened; drives which counters are updated.
eventType: ConnectionEventType;
// When the event occurred; serialized with toISOString() when logged.
timestamp: Date;
// Optional duration associated with the event (unit is not established in this file).
duration?: number;
// Optional error description.
error?: string;
// Extra fields; spread into the warning log payload by logConnection().
metadata?: Record<string, any>;
}
/**
* A single API call record passed to MonitoringService.logApiCall().
*/
export interface ApiCallLog {
// Name of the API operation that was invoked.
operation: string;
// User on whose behalf the call was made.
userId: string;
// Outcome of the call; anything other than SUCCESS is counted as a failure.
result: ApiCallResult;
// Response time; the slow-call alert message reports it in milliseconds.
responseTime: number;
// When the call happened; serialized with toISOString() when logged.
timestamp: Date;
// Optional status code of the response.
statusCode?: number;
// Optional error description.
error?: string;
// Extra fields; spread into the warning log payload for non-successful calls.
metadata?: Record<string, any>;
}
/**
* A single message-forwarding record passed to MonitoringService.logMessageForward().
*/
export interface MessageForwardLog {
// Identifier of the forwarded message, when available.
messageId?: number;
// Sender of the message.
fromUserId: string;
// Recipients of the message; only the count is written to the log.
toUserIds: string[];
// Stream the message belongs to.
stream: string;
// Topic the message belongs to.
topic: string;
// Direction of the forward; selects the upstream or downstream counter.
direction: 'upstream' | 'downstream';
// Whether forwarding succeeded; failures bump the message error counter.
success: boolean;
// Forwarding latency, accumulated to compute the average (unit not established in this file).
latency: number;
// When the forward happened; serialized with toISOString() when logged.
timestamp: Date;
// Optional error description.
error?: string;
}
/**
* Confirmation record passed to MonitoringService.confirmOperation(), which
* logs the operation name and re-emits the record as 'operation_confirmed'.
*/
export interface OperationConfirmation {
// Identifier of the operation instance being confirmed.
operationId: string;
// Name of the operation; included in the log line.
operation: string;
// User who performed the operation.
userId: string;
// Whether the operation succeeded.
success: boolean;
// When the confirmation was produced.
timestamp: Date;
// Optional free-form details.
details?: Record<string, any>;
}
/**
* Aggregated health report returned by MonitoringService.checkSystemHealth().
*/
export interface SystemHealthStatus {
// Overall status: 'unhealthy' if any component is unhealthy, otherwise
// 'degraded' if any component is degraded, otherwise 'healthy'.
status: 'healthy' | 'degraded' | 'unhealthy';
// Per-component results that the overall status is derived from.
components: {
websocket: ComponentHealth;
zulipApi: ComponentHealth;
redis: ComponentHealth;
memory: ComponentHealth;
};
// When the report was produced.
timestamp: Date;
}
/**
* Health of a single component inside a SystemHealthStatus.
*/
export interface ComponentHealth {
// Health classification of the component.
status: 'healthy' | 'degraded' | 'unhealthy';
// Average latency, when the check computes one (the Zulip API check does).
latency?: number;
// Ratio of errors to total events, when the check computes one.
errorRate?: number;
// Check-specific raw numbers or notes.
details?: Record<string, any>;
}
/**
* An alert passed to MonitoringService.sendAlert(), which counts it, keeps it
* in the recent-alert buffer, logs it and re-emits it as an 'alert' event.
*/
export interface Alert {
// Identifier of the alert; the service builds its own ids from a prefix and Date.now().
id: string;
// Severity; selects the counter and the logger method.
level: AlertLevel;
// Short title; included in the log line.
title: string;
// Human-readable description.
message: string;
// Name of the component the alert concerns.
component: string;
// When the alert was raised; serialized with toISOString() when logged.
timestamp: Date;
// Extra fields; spread into the log payload for WARNING and above.
metadata?: Record<string, any>;
}
/**
* Snapshot of the in-memory counters returned by MonitoringService.getStats().
*/
export interface MonitoringStats {
// Connection counters: connections ever opened, currently open, and
// ERROR/TIMEOUT events seen.
connections: {
total: number;
active: number;
errors: number;
};
// API call counters; avgResponseTime is the accumulated response time
// divided by the number of calls (0 when no call has been recorded).
apiCalls: {
total: number;
success: number;
failures: number;
avgResponseTime: number;
};
// Message counters per direction; avgLatency is the accumulated latency
// divided by upstream + downstream (0 when no message has been recorded).
messages: {
upstream: number;
downstream: number;
errors: number;
avgLatency: number;
};
// Alert counters: overall total and a breakdown per level.
alerts: {
total: number;
byLevel: Record<AlertLevel, number>;
};
}
/**
 * Monitoring service for the Zulip integration.
 *
 * Responsibilities:
 * - Keep in-memory counters for connections, Zulip API calls, forwarded
 *   messages and alerts.
 * - Write every reported event to the Nest logger and re-emit it as an
 *   EventEmitter event ('connection_event', 'api_call', 'message_forward',
 *   'operation_confirmed', 'alert').
 * - Run a periodic health check and raise an alert when the overall status
 *   is not healthy.
 *
 * Public methods:
 * - logConnection(): record a connection event
 * - logApiCall(): record a Zulip API call
 * - logMessageForward(): record a forwarded message
 * - confirmOperation(): record an operation confirmation
 * - checkSystemHealth(): compute the aggregated health report
 * - sendAlert(): record and publish an alert
 * - getStats(): snapshot of all counters
 * - getRecentAlerts(): most recent alerts
 * - resetStats(): clear counters and buffers
 */
@Injectable()
export class MonitoringService extends EventEmitter implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(MonitoringService.name);

  // Running counters.
  private connectionStats = { total: 0, active: 0, errors: 0 };
  private apiCallStats = { total: 0, success: 0, failures: 0, totalResponseTime: 0 };
  private messageStats = { upstream: 0, downstream: 0, errors: 0, totalLatency: 0 };
  private alertStats: Record<AlertLevel, number> = {
    [AlertLevel.INFO]: 0,
    [AlertLevel.WARNING]: 0,
    [AlertLevel.ERROR]: 0,
    [AlertLevel.CRITICAL]: 0,
  };

  // Bounded buffers holding the most recent records.
  private recentApiCalls: ApiCallLog[] = [];
  private recentAlerts: Alert[] = [];
  private readonly maxRecentLogs = 100;

  // Periodic health check timer and its period.
  private healthCheckInterval: NodeJS.Timeout | null = null;
  private readonly healthCheckIntervalMs: number;

  // Alert thresholds.
  private readonly errorRateThreshold: number;
  private readonly responseTimeThreshold: number;
  private readonly memoryThreshold: number;

  constructor(
    private readonly configService: ConfigService,
  ) {
    super();
    // The interval must be at least 1 ms: a zero or NaN delay would make
    // setInterval fire continuously.
    this.healthCheckIntervalMs = this.readNumericConfig('MONITORING_HEALTH_CHECK_INTERVAL', 60000, 1);
    this.errorRateThreshold = this.readNumericConfig('MONITORING_ERROR_RATE_THRESHOLD', 0.1);
    this.responseTimeThreshold = this.readNumericConfig('MONITORING_RESPONSE_TIME_THRESHOLD', 5000);
    this.memoryThreshold = this.readNumericConfig('MONITORING_MEMORY_THRESHOLD', 0.9);
    this.logger.log('MonitoringService初始化完成');
  }

  /**
   * Starts the periodic health check when the module is initialized.
   */
  onModuleInit(): void {
    this.startHealthCheck();
  }

  /**
   * Stops the periodic health check when the module is destroyed.
   */
  onModuleDestroy(): void {
    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
      this.healthCheckInterval = null;
    }
  }

  /**
   * Records a connection event.
   *
   * Updates the connection counters, logs the event and emits
   * 'connection_event' with the log record.
   *
   * @param log connection event to record
   */
  logConnection(log: ConnectionLog): void {
    switch (log.eventType) {
      case ConnectionEventType.CONNECTED:
        this.connectionStats.total++;
        this.connectionStats.active++;
        break;
      case ConnectionEventType.DISCONNECTED:
        // Clamp at zero so an unmatched disconnect cannot drive the gauge negative.
        this.connectionStats.active = Math.max(0, this.connectionStats.active - 1);
        break;
      case ConnectionEventType.ERROR:
      case ConnectionEventType.TIMEOUT:
        this.connectionStats.errors++;
        break;
    }

    // ERROR and TIMEOUT are both counted as errors above, so both are logged
    // as warnings with the full context; other events get a plain info line.
    const isFailure =
      log.eventType === ConnectionEventType.ERROR ||
      log.eventType === ConnectionEventType.TIMEOUT;
    if (isFailure) {
      this.logger.warn(`WebSocket连接事件: ${log.eventType}`, {
        operation: 'logConnection',
        socketId: log.socketId,
        userId: log.userId,
        eventType: log.eventType,
        duration: log.duration,
        error: log.error,
        ...log.metadata,
        timestamp: log.timestamp.toISOString(),
      });
    } else {
      this.logger.log(`WebSocket连接事件: ${log.eventType}`);
    }

    this.emit('connection_event', log);
  }

  /**
   * Records a Zulip API call.
   *
   * Updates the API counters, keeps the record in the bounded recent-call
   * buffer, logs it, raises a WARNING alert when the response time exceeds
   * the configured threshold, and emits 'api_call' with the log record.
   *
   * @param log API call to record
   */
  logApiCall(log: ApiCallLog): void {
    this.apiCallStats.total++;
    this.apiCallStats.totalResponseTime += log.responseTime;
    if (log.result === ApiCallResult.SUCCESS) {
      this.apiCallStats.success++;
    } else {
      this.apiCallStats.failures++;
    }

    // Keep only the newest maxRecentLogs entries.
    this.recentApiCalls.push(log);
    if (this.recentApiCalls.length > this.maxRecentLogs) {
      this.recentApiCalls.shift();
    }

    if (log.result === ApiCallResult.SUCCESS) {
      this.logger.log(`Zulip API调用: ${log.operation}`);
    } else {
      this.logger.warn(`Zulip API调用: ${log.operation}`, {
        operation: 'logApiCall',
        apiOperation: log.operation,
        userId: log.userId,
        result: log.result,
        responseTime: log.responseTime,
        statusCode: log.statusCode,
        error: log.error,
        ...log.metadata,
        timestamp: log.timestamp.toISOString(),
      });
    }

    if (log.responseTime > this.responseTimeThreshold) {
      this.sendAlert({
        id: `api-slow-${Date.now()}`,
        level: AlertLevel.WARNING,
        title: 'API响应时间过长',
        message: `API调用 ${log.operation} 响应时间 ${log.responseTime}ms 超过阈值 ${this.responseTimeThreshold}ms`,
        component: 'zulip-api',
        timestamp: new Date(),
        metadata: { operation: log.operation, responseTime: log.responseTime },
      });
    }

    this.emit('api_call', log);
  }

  /**
   * Records a forwarded message.
   *
   * Updates the per-direction counters, the error counter and the
   * accumulated latency, logs the record and emits 'message_forward'.
   *
   * @param log message forward to record
   */
  logMessageForward(log: MessageForwardLog): void {
    if (log.direction === 'upstream') {
      this.messageStats.upstream++;
    } else {
      this.messageStats.downstream++;
    }
    if (!log.success) {
      this.messageStats.errors++;
    }
    this.messageStats.totalLatency += log.latency;

    if (log.success) {
      this.logger.log(`消息转发: ${log.direction}`);
    } else {
      this.logger.warn(`消息转发: ${log.direction}`, {
        operation: 'logMessageForward',
        messageId: log.messageId,
        fromUserId: log.fromUserId,
        // Only the recipient count is logged, not the ids themselves.
        toUserCount: log.toUserIds.length,
        stream: log.stream,
        topic: log.topic,
        direction: log.direction,
        success: log.success,
        latency: log.latency,
        error: log.error,
        timestamp: log.timestamp.toISOString(),
      });
    }

    this.emit('message_forward', log);
  }

  /**
   * Records an operation confirmation.
   *
   * Logs the operation name and emits 'operation_confirmed' with the record.
   *
   * @param confirmation confirmation to record
   */
  confirmOperation(confirmation: OperationConfirmation): void {
    this.logger.log(`操作确认: ${confirmation.operation}`);
    this.emit('operation_confirmed', confirmation);
  }

  /**
   * Computes the aggregated health report.
   *
   * The overall status is 'unhealthy' if any component is unhealthy,
   * otherwise 'degraded' if any component is degraded, otherwise 'healthy'.
   * A non-healthy result raises an alert (CRITICAL for unhealthy, WARNING
   * for degraded).
   *
   * @returns the health report for all components
   */
  async checkSystemHealth(): Promise<SystemHealthStatus> {
    const components = {
      websocket: this.checkWebSocketHealth(),
      zulipApi: this.checkZulipApiHealth(),
      redis: await this.checkRedisHealth(),
      memory: this.checkMemoryHealth(),
    };

    const componentStatuses = Object.values(components).map(c => c.status);
    let overallStatus: 'healthy' | 'degraded' | 'unhealthy' = 'healthy';
    if (componentStatuses.includes('unhealthy')) {
      overallStatus = 'unhealthy';
    } else if (componentStatuses.includes('degraded')) {
      overallStatus = 'degraded';
    }

    const healthStatus: SystemHealthStatus = {
      status: overallStatus,
      components,
      timestamp: new Date(),
    };

    this.logger.debug('系统健康检查完成', {
      operation: 'checkSystemHealth',
      status: overallStatus,
      components: Object.fromEntries(
        Object.entries(components).map(([k, v]) => [k, v.status])
      ),
      timestamp: new Date().toISOString(),
    });

    if (overallStatus !== 'healthy') {
      this.sendAlert({
        id: `health-${Date.now()}`,
        level: overallStatus === 'unhealthy' ? AlertLevel.CRITICAL : AlertLevel.WARNING,
        title: '系统健康状态异常',
        message: `系统状态: ${overallStatus}`,
        component: 'system',
        timestamp: new Date(),
        metadata: { components },
      });
    }

    return healthStatus;
  }

  /**
   * Records and publishes an alert.
   *
   * Bumps the per-level counter, keeps the alert in the bounded recent-alert
   * buffer, logs it at a level matching its severity and emits 'alert'.
   *
   * @param alert alert to publish
   */
  sendAlert(alert: Alert): void {
    this.alertStats[alert.level]++;

    // Keep only the newest maxRecentLogs entries.
    this.recentAlerts.push(alert);
    if (this.recentAlerts.length > this.maxRecentLogs) {
      this.recentAlerts.shift();
    }

    const headline = `系统告警: ${alert.title}`;
    // Built lazily so info-level alerts do not touch the timestamp or metadata.
    const buildPayload = (): Record<string, unknown> => ({
      operation: 'sendAlert',
      alertId: alert.id,
      level: alert.level,
      title: alert.title,
      message: alert.message,
      component: alert.component,
      ...alert.metadata,
      timestamp: alert.timestamp.toISOString(),
    });

    if (alert.level === AlertLevel.CRITICAL || alert.level === AlertLevel.ERROR) {
      this.logger.error(headline, buildPayload());
    } else if (alert.level === AlertLevel.WARNING) {
      this.logger.warn(headline, buildPayload());
    } else {
      this.logger.log(headline);
    }

    this.emit('alert', alert);
  }

  /**
   * Returns a snapshot of all counters.
   *
   * Averages are 0 when nothing has been recorded yet (the divisor falls
   * back to 1 to avoid dividing by zero).
   *
   * @returns current monitoring statistics
   */
  getStats(): MonitoringStats {
    const totalApiCalls = this.apiCallStats.total || 1;
    const totalMessages = (this.messageStats.upstream + this.messageStats.downstream) || 1;
    return {
      connections: { ...this.connectionStats },
      apiCalls: {
        total: this.apiCallStats.total,
        success: this.apiCallStats.success,
        failures: this.apiCallStats.failures,
        avgResponseTime: this.apiCallStats.totalResponseTime / totalApiCalls,
      },
      messages: {
        upstream: this.messageStats.upstream,
        downstream: this.messageStats.downstream,
        errors: this.messageStats.errors,
        avgLatency: this.messageStats.totalLatency / totalMessages,
      },
      alerts: {
        total: Object.values(this.alertStats).reduce((a, b) => a + b, 0),
        byLevel: { ...this.alertStats },
      },
    };
  }

  /**
   * Returns the most recent alerts, oldest first.
   *
   * @param limit maximum number of alerts to return; values that are not a
   *   positive number yield an empty list
   * @returns up to `limit` of the newest alerts
   */
  getRecentAlerts(limit: number = 10): Alert[] {
    const count = Math.floor(limit);
    // slice(-0) is slice(0) and would return the whole buffer, so zero,
    // negative and NaN limits are handled explicitly.
    if (!(count > 0)) {
      return [];
    }
    return this.recentAlerts.slice(-count);
  }

  /**
   * Clears all counters and recent-record buffers.
   *
   * Note that this also zeroes the active-connection count, even if
   * connections are still open.
   */
  resetStats(): void {
    this.connectionStats = { total: 0, active: 0, errors: 0 };
    this.apiCallStats = { total: 0, success: 0, failures: 0, totalResponseTime: 0 };
    this.messageStats = { upstream: 0, downstream: 0, errors: 0, totalLatency: 0 };
    this.alertStats = {
      [AlertLevel.INFO]: 0,
      [AlertLevel.WARNING]: 0,
      [AlertLevel.ERROR]: 0,
      [AlertLevel.CRITICAL]: 0,
    };
    this.recentApiCalls = [];
    this.recentAlerts = [];
    this.logger.log('监控统计数据已重置');
  }

  /**
   * Reads a numeric setting, coercing string values and rejecting anything
   * that is not a finite number of at least `min`.
   *
   * Values sourced from environment variables arrive as strings; without
   * this a non-numeric value would become NaN and silently disable every
   * threshold comparison.
   *
   * @param key configuration key
   * @param fallback value used when the setting is missing or invalid
   * @param min smallest accepted value
   * @returns the validated number, or `fallback`
   */
  private readNumericConfig(key: string, fallback: number, min: number = 0): number {
    const raw: unknown = this.configService.get<number>(key, fallback);
    if (raw === undefined || raw === null) {
      return fallback;
    }
    const parsed: unknown =
      typeof raw === 'string' ? (raw.trim() === '' ? NaN : Number(raw)) : raw;
    if (typeof parsed === 'number' && Number.isFinite(parsed) && parsed >= min) {
      return parsed;
    }
    this.logger.warn(`配置项 ${key} 的值无效,已使用默认值 ${fallback}`, {
      operation: 'readNumericConfig',
      key,
      value: String(raw),
      fallback,
    });
    return fallback;
  }

  /**
   * Starts the periodic health check, replacing any timer already running.
   */
  private startHealthCheck(): void {
    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
    }
    this.healthCheckInterval = setInterval(() => {
      // A rejected check (for example a throwing 'alert' listener) must not
      // surface as an unhandled promise rejection.
      this.checkSystemHealth().catch((error: unknown) => {
        this.logger.error('健康检查执行失败', {
          operation: 'startHealthCheck',
          error: error instanceof Error ? error.message : String(error),
          stack: error instanceof Error ? error.stack : undefined,
          timestamp: new Date().toISOString(),
        });
      });
    }, this.healthCheckIntervalMs);
    this.logger.log('健康检查已启动');
  }

  /**
   * Derives WebSocket health from the connection counters.
   *
   * The error rate is errors / total connections (0 when there were none);
   * above the threshold is 'degraded', above twice the threshold is 'unhealthy'.
   */
  private checkWebSocketHealth(): ComponentHealth {
    const errorRate = this.connectionStats.total > 0
      ? this.connectionStats.errors / this.connectionStats.total
      : 0;
    let status: 'healthy' | 'degraded' | 'unhealthy' = 'healthy';
    if (errorRate > this.errorRateThreshold * 2) {
      status = 'unhealthy';
    } else if (errorRate > this.errorRateThreshold) {
      status = 'degraded';
    }
    return {
      status,
      errorRate,
      details: {
        activeConnections: this.connectionStats.active,
        totalConnections: this.connectionStats.total,
        errors: this.connectionStats.errors,
      },
    };
  }

  /**
   * Derives Zulip API health from the API call counters.
   *
   * Either the error rate or the average response time exceeding its
   * threshold yields 'degraded'; exceeding twice the threshold yields
   * 'unhealthy'.
   */
  private checkZulipApiHealth(): ComponentHealth {
    // Fall back to 1 so both ratios are 0 before any call is recorded.
    const totalCalls = this.apiCallStats.total || 1;
    const errorRate = this.apiCallStats.failures / totalCalls;
    const avgResponseTime = this.apiCallStats.totalResponseTime / totalCalls;
    let status: 'healthy' | 'degraded' | 'unhealthy' = 'healthy';
    if (errorRate > this.errorRateThreshold * 2 || avgResponseTime > this.responseTimeThreshold * 2) {
      status = 'unhealthy';
    } else if (errorRate > this.errorRateThreshold || avgResponseTime > this.responseTimeThreshold) {
      status = 'degraded';
    }
    return {
      status,
      latency: avgResponseTime,
      errorRate,
      details: {
        totalCalls: this.apiCallStats.total,
        successCalls: this.apiCallStats.success,
        failedCalls: this.apiCallStats.failures,
      },
    };
  }

  /**
   * Placeholder Redis health check.
   *
   * Always reports 'healthy'; no connection to Redis is made here. A real
   * check should ping Redis.
   */
  private async checkRedisHealth(): Promise<ComponentHealth> {
    return {
      status: 'healthy',
      details: {
        note: 'Redis健康检查需要实际连接测试',
      },
    };
  }

  /**
   * Derives memory health from process.memoryUsage().
   *
   * The ratio heapUsed / heapTotal above the memory threshold is
   * 'unhealthy'; above 80% of the threshold is 'degraded'.
   *
   * NOTE(review): heapTotal is the heap currently allocated by the engine
   * rather than its upper limit, so this ratio may be high under normal
   * load — confirm whether the heap size limit would be a better divisor.
   */
  private checkMemoryHealth(): ComponentHealth {
    const memUsage = process.memoryUsage();
    const heapUsedRatio = memUsage.heapUsed / memUsage.heapTotal;
    let status: 'healthy' | 'degraded' | 'unhealthy' = 'healthy';
    if (heapUsedRatio > this.memoryThreshold) {
      status = 'unhealthy';
    } else if (heapUsedRatio > this.memoryThreshold * 0.8) {
      status = 'degraded';
    }
    return {
      status,
      details: {
        heapUsed: memUsage.heapUsed,
        heapTotal: memUsage.heapTotal,
        heapUsedRatio,
        rss: memUsage.rss,
        external: memUsage.external,
      },
    };
  }
}