Files
whale-town-end/src/business/zulip/services/zulip_event_processor.service.ts
moyin ed04b8c92d docs(zulip): 完善Zulip业务模块功能文档
范围: src/business/zulip/README.md
- 补充对外提供的接口章节(14个公共方法)
- 添加使用的项目内部依赖说明(7个依赖)
- 完善核心特性描述(5个特性)
- 添加潜在风险评估(4个风险及缓解措施)
- 优化文档结构和内容完整性
2026-01-15 10:53:04 +08:00

1034 lines
28 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* Zulip事件处理服务
*
* 功能描述:
* - 实现事件队列轮询机制
* - 处理Zulip消息事件和格式转换
* - 实现空间过滤和消息分发
* - 支持区域广播功能
*
* 职责分离:
* - 事件轮询管理Zulip事件队列的轮询和处理
* - 消息转换将Zulip消息转换为游戏协议格式
* - 空间过滤:根据地图确定消息接收者
* - 消息分发通过WebSocket向目标玩家发送消息
*
* 主要方法:
* - startEventProcessing(): 启动事件处理循环
* - processMessageEvent(): 处理Zulip消息事件
* - convertMessageFormat(): 消息格式转换
* - distributeMessage(): 消息分发机制
* - determineTargetPlayers(): 空间过滤确定目标玩家
*
* 使用场景:
* - 后台异步处理Zulip事件
* - 消息格式转换和路由
* - 向游戏客户端分发消息
*
* 依赖模块:
* - ISessionQueryService: 会话查询接口(通过 Core 层接口解耦)
* - ConfigManagerService: 配置管理服务
* - ZulipClientPoolService: Zulip客户端池服务
* - AppLoggerService: 日志记录服务
*
* 最近修改:
* - 2026-01-14: 代码质量优化 - 移除未使用的IGameSession导入 (修改者: moyin)
* - 2026-01-14: 代码规范优化 - 完善文件头注释和职责分离描述 (修改者: moyin)
* - 2025-12-25: 功能新增 - 初始创建Zulip事件处理服务 (修改者: angjustinl)
*
* @author angjustinl
* @version 1.1.2
* @since 2025-12-25
* @lastModified 2026-01-14
*/
import { Injectable, OnModuleDestroy, Inject, Logger } from '@nestjs/common';
import {
ISessionQueryService,
SESSION_QUERY_SERVICE,
} from '../../../core/session_core/session_core.interfaces';
import { IZulipConfigService, IZulipClientPoolService } from '../../../core/zulip_core/zulip_core.interfaces';
/**
* Zulip消息接口
*/
export interface ZulipMessage {
id: number; // 消息ID
sender_email: string; // 发送者邮箱
sender_full_name: string; // 发送者全名
content: string; // 消息内容
stream_id: number; // Stream ID
subject: string; // Topic名称
timestamp: number; // 时间戳
display_recipient?: string | any[]; // Stream名称或私信接收者
type?: string; // 消息类型 (stream/private)
}
/**
* Zulip事件接口
*/
export interface ZulipEvent {
type: string; // 事件类型
message?: ZulipMessage; // 消息内容仅message事件
queue_id?: string; // 队列ID
id?: number; // 事件ID
}
/**
* 游戏消息接口 - 按guide.md格式
*/
export interface GameMessage {
t: 'chat_render';
from: string;
txt: string;
bubble: boolean;
timestamp?: number; // 可选时间戳
streamName?: string; // 可选Stream名称
topic?: string; // 可选Topic名称
}
/**
* 消息分发回调接口
*/
export interface MessageDistributor {
sendChatRender(socketId: string, from: string, txt: string, bubble: boolean): void;
broadcastToMap(mapId: string, event: string, data: any): Promise<void>;
}
/**
* 事件处理统计信息接口
*/
export interface EventProcessingStats {
isActive: boolean;
activeQueues: number;
totalQueues: number;
queueIds: string[];
processedEvents: number;
processedMessages: number;
lastEventTime?: Date;
}
/**
* Zulip事件处理服务类
*
* 职责:
* - 处理从Zulip接收的事件队列消息
* - 将Zulip消息转换为游戏协议格式
* - 管理事件队列的生命周期
* - 提供消息分发和路由功能
*
* 主要方法:
* - processEvents(): 处理Zulip事件队列
* - processMessage(): 处理单个消息事件
* - startProcessing(): 启动事件处理
* - stopProcessing(): 停止事件处理
* - registerQueue(): 注册新的事件队列
*
* 使用场景:
* - 接收Zulip服务器推送的消息
* - 将Zulip消息转发给游戏客户端
* - 管理多用户的事件队列
* - 消息格式转换和过滤
*/
@Injectable()
export class ZulipEventProcessorService implements OnModuleDestroy {
private readonly logger = new Logger(ZulipEventProcessorService.name);
private processingActive = false;
private eventQueues = new Map<string, { userId: string; isActive: boolean; lastEventId: number }>();
private messageDistributor: MessageDistributor | null = null;
private processedEvents = 0;
private processedMessages = 0;
private lastEventTime: Date | null = null;
private pollingInterval: NodeJS.Timeout | null = null;
private readonly POLLING_INTERVAL_MS = 2000; // 2秒轮询间隔
private readonly MAX_EVENTS_PER_POLL = 100;
constructor(
@Inject(SESSION_QUERY_SERVICE)
private readonly sessionManager: ISessionQueryService,
@Inject('ZULIP_CONFIG_SERVICE')
private readonly configManager: IZulipConfigService,
@Inject('ZULIP_CLIENT_POOL_SERVICE')
private readonly clientPool: IZulipClientPoolService,
) {
this.logger.log('ZulipEventProcessorService初始化完成');
}
/**
* 模块销毁时停止事件处理
*/
async onModuleDestroy(): Promise<void> {
this.logger.log('ZulipEventProcessorService模块销毁停止事件处理');
await this.stopEventProcessing();
}
/**
* 设置消息分发器
*
* 功能描述:
* 设置用于向游戏客户端发送消息的分发器接口
*
* @param distributor 消息分发器实例
*/
setMessageDistributor(distributor: MessageDistributor): void {
this.messageDistributor = distributor;
this.logger.log('消息分发器已设置');
}
/**
* 启动事件处理循环
*
* 功能描述:
* 启动后台事件处理循环监听所有活跃的Zulip事件队列
*
* 业务逻辑:
* 1. 初始化事件处理状态
* 2. 启动轮询循环
* 3. 处理接收到的事件
* 4. 错误处理和重连机制
*
* @returns Promise<void>
*/
async startEventProcessing(): Promise<void> {
if (this.processingActive) {
this.logger.warn('事件处理已在运行', {
operation: 'startEventProcessing',
});
return;
}
this.processingActive = true;
this.logger.log('启动Zulip事件处理');
try {
// 启动定时轮询
this.pollingInterval = setInterval(
() => this.eventProcessingLoop(),
this.POLLING_INTERVAL_MS
);
// 立即执行一次
await this.eventProcessingLoop();
this.logger.log('事件处理循环已启动');
} catch (error) {
const err = error as Error;
this.logger.error('启动事件处理失败', {
operation: 'startEventProcessing',
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
this.processingActive = false;
}
}
/**
* 停止事件处理循环
*
* @returns Promise<void>
*/
async stopEventProcessing(): Promise<void> {
this.logger.log('停止Zulip事件处理');
this.processingActive = false;
// 清除定时器
if (this.pollingInterval) {
clearInterval(this.pollingInterval);
this.pollingInterval = null;
}
this.eventQueues.clear();
this.logger.log('事件处理已停止');
}
/**
* 注册事件队列
*
* 功能描述:
* 将新的事件队列添加到处理列表中
*
* @param queueId 事件队列ID
* @param userId 用户ID
* @param lastEventId 最后处理的事件ID默认-1
* @returns Promise<void>
*/
async registerEventQueue(queueId: string, userId: string, lastEventId: number = -1): Promise<void> {
this.logger.log(`注册事件队列: ${queueId}`);
this.eventQueues.set(queueId, {
userId,
isActive: true,
lastEventId,
});
}
/**
* 注销事件队列
*
* @param queueId 事件队列ID
* @returns Promise<void>
*/
async unregisterEventQueue(queueId: string): Promise<void> {
this.logger.log(`注销事件队列: ${queueId}`);
this.eventQueues.delete(queueId);
}
/**
* 事件处理循环
*
* 功能描述:
* 轮询所有注册的事件队列,处理接收到的事件
*
* @private
*/
private async eventProcessingLoop(): Promise<void> {
if (!this.processingActive) {
return;
}
try {
// 获取所有活跃的事件队列
const activeQueues = Array.from(this.eventQueues.entries())
.filter(([, info]) => info.isActive);
if (activeQueues.length === 0) {
return;
}
// 并发处理所有队列
await Promise.all(
activeQueues.map(([queueId, info]) =>
this.pollEventQueue(queueId, info.userId, info.lastEventId)
)
);
} catch (error) {
const err = error as Error;
this.logger.error('事件处理循环异常', {
operation: 'eventProcessingLoop',
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
}
}
/**
* 轮询单个事件队列
*
* 功能描述:
* 从Zulip服务器获取指定队列的新事件并处理
*
* @param queueId 事件队列ID
* @param userId 用户ID
* @param lastEventId 最后处理的事件ID
* @private
*/
private async pollEventQueue(queueId: string, userId: string, lastEventId: number): Promise<void> {
try {
// 获取用户的Zulip客户端
const client = await this.clientPool.getUserClient(userId);
if (!client) {
this.logger.debug('用户Zulip客户端不存在跳过轮询', {
operation: 'pollEventQueue',
queueId,
userId,
});
return;
}
// 调用Zulip API获取事件
// 注意:这里使用非阻塞模式,避免长时间等待
const events = await this.fetchEventsFromClient(client, queueId, lastEventId);
if (!events || events.length === 0) {
return;
}
// 处理每个事件
for (const event of events) {
await this.processEvent(event, userId);
// 更新最后处理的事件ID
if (event.id !== undefined) {
const queueInfo = this.eventQueues.get(queueId);
if (queueInfo) {
queueInfo.lastEventId = event.id;
}
}
}
this.processedEvents += events.length;
this.lastEventTime = new Date();
} catch (error) {
const err = error as Error;
this.logger.error('轮询事件队列失败', {
operation: 'pollEventQueue',
queueId,
userId,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
// 如果队列出现持续错误,暂时禁用
if (this.isQueueError(error)) {
const queueInfo = this.eventQueues.get(queueId);
if (queueInfo) {
queueInfo.isActive = false;
this.logger.warn('事件队列已暂时禁用', {
operation: 'pollEventQueue',
queueId,
userId,
});
}
}
}
}
/**
* 从Zulip客户端获取事件
*
* @param client Zulip客户端实例
* @param queueId 队列ID
* @param lastEventId 最后事件ID
* @returns Promise<ZulipEvent[]> 事件列表
* @private
*/
private async fetchEventsFromClient(
client: any,
queueId: string,
lastEventId: number
): Promise<ZulipEvent[]> {
try {
// 检查客户端是否有zulipClient实例
if (!client.zulipClient) {
return [];
}
// 调用zulip-js的events.retrieve方法
const result = await client.zulipClient.events.retrieve({
queue_id: queueId,
last_event_id: lastEventId,
dont_block: true, // 非阻塞模式
});
if (result.result === 'success' && result.events) {
return result.events as ZulipEvent[];
}
return [];
} catch (error) {
const err = error as Error;
this.logger.debug('获取事件失败', {
operation: 'fetchEventsFromClient',
queueId,
error: err.message,
});
return [];
}
}
/**
* 处理单个事件
*
* 功能描述:
* 根据事件类型分发到对应的处理方法
*
* @param event Zulip事件
* @param userId 用户ID
* @private
*/
private async processEvent(event: ZulipEvent, userId: string): Promise<void> {
this.logger.debug('处理Zulip事件', {
operation: 'processEvent',
eventType: event.type,
eventId: event.id,
userId,
timestamp: new Date().toISOString(),
});
try {
switch (event.type) {
case 'message':
if (event.message) {
await this.processMessageEvent(event, userId);
}
break;
case 'heartbeat':
// 心跳事件,忽略
break;
default:
this.logger.debug('忽略未处理的事件类型', {
operation: 'processEvent',
eventType: event.type,
eventId: event.id,
});
}
} catch (error) {
const err = error as Error;
this.logger.error('处理事件失败', {
operation: 'processEvent',
eventType: event.type,
eventId: event.id,
userId,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
}
}
/**
* 处理Zulip消息事件
*
* 功能描述:
* 处理从Zulip接收的消息事件转换格式后分发给相关的游戏客户端
*
* 业务逻辑:
* 1. 解析消息内容和元数据
* 2. 确定目标玩家(空间过滤)
* 3. 转换消息格式
* 4. 分发给游戏客户端
*
* @param event Zulip消息事件
* @param senderUserId 发送者用户ID用于排除自己发送的消息
* @returns Promise<void>
*/
async processMessageEvent(event: ZulipEvent, senderUserId: string): Promise<void> {
const message = event.message;
if (!message) {
this.logger.warn('消息事件缺少消息内容', {
operation: 'processMessageEvent',
eventId: event.id,
});
return;
}
this.logger.log(`处理Zulip消息事件: ${message.id}`);
try {
// 1. 获取Stream名称
const streamName = this.getStreamName(message);
if (!streamName) {
this.logger.debug('无法确定Stream名称跳过消息', {
operation: 'processMessageEvent',
messageId: message.id,
});
return;
}
// 2. 确定目标玩家(空间过滤)
const targetPlayers = await this.determineTargetPlayers(message, streamName, senderUserId);
if (targetPlayers.length === 0) {
this.logger.debug('没有目标玩家,跳过消息分发', {
operation: 'processMessageEvent',
messageId: message.id,
streamName,
});
return;
}
// 3. 转换消息格式
const gameMessage = await this.convertMessageFormat(message, streamName);
// 4. 分发消息给目标玩家
await this.distributeMessage(gameMessage, targetPlayers);
this.processedMessages++;
this.logger.log(`Zulip消息处理完成: ${message.id}`);
} catch (error) {
const err = error as Error;
this.logger.error('处理Zulip消息事件失败', {
operation: 'processMessageEvent',
messageId: message.id,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
}
}
/**
* 获取消息的Stream名称
*
* @param message Zulip消息
* @returns string | null Stream名称
* @private
*/
private getStreamName(message: ZulipMessage): string | null {
// 检查消息类型
if (message.type === 'private') {
// 私信消息,暂不处理
return null;
}
// 从display_recipient获取Stream名称
if (typeof message.display_recipient === 'string') {
return message.display_recipient;
}
// 如果display_recipient是数组私信返回null
if (Array.isArray(message.display_recipient)) {
return null;
}
return null;
}
/**
* 确定目标玩家
*
* 功能描述:
* 根据消息的Stream确定应该接收消息的玩家空间过滤
*
* 业务逻辑:
* 1. 根据Stream名称确定对应的地图
* 2. 从SessionManager获取该地图的所有玩家
* 3. 排除消息发送者(避免收到自己的消息)
*
* @param message Zulip消息
* @param streamName Stream名称
* @param senderUserId 发送者用户ID
* @returns Promise<string[]> 目标玩家Socket ID列表
*/
async determineTargetPlayers(
message: ZulipMessage,
streamName: string,
senderUserId: string
): Promise<string[]> {
try {
// 1. 根据Stream名称确定对应的地图
const mapId = this.configManager.getMapIdByStream(streamName);
if (!mapId) {
this.logger.debug('未找到Stream对应的地图', {
operation: 'determineTargetPlayers',
streamName,
messageId: message.id,
});
return [];
}
// 2. 从SessionManager获取该地图的所有玩家Socket ID
const socketIds = await this.sessionManager.getSocketsInMap(mapId);
if (socketIds.length === 0) {
this.logger.debug('地图中没有在线玩家', {
operation: 'determineTargetPlayers',
mapId,
streamName,
});
return [];
}
// 3. 排除消息发送者
const filteredSocketIds: string[] = [];
for (const socketId of socketIds) {
const session = await this.sessionManager.getSession(socketId);
if (session && session.userId !== senderUserId) {
filteredSocketIds.push(socketId);
}
}
this.logger.debug('确定目标玩家完成', {
operation: 'determineTargetPlayers',
mapId,
streamName,
totalPlayers: socketIds.length,
targetPlayers: filteredSocketIds.length,
});
return filteredSocketIds;
} catch (error) {
const err = error as Error;
this.logger.error('确定目标玩家失败', {
operation: 'determineTargetPlayers',
messageId: message.id,
streamName,
error: err.message,
});
return [];
}
}
/**
* 消息格式转换
*
* 功能描述:
* 将Zulip消息转换为游戏协议格式按guide.md格式
*
* 业务逻辑:
* 1. 提取发送者信息
* 2. 处理消息内容Markdown转换等
* 3. 生成游戏协议消息
* 4. 确保包含所有必需信息(发送者、内容、时间戳)
*
* @param zulipMessage Zulip消息对象
* @param streamName Stream名称可选
* @returns Promise<GameMessage> 游戏协议消息
*/
async convertMessageFormat(zulipMessage: ZulipMessage, streamName?: string): Promise<GameMessage> {
this.logger.debug('开始消息格式转换', {
operation: 'convertMessageFormat',
messageId: zulipMessage.id,
sender: zulipMessage.sender_email,
timestamp: new Date().toISOString(),
});
try {
// 1. 提取发送者名称
let senderName = zulipMessage.sender_full_name;
if (!senderName || senderName.trim().length === 0) {
// 从邮箱提取用户名
senderName = zulipMessage.sender_email.split('@')[0];
}
// 2. 处理消息内容
let content = zulipMessage.content;
// 移除Markdown格式保留纯文本
content = this.stripMarkdown(content);
// 移除HTML标签Zulip可能返回HTML格式的内容
content = this.stripHtml(content);
// 限制消息长度
const maxLength = 200;
if (content.length > maxLength) {
content = content.substring(0, maxLength - 3) + '...';
}
// 3. 生成游戏协议消息按guide.md格式
const gameMessage: GameMessage = {
t: 'chat_render',
from: senderName,
txt: content,
bubble: true, // 默认显示气泡
timestamp: zulipMessage.timestamp,
streamName: streamName,
topic: zulipMessage.subject,
};
this.logger.debug('消息格式转换完成', {
operation: 'convertMessageFormat',
messageId: zulipMessage.id,
originalLength: zulipMessage.content.length,
convertedLength: content.length,
senderName,
});
return gameMessage;
} catch (error) {
const err = error as Error;
this.logger.error('消息格式转换失败', {
operation: 'convertMessageFormat',
messageId: zulipMessage.id,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
// 返回默认消息
return {
t: 'chat_render',
from: 'Unknown',
txt: '消息格式转换失败',
bubble: true,
};
}
}
/**
* 消息分发机制
*
* 功能描述:
* 通过WebSocket将消息发送给目标客户端
*
* 业务逻辑:
* 1. 检查消息分发器是否已设置
* 2. 遍历目标玩家列表
* 3. 向每个玩家发送消息
* 4. 记录分发结果
*
* @param gameMessage 游戏协议消息
* @param targetPlayers 目标玩家Socket ID列表
* @returns Promise<void>
*/
async distributeMessage(gameMessage: GameMessage, targetPlayers: string[]): Promise<void> {
this.logger.debug('开始消息分发', {
operation: 'distributeMessage',
targetPlayerCount: targetPlayers.length,
messageFrom: gameMessage.from,
timestamp: new Date().toISOString(),
});
try {
// 检查消息分发器是否已设置
if (!this.messageDistributor) {
this.logger.warn('消息分发器未设置,无法分发消息', {
operation: 'distributeMessage',
targetPlayerCount: targetPlayers.length,
});
return;
}
// 向每个目标玩家发送消息
let successCount = 0;
let failCount = 0;
for (const socketId of targetPlayers) {
try {
this.messageDistributor.sendChatRender(
socketId,
gameMessage.from,
gameMessage.txt,
gameMessage.bubble
);
successCount++;
this.logger.debug('消息已发送给玩家', {
operation: 'distributeMessage',
socketId,
from: gameMessage.from,
});
} catch (sendError) {
failCount++;
const err = sendError as Error;
this.logger.warn('发送消息给玩家失败', {
operation: 'distributeMessage',
socketId,
error: err.message,
});
}
}
this.logger.log(`消息分发完成,目标玩家: ${targetPlayers.length}`);
} catch (error) {
const err = error as Error;
this.logger.error('消息分发失败', {
operation: 'distributeMessage',
targetPlayerCount: targetPlayers.length,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
}
}
/**
* 向指定地图广播消息
*
* 功能描述:
* 向指定地图区域内的所有在线玩家广播消息
*
* @param mapId 地图ID
* @param gameMessage 游戏协议消息
* @returns Promise<void>
*/
async broadcastToMap(mapId: string, gameMessage: GameMessage): Promise<void> {
this.logger.debug('向地图广播消息', {
operation: 'broadcastToMap',
mapId,
messageFrom: gameMessage.from,
timestamp: new Date().toISOString(),
});
try {
if (!this.messageDistributor) {
this.logger.warn('消息分发器未设置,无法广播消息', {
operation: 'broadcastToMap',
mapId,
});
return;
}
await this.messageDistributor.broadcastToMap(mapId, 'chat_render', gameMessage);
this.logger.log(`地图广播完成: ${mapId}`);
} catch (error) {
const err = error as Error;
this.logger.error('地图广播失败', {
operation: 'broadcastToMap',
mapId,
error: err.message,
}, err.stack);
}
}
/**
* 移除Markdown格式
*
* @param content 包含Markdown的内容
* @returns 纯文本内容
* @private
*/
private stripMarkdown(content: string): string {
return content
.replace(/\*\*(.*?)\*\*/g, '$1') // 粗体 **text**
.replace(/\*(.*?)\*/g, '$1') // 斜体 *text*
.replace(/__(.*?)__/g, '$1') // 粗体 __text__
.replace(/_(.*?)_/g, '$1') // 斜体 _text_
.replace(/~~(.*?)~~/g, '$1') // 删除线 ~~text~~
.replace(/`{3}[\s\S]*?`{3}/g, '[代码块]') // 代码块
.replace(/`(.*?)`/g, '$1') // 行内代码 `code`
.replace(/\[(.*?)\]\(.*?\)/g, '$1') // 链接 [text](url)
.replace(/!\[(.*?)\]\(.*?\)/g, '[图片]') // 图片 ![alt](url)
.replace(/^#+\s*/gm, '') // 标题 # ## ###
.replace(/^\s*[-*+]\s*/gm, '• ') // 无序列表
.replace(/^\s*\d+\.\s*/gm, '') // 有序列表
.replace(/^\s*>\s*/gm, '') // 引用
.replace(/---+/g, '') // 分隔线
.replace(/\n{3,}/g, '\n\n') // 多余空行
.trim();
}
/**
* 移除HTML标签
*
* @param content 包含HTML的内容
* @returns 纯文本内容
* @private
*/
private stripHtml(content: string): string {
return content
.replace(/<[^>]*>/g, '') // 移除所有HTML标签
.replace(/&nbsp;/g, ' ') // 替换HTML空格
.replace(/&lt;/g, '<') // 替换HTML实体
.replace(/&gt;/g, '>')
.replace(/&amp;/g, '&')
.replace(/&quot;/g, '"')
.replace(/&#39;/g, "'")
.trim();
}
/**
* 判断是否为队列错误
*
* @param error 错误对象
* @returns boolean 是否为队列错误
* @private
*/
private isQueueError(error: any): boolean {
if (!error) return false;
const message = error.message || '';
// 检查常见的队列错误
return (
message.includes('BAD_EVENT_QUEUE_ID') ||
message.includes('queue does not exist') ||
message.includes('Invalid queue id')
);
}
/**
* 获取事件处理统计信息
*
* @returns EventProcessingStats 事件处理统计信息
*/
getProcessingStats(): EventProcessingStats {
const activeQueues = Array.from(this.eventQueues.entries())
.filter(([, info]) => info.isActive);
return {
isActive: this.processingActive,
activeQueues: activeQueues.length,
totalQueues: this.eventQueues.size,
queueIds: Array.from(this.eventQueues.keys()),
processedEvents: this.processedEvents,
processedMessages: this.processedMessages,
lastEventTime: this.lastEventTime || undefined,
};
}
/**
* 重置统计信息
*/
resetStats(): void {
this.processedEvents = 0;
this.processedMessages = 0;
this.lastEventTime = null;
this.logger.log('事件处理统计已重置');
}
/**
* 重新激活被禁用的队列
*
* @param queueId 队列ID
* @returns boolean 是否成功激活
*/
reactivateQueue(queueId: string): boolean {
const queueInfo = this.eventQueues.get(queueId);
if (queueInfo) {
queueInfo.isActive = true;
this.logger.log(`事件队列已重新激活: ${queueId}`);
return true;
}
return false;
}
/**
* 手动处理单个消息事件(用于测试)
*
* @param message Zulip消息
* @param senderUserId 发送者用户ID
* @returns Promise<{success: boolean, targetCount: number}>
*/
async processMessageManually(
message: ZulipMessage,
senderUserId: string
): Promise<{ success: boolean; targetCount: number }> {
try {
const streamName = this.getStreamName(message);
if (!streamName) {
return { success: false, targetCount: 0 };
}
const targetPlayers = await this.determineTargetPlayers(message, streamName, senderUserId);
if (targetPlayers.length === 0) {
return { success: true, targetCount: 0 };
}
const gameMessage = await this.convertMessageFormat(message, streamName);
await this.distributeMessage(gameMessage, targetPlayers);
return { success: true, targetCount: targetPlayers.length };
} catch (error) {
const err = error as Error;
this.logger.error('手动处理消息失败', {
operation: 'processMessageManually',
messageId: message.id,
error: err.message,
});
return { success: false, targetCount: 0 };
}
}
}