/** * Zulip事件处理服务 * * 功能描述: * - 实现事件队列轮询机制 * - 处理Zulip消息事件和格式转换 * - 实现空间过滤和消息分发 * - 支持区域广播功能 * * 主要方法: * - startEventProcessing(): 启动事件处理循环 * - processMessageEvent(): 处理Zulip消息事件 * - convertMessageFormat(): 消息格式转换 * - distributeMessage(): 消息分发机制 * - determineTargetPlayers(): 空间过滤确定目标玩家 * * 使用场景: * - 后台异步处理Zulip事件 * - 消息格式转换和路由 * - 向游戏客户端分发消息 * * 依赖模块: * - SessionManagerService: 会话管理服务 * - ConfigManagerService: 配置管理服务 * - ZulipClientPoolService: Zulip客户端池服务 * - AppLoggerService: 日志记录服务 * * @author 开发团队 * @version 1.0.0 * @since 2025-12-25 */ import { Injectable, OnModuleDestroy, Inject, forwardRef, Logger } from '@nestjs/common'; import { SessionManagerService } from './session-manager.service'; import { ConfigManagerService } from './config-manager.service'; import { ZulipClientPoolService } from './zulip-client-pool.service'; /** * Zulip消息接口 */ export interface ZulipMessage { id: number; // 消息ID sender_email: string; // 发送者邮箱 sender_full_name: string; // 发送者全名 content: string; // 消息内容 stream_id: number; // Stream ID subject: string; // Topic名称 timestamp: number; // 时间戳 display_recipient?: string | any[]; // Stream名称或私信接收者 type?: string; // 消息类型 (stream/private) } /** * Zulip事件接口 */ export interface ZulipEvent { type: string; // 事件类型 message?: ZulipMessage; // 消息内容(仅message事件) queue_id?: string; // 队列ID id?: number; // 事件ID } /** * 游戏消息接口 - 按guide.md格式 */ export interface GameMessage { t: 'chat_render'; from: string; txt: string; bubble: boolean; timestamp?: number; // 可选时间戳 streamName?: string; // 可选Stream名称 topic?: string; // 可选Topic名称 } /** * 消息分发回调接口 */ export interface MessageDistributor { sendChatRender(socketId: string, from: string, txt: string, bubble: boolean): void; broadcastToMap(mapId: string, event: string, data: any): Promise; } /** * 事件处理统计信息接口 */ export interface EventProcessingStats { isActive: boolean; activeQueues: number; totalQueues: number; queueIds: string[]; processedEvents: number; processedMessages: number; lastEventTime?: Date; } @Injectable() export class ZulipEventProcessorService implements OnModuleDestroy { private readonly logger = new Logger(ZulipEventProcessorService.name); private processingActive = false; private eventQueues = new Map(); private messageDistributor: MessageDistributor | null = null; private processedEvents = 0; private processedMessages = 0; private lastEventTime: Date | null = null; private pollingInterval: NodeJS.Timeout | null = null; private readonly POLLING_INTERVAL_MS = 2000; // 2秒轮询间隔 private readonly MAX_EVENTS_PER_POLL = 100; constructor( private readonly sessionManager: SessionManagerService, private readonly configManager: ConfigManagerService, @Inject(forwardRef(() => ZulipClientPoolService)) private readonly clientPool: ZulipClientPoolService, ) { this.logger.log('ZulipEventProcessorService初始化完成'); } /** * 模块销毁时停止事件处理 */ async onModuleDestroy(): Promise { this.logger.log('ZulipEventProcessorService模块销毁,停止事件处理'); await this.stopEventProcessing(); } /** * 设置消息分发器 * * 功能描述: * 设置用于向游戏客户端发送消息的分发器接口 * * @param distributor 消息分发器实例 */ setMessageDistributor(distributor: MessageDistributor): void { this.messageDistributor = distributor; this.logger.log('消息分发器已设置'); } /** * 启动事件处理循环 * * 功能描述: * 启动后台事件处理循环,监听所有活跃的Zulip事件队列 * * 业务逻辑: * 1. 初始化事件处理状态 * 2. 启动轮询循环 * 3. 处理接收到的事件 * 4. 错误处理和重连机制 * * @returns Promise */ async startEventProcessing(): Promise { if (this.processingActive) { this.logger.warn('事件处理已在运行', { operation: 'startEventProcessing', }); return; } this.processingActive = true; this.logger.log('启动Zulip事件处理'); try { // 启动定时轮询 this.pollingInterval = setInterval( () => this.eventProcessingLoop(), this.POLLING_INTERVAL_MS ); // 立即执行一次 await this.eventProcessingLoop(); this.logger.log('事件处理循环已启动'); } catch (error) { const err = error as Error; this.logger.error('启动事件处理失败', { operation: 'startEventProcessing', error: err.message, timestamp: new Date().toISOString(), }, err.stack); this.processingActive = false; } } /** * 停止事件处理循环 * * @returns Promise */ async stopEventProcessing(): Promise { this.logger.log('停止Zulip事件处理'); this.processingActive = false; // 清除定时器 if (this.pollingInterval) { clearInterval(this.pollingInterval); this.pollingInterval = null; } this.eventQueues.clear(); this.logger.log('事件处理已停止'); } /** * 注册事件队列 * * 功能描述: * 将新的事件队列添加到处理列表中 * * @param queueId 事件队列ID * @param userId 用户ID * @param lastEventId 最后处理的事件ID(默认-1) * @returns Promise */ async registerEventQueue(queueId: string, userId: string, lastEventId: number = -1): Promise { this.logger.log(`注册事件队列: ${queueId}`); this.eventQueues.set(queueId, { userId, isActive: true, lastEventId, }); } /** * 注销事件队列 * * @param queueId 事件队列ID * @returns Promise */ async unregisterEventQueue(queueId: string): Promise { this.logger.log(`注销事件队列: ${queueId}`); this.eventQueues.delete(queueId); } /** * 事件处理循环 * * 功能描述: * 轮询所有注册的事件队列,处理接收到的事件 * * @private */ private async eventProcessingLoop(): Promise { if (!this.processingActive) { return; } try { // 获取所有活跃的事件队列 const activeQueues = Array.from(this.eventQueues.entries()) .filter(([, info]) => info.isActive); if (activeQueues.length === 0) { return; } // 并发处理所有队列 await Promise.all( activeQueues.map(([queueId, info]) => this.pollEventQueue(queueId, info.userId, info.lastEventId) ) ); } catch (error) { const err = error as Error; this.logger.error('事件处理循环异常', { operation: 'eventProcessingLoop', error: err.message, timestamp: new Date().toISOString(), }, err.stack); } } /** * 轮询单个事件队列 * * 功能描述: * 从Zulip服务器获取指定队列的新事件并处理 * * @param queueId 事件队列ID * @param userId 用户ID * @param lastEventId 最后处理的事件ID * @private */ private async pollEventQueue(queueId: string, userId: string, lastEventId: number): Promise { try { // 获取用户的Zulip客户端 const client = await this.clientPool.getUserClient(userId); if (!client) { this.logger.debug('用户Zulip客户端不存在,跳过轮询', { operation: 'pollEventQueue', queueId, userId, }); return; } // 调用Zulip API获取事件 // 注意:这里使用非阻塞模式,避免长时间等待 const events = await this.fetchEventsFromClient(client, queueId, lastEventId); if (!events || events.length === 0) { return; } // 处理每个事件 for (const event of events) { await this.processEvent(event, userId); // 更新最后处理的事件ID if (event.id !== undefined) { const queueInfo = this.eventQueues.get(queueId); if (queueInfo) { queueInfo.lastEventId = event.id; } } } this.processedEvents += events.length; this.lastEventTime = new Date(); } catch (error) { const err = error as Error; this.logger.error('轮询事件队列失败', { operation: 'pollEventQueue', queueId, userId, error: err.message, timestamp: new Date().toISOString(), }, err.stack); // 如果队列出现持续错误,暂时禁用 if (this.isQueueError(error)) { const queueInfo = this.eventQueues.get(queueId); if (queueInfo) { queueInfo.isActive = false; this.logger.warn('事件队列已暂时禁用', { operation: 'pollEventQueue', queueId, userId, }); } } } } /** * 从Zulip客户端获取事件 * * @param client Zulip客户端实例 * @param queueId 队列ID * @param lastEventId 最后事件ID * @returns Promise 事件列表 * @private */ private async fetchEventsFromClient( client: any, queueId: string, lastEventId: number ): Promise { try { // 检查客户端是否有zulipClient实例 if (!client.zulipClient) { return []; } // 调用zulip-js的events.retrieve方法 const result = await client.zulipClient.events.retrieve({ queue_id: queueId, last_event_id: lastEventId, dont_block: true, // 非阻塞模式 }); if (result.result === 'success' && result.events) { return result.events as ZulipEvent[]; } return []; } catch (error) { const err = error as Error; this.logger.debug('获取事件失败', { operation: 'fetchEventsFromClient', queueId, error: err.message, }); return []; } } /** * 处理单个事件 * * 功能描述: * 根据事件类型分发到对应的处理方法 * * @param event Zulip事件 * @param userId 用户ID * @private */ private async processEvent(event: ZulipEvent, userId: string): Promise { this.logger.debug('处理Zulip事件', { operation: 'processEvent', eventType: event.type, eventId: event.id, userId, timestamp: new Date().toISOString(), }); try { switch (event.type) { case 'message': if (event.message) { await this.processMessageEvent(event, userId); } break; case 'heartbeat': // 心跳事件,忽略 break; default: this.logger.debug('忽略未处理的事件类型', { operation: 'processEvent', eventType: event.type, eventId: event.id, }); } } catch (error) { const err = error as Error; this.logger.error('处理事件失败', { operation: 'processEvent', eventType: event.type, eventId: event.id, userId, error: err.message, timestamp: new Date().toISOString(), }, err.stack); } } /** * 处理Zulip消息事件 * * 功能描述: * 处理从Zulip接收的消息事件,转换格式后分发给相关的游戏客户端 * * 业务逻辑: * 1. 解析消息内容和元数据 * 2. 确定目标玩家(空间过滤) * 3. 转换消息格式 * 4. 分发给游戏客户端 * * @param event Zulip消息事件 * @param senderUserId 发送者用户ID(用于排除自己发送的消息) * @returns Promise */ async processMessageEvent(event: ZulipEvent, senderUserId: string): Promise { const message = event.message; if (!message) { this.logger.warn('消息事件缺少消息内容', { operation: 'processMessageEvent', eventId: event.id, }); return; } this.logger.log(`处理Zulip消息事件: ${message.id}`); try { // 1. 获取Stream名称 const streamName = this.getStreamName(message); if (!streamName) { this.logger.debug('无法确定Stream名称,跳过消息', { operation: 'processMessageEvent', messageId: message.id, }); return; } // 2. 确定目标玩家(空间过滤) const targetPlayers = await this.determineTargetPlayers(message, streamName, senderUserId); if (targetPlayers.length === 0) { this.logger.debug('没有目标玩家,跳过消息分发', { operation: 'processMessageEvent', messageId: message.id, streamName, }); return; } // 3. 转换消息格式 const gameMessage = await this.convertMessageFormat(message, streamName); // 4. 分发消息给目标玩家 await this.distributeMessage(gameMessage, targetPlayers); this.processedMessages++; this.logger.log(`Zulip消息处理完成: ${message.id}`); } catch (error) { const err = error as Error; this.logger.error('处理Zulip消息事件失败', { operation: 'processMessageEvent', messageId: message.id, error: err.message, timestamp: new Date().toISOString(), }, err.stack); } } /** * 获取消息的Stream名称 * * @param message Zulip消息 * @returns string | null Stream名称 * @private */ private getStreamName(message: ZulipMessage): string | null { // 检查消息类型 if (message.type === 'private') { // 私信消息,暂不处理 return null; } // 从display_recipient获取Stream名称 if (typeof message.display_recipient === 'string') { return message.display_recipient; } // 如果display_recipient是数组(私信),返回null if (Array.isArray(message.display_recipient)) { return null; } return null; } /** * 确定目标玩家 * * 功能描述: * 根据消息的Stream确定应该接收消息的玩家(空间过滤) * * 业务逻辑: * 1. 根据Stream名称确定对应的地图 * 2. 从SessionManager获取该地图的所有玩家 * 3. 排除消息发送者(避免收到自己的消息) * * @param message Zulip消息 * @param streamName Stream名称 * @param senderUserId 发送者用户ID * @returns Promise 目标玩家Socket ID列表 */ async determineTargetPlayers( message: ZulipMessage, streamName: string, senderUserId: string ): Promise { try { // 1. 根据Stream名称确定对应的地图 const mapId = this.configManager.getMapIdByStream(streamName); if (!mapId) { this.logger.debug('未找到Stream对应的地图', { operation: 'determineTargetPlayers', streamName, messageId: message.id, }); return []; } // 2. 从SessionManager获取该地图的所有玩家Socket ID const socketIds = await this.sessionManager.getSocketsInMap(mapId); if (socketIds.length === 0) { this.logger.debug('地图中没有在线玩家', { operation: 'determineTargetPlayers', mapId, streamName, }); return []; } // 3. 排除消息发送者 const filteredSocketIds: string[] = []; for (const socketId of socketIds) { const session = await this.sessionManager.getSession(socketId); if (session && session.userId !== senderUserId) { filteredSocketIds.push(socketId); } } this.logger.debug('确定目标玩家完成', { operation: 'determineTargetPlayers', mapId, streamName, totalPlayers: socketIds.length, targetPlayers: filteredSocketIds.length, }); return filteredSocketIds; } catch (error) { const err = error as Error; this.logger.error('确定目标玩家失败', { operation: 'determineTargetPlayers', messageId: message.id, streamName, error: err.message, }); return []; } } /** * 消息格式转换 * * 功能描述: * 将Zulip消息转换为游戏协议格式(按guide.md格式) * * 业务逻辑: * 1. 提取发送者信息 * 2. 处理消息内容(Markdown转换等) * 3. 生成游戏协议消息 * 4. 确保包含所有必需信息(发送者、内容、时间戳) * * @param zulipMessage Zulip消息对象 * @param streamName Stream名称(可选) * @returns Promise 游戏协议消息 */ async convertMessageFormat(zulipMessage: ZulipMessage, streamName?: string): Promise { this.logger.debug('开始消息格式转换', { operation: 'convertMessageFormat', messageId: zulipMessage.id, sender: zulipMessage.sender_email, timestamp: new Date().toISOString(), }); try { // 1. 提取发送者名称 let senderName = zulipMessage.sender_full_name; if (!senderName || senderName.trim().length === 0) { // 从邮箱提取用户名 senderName = zulipMessage.sender_email.split('@')[0]; } // 2. 处理消息内容 let content = zulipMessage.content; // 移除Markdown格式,保留纯文本 content = this.stripMarkdown(content); // 移除HTML标签(Zulip可能返回HTML格式的内容) content = this.stripHtml(content); // 限制消息长度 const maxLength = 200; if (content.length > maxLength) { content = content.substring(0, maxLength - 3) + '...'; } // 3. 生成游戏协议消息(按guide.md格式) const gameMessage: GameMessage = { t: 'chat_render', from: senderName, txt: content, bubble: true, // 默认显示气泡 timestamp: zulipMessage.timestamp, streamName: streamName, topic: zulipMessage.subject, }; this.logger.debug('消息格式转换完成', { operation: 'convertMessageFormat', messageId: zulipMessage.id, originalLength: zulipMessage.content.length, convertedLength: content.length, senderName, }); return gameMessage; } catch (error) { const err = error as Error; this.logger.error('消息格式转换失败', { operation: 'convertMessageFormat', messageId: zulipMessage.id, error: err.message, timestamp: new Date().toISOString(), }, err.stack); // 返回默认消息 return { t: 'chat_render', from: 'Unknown', txt: '消息格式转换失败', bubble: true, }; } } /** * 消息分发机制 * * 功能描述: * 通过WebSocket将消息发送给目标客户端 * * 业务逻辑: * 1. 检查消息分发器是否已设置 * 2. 遍历目标玩家列表 * 3. 向每个玩家发送消息 * 4. 记录分发结果 * * @param gameMessage 游戏协议消息 * @param targetPlayers 目标玩家Socket ID列表 * @returns Promise */ async distributeMessage(gameMessage: GameMessage, targetPlayers: string[]): Promise { this.logger.debug('开始消息分发', { operation: 'distributeMessage', targetPlayerCount: targetPlayers.length, messageFrom: gameMessage.from, timestamp: new Date().toISOString(), }); try { // 检查消息分发器是否已设置 if (!this.messageDistributor) { this.logger.warn('消息分发器未设置,无法分发消息', { operation: 'distributeMessage', targetPlayerCount: targetPlayers.length, }); return; } // 向每个目标玩家发送消息 let successCount = 0; let failCount = 0; for (const socketId of targetPlayers) { try { this.messageDistributor.sendChatRender( socketId, gameMessage.from, gameMessage.txt, gameMessage.bubble ); successCount++; this.logger.debug('消息已发送给玩家', { operation: 'distributeMessage', socketId, from: gameMessage.from, }); } catch (sendError) { failCount++; const err = sendError as Error; this.logger.warn('发送消息给玩家失败', { operation: 'distributeMessage', socketId, error: err.message, }); } } this.logger.log(`消息分发完成,目标玩家: ${targetPlayers.length}`); } catch (error) { const err = error as Error; this.logger.error('消息分发失败', { operation: 'distributeMessage', targetPlayerCount: targetPlayers.length, error: err.message, timestamp: new Date().toISOString(), }, err.stack); } } /** * 向指定地图广播消息 * * 功能描述: * 向指定地图区域内的所有在线玩家广播消息 * * @param mapId 地图ID * @param gameMessage 游戏协议消息 * @returns Promise */ async broadcastToMap(mapId: string, gameMessage: GameMessage): Promise { this.logger.debug('向地图广播消息', { operation: 'broadcastToMap', mapId, messageFrom: gameMessage.from, timestamp: new Date().toISOString(), }); try { if (!this.messageDistributor) { this.logger.warn('消息分发器未设置,无法广播消息', { operation: 'broadcastToMap', mapId, }); return; } await this.messageDistributor.broadcastToMap(mapId, 'chat_render', gameMessage); this.logger.log(`地图广播完成: ${mapId}`); } catch (error) { const err = error as Error; this.logger.error('地图广播失败', { operation: 'broadcastToMap', mapId, error: err.message, }, err.stack); } } /** * 移除Markdown格式 * * @param content 包含Markdown的内容 * @returns 纯文本内容 * @private */ private stripMarkdown(content: string): string { return content .replace(/\*\*(.*?)\*\*/g, '$1') // 粗体 **text** .replace(/\*(.*?)\*/g, '$1') // 斜体 *text* .replace(/__(.*?)__/g, '$1') // 粗体 __text__ .replace(/_(.*?)_/g, '$1') // 斜体 _text_ .replace(/~~(.*?)~~/g, '$1') // 删除线 ~~text~~ .replace(/`{3}[\s\S]*?`{3}/g, '[代码块]') // 代码块 .replace(/`(.*?)`/g, '$1') // 行内代码 `code` .replace(/\[(.*?)\]\(.*?\)/g, '$1') // 链接 [text](url) .replace(/!\[(.*?)\]\(.*?\)/g, '[图片]') // 图片 ![alt](url) .replace(/^#+\s*/gm, '') // 标题 # ## ### .replace(/^\s*[-*+]\s*/gm, '• ') // 无序列表 .replace(/^\s*\d+\.\s*/gm, '') // 有序列表 .replace(/^\s*>\s*/gm, '') // 引用 .replace(/---+/g, '') // 分隔线 .replace(/\n{3,}/g, '\n\n') // 多余空行 .trim(); } /** * 移除HTML标签 * * @param content 包含HTML的内容 * @returns 纯文本内容 * @private */ private stripHtml(content: string): string { return content .replace(/<[^>]*>/g, '') // 移除所有HTML标签 .replace(/ /g, ' ') // 替换HTML空格 .replace(/</g, '<') // 替换HTML实体 .replace(/>/g, '>') .replace(/&/g, '&') .replace(/"/g, '"') .replace(/'/g, "'") .trim(); } /** * 判断是否为队列错误 * * @param error 错误对象 * @returns boolean 是否为队列错误 * @private */ private isQueueError(error: any): boolean { if (!error) return false; const message = error.message || ''; // 检查常见的队列错误 return ( message.includes('BAD_EVENT_QUEUE_ID') || message.includes('queue does not exist') || message.includes('Invalid queue id') ); } /** * 获取事件处理统计信息 * * @returns EventProcessingStats 事件处理统计信息 */ getProcessingStats(): EventProcessingStats { const activeQueues = Array.from(this.eventQueues.entries()) .filter(([, info]) => info.isActive); return { isActive: this.processingActive, activeQueues: activeQueues.length, totalQueues: this.eventQueues.size, queueIds: Array.from(this.eventQueues.keys()), processedEvents: this.processedEvents, processedMessages: this.processedMessages, lastEventTime: this.lastEventTime || undefined, }; } /** * 重置统计信息 */ resetStats(): void { this.processedEvents = 0; this.processedMessages = 0; this.lastEventTime = null; this.logger.log('事件处理统计已重置'); } /** * 重新激活被禁用的队列 * * @param queueId 队列ID * @returns boolean 是否成功激活 */ reactivateQueue(queueId: string): boolean { const queueInfo = this.eventQueues.get(queueId); if (queueInfo) { queueInfo.isActive = true; this.logger.log(`事件队列已重新激活: ${queueId}`); return true; } return false; } /** * 手动处理单个消息事件(用于测试) * * @param message Zulip消息 * @param senderUserId 发送者用户ID * @returns Promise<{success: boolean, targetCount: number}> */ async processMessageManually( message: ZulipMessage, senderUserId: string ): Promise<{ success: boolean; targetCount: number }> { try { const streamName = this.getStreamName(message); if (!streamName) { return { success: false, targetCount: 0 }; } const targetPlayers = await this.determineTargetPlayers(message, streamName, senderUserId); if (targetPlayers.length === 0) { return { success: true, targetCount: 0 }; } const gameMessage = await this.convertMessageFormat(message, streamName); await this.distributeMessage(gameMessage, targetPlayers); return { success: true, targetCount: targetPlayers.length }; } catch (error) { const err = error as Error; this.logger.error('手动处理消息失败', { operation: 'processMessageManually', messageId: message.id, error: err.message, }); return { success: false, targetCount: 0 }; } } }