Files
whale-town-end/src/business/zulip/services/zulip-client-pool.service.ts
angjustinl 3dd5f23d79 fix(zulip): Fix e2e test errors and pdate author attribution across all Zulip integration files
- Standardize author attribution across 27 files in the Zulip integration module
- Maintain consistent code documentation and authorship tracking
2025-12-25 23:37:26 +08:00

656 lines
17 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* Zulip客户端池服务
*
* 功能描述:
* - 为每个用户维护专用的Zulip客户端实例
* - 管理Zulip API Key和事件队列注册
* - 提供客户端获取、创建和销毁接口
*
* 主要方法:
* - createUserClient(): 为用户创建专用Zulip客户端
* - getUserClient(): 获取用户的Zulip客户端
* - registerEventQueue(): 注册事件队列
* - sendMessage(): 发送消息到指定Stream/Topic
* - destroyUserClient(): 注销事件队列并清理客户端
*
* 使用场景:
* - 用户登录时创建Zulip客户端
* - 消息发送时获取用户客户端
* - 用户登出时清理客户端资源
*
* 依赖模块:
* - ZulipClientService: Zulip客户端核心服务
* - AppLoggerService: 日志记录服务
*
* @author angjustinl
* @version 1.0.0
* @since 2025-12-25
*/
import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common';
import {
ZulipClientService,
ZulipClientConfig,
ZulipClientInstance,
SendMessageResult,
RegisterQueueResult,
GetEventsResult,
} from './zulip-client.service';
/**
* 用户客户端信息接口
*/
export interface UserClientInfo {
userId: string;
clientInstance: ZulipClientInstance;
eventPollingActive: boolean;
eventCallback?: (events: any[]) => void;
}
/**
* 客户端池统计信息接口
*/
export interface PoolStats {
totalClients: number;
activeClients: number;
clientsWithQueues: number;
clientIds: string[];
}
@Injectable()
export class ZulipClientPoolService implements OnModuleDestroy {
private readonly clientPool = new Map<string, UserClientInfo>();
private readonly pollingIntervals = new Map<string, NodeJS.Timeout>();
private readonly logger = new Logger(ZulipClientPoolService.name);
constructor(
private readonly zulipClientService: ZulipClientService,
) {
this.logger.log('ZulipClientPoolService初始化完成');
}
/**
* 模块销毁时清理所有客户端
*/
async onModuleDestroy(): Promise<void> {
this.logger.log('ZulipClientPoolService模块销毁开始清理所有客户端', {
operation: 'onModuleDestroy',
clientCount: this.clientPool.size,
timestamp: new Date().toISOString(),
});
// 停止所有轮询
for (const [userId, interval] of this.pollingIntervals) {
clearInterval(interval);
this.logger.debug('停止用户事件轮询', { userId });
}
this.pollingIntervals.clear();
// 销毁所有客户端
const destroyPromises = Array.from(this.clientPool.keys()).map(userId =>
this.destroyUserClient(userId)
);
await Promise.allSettled(destroyPromises);
this.logger.log('ZulipClientPoolService清理完成', {
operation: 'onModuleDestroy',
timestamp: new Date().toISOString(),
});
}
/**
* 为用户创建专用Zulip客户端
*
* 功能描述:
* 使用用户的Zulip API Key创建客户端实例并注册事件队列
*
* 业务逻辑:
* 1. 检查是否已存在客户端
* 2. 验证API Key的有效性
* 3. 创建zulip-js客户端实例
* 4. 向Zulip服务器注册事件队列
* 5. 将客户端实例存储到池中
* 6. 返回客户端实例
*
* @param userId 用户ID
* @param config Zulip客户端配置
* @returns Promise<ZulipClientInstance> 创建的Zulip客户端实例
*
* @throws Error 当API Key无效或创建失败时
*/
async createUserClient(userId: string, config: ZulipClientConfig): Promise<ZulipClientInstance> {
const startTime = Date.now();
this.logger.log('开始创建用户Zulip客户端', {
operation: 'createUserClient',
userId,
realm: config.realm,
timestamp: new Date().toISOString(),
});
try {
// 1. 检查是否已存在客户端
const existingInfo = this.clientPool.get(userId);
if (existingInfo && existingInfo.clientInstance.isValid) {
this.logger.log('用户Zulip客户端已存在返回现有实例', {
operation: 'createUserClient',
userId,
queueId: existingInfo.clientInstance.queueId,
});
// 更新最后活动时间
existingInfo.clientInstance.lastActivity = new Date();
return existingInfo.clientInstance;
}
// 2. 创建新的客户端实例
const clientInstance = await this.zulipClientService.createClient(userId, config);
// 3. 注册事件队列
const registerResult = await this.zulipClientService.registerQueue(clientInstance);
if (!registerResult.success) {
throw new Error(`事件队列注册失败: ${registerResult.error}`);
}
// 4. 存储到客户端池
const userClientInfo: UserClientInfo = {
userId,
clientInstance,
eventPollingActive: false,
};
this.clientPool.set(userId, userClientInfo);
const duration = Date.now() - startTime;
this.logger.log('用户Zulip客户端创建成功', {
operation: 'createUserClient',
userId,
queueId: clientInstance.queueId,
duration,
timestamp: new Date().toISOString(),
});
return clientInstance;
} catch (error) {
const err = error as Error;
const duration = Date.now() - startTime;
this.logger.error('创建用户Zulip客户端失败', {
operation: 'createUserClient',
userId,
error: err.message,
duration,
timestamp: new Date().toISOString(),
}, err.stack);
throw error;
}
}
/**
* 获取用户的Zulip客户端
*
* @param userId 用户ID
* @returns Promise<ZulipClientInstance | null> 用户的Zulip客户端实例不存在时返回null
*/
async getUserClient(userId: string): Promise<ZulipClientInstance | null> {
const userInfo = this.clientPool.get(userId);
if (userInfo && userInfo.clientInstance.isValid) {
// 更新最后活动时间
userInfo.clientInstance.lastActivity = new Date();
this.logger.debug('获取用户Zulip客户端', {
operation: 'getUserClient',
userId,
queueId: userInfo.clientInstance.queueId,
timestamp: new Date().toISOString(),
});
return userInfo.clientInstance;
}
this.logger.debug('用户Zulip客户端不存在或无效', {
operation: 'getUserClient',
userId,
exists: !!userInfo,
isValid: userInfo?.clientInstance.isValid,
timestamp: new Date().toISOString(),
});
return null;
}
/**
* 检查用户客户端是否存在
*
* @param userId 用户ID
* @returns boolean 客户端是否存在且有效
*/
hasUserClient(userId: string): boolean {
const userInfo = this.clientPool.get(userId);
return !!(userInfo && userInfo.clientInstance.isValid);
}
/**
* 注册事件队列
*
* 功能描述:
* 为用户的Zulip客户端注册事件队列
*
* @param userId 用户ID
* @returns Promise<RegisterQueueResult> 注册结果
*/
async registerEventQueue(userId: string): Promise<RegisterQueueResult> {
this.logger.log('注册用户Zulip事件队列', {
operation: 'registerEventQueue',
userId,
timestamp: new Date().toISOString(),
});
try {
const userInfo = this.clientPool.get(userId);
if (!userInfo || !userInfo.clientInstance.isValid) {
return {
success: false,
error: '用户Zulip客户端不存在或无效',
};
}
// 如果已有队列,先注销
if (userInfo.clientInstance.queueId) {
await this.zulipClientService.deregisterQueue(userInfo.clientInstance);
}
// 注册新队列
const result = await this.zulipClientService.registerQueue(userInfo.clientInstance);
this.logger.log('用户事件队列注册完成', {
operation: 'registerEventQueue',
userId,
success: result.success,
queueId: result.queueId,
timestamp: new Date().toISOString(),
});
return result;
} catch (error) {
const err = error as Error;
this.logger.error('注册用户事件队列失败', {
operation: 'registerEventQueue',
userId,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
return {
success: false,
error: err.message,
};
}
}
/**
* 注销事件队列
*
* @param userId 用户ID
* @returns Promise<boolean> 是否成功注销
*/
async deregisterEventQueue(userId: string): Promise<boolean> {
this.logger.log('注销用户Zulip事件队列', {
operation: 'deregisterEventQueue',
userId,
timestamp: new Date().toISOString(),
});
try {
const userInfo = this.clientPool.get(userId);
if (!userInfo) {
this.logger.log('用户客户端不存在,跳过注销', {
operation: 'deregisterEventQueue',
userId,
});
return true;
}
// 停止事件轮询
this.stopEventPolling(userId);
// 注销队列
const result = await this.zulipClientService.deregisterQueue(userInfo.clientInstance);
this.logger.log('用户事件队列注销完成', {
operation: 'deregisterEventQueue',
userId,
success: result,
timestamp: new Date().toISOString(),
});
return result;
} catch (error) {
const err = error as Error;
this.logger.error('注销用户事件队列失败', {
operation: 'deregisterEventQueue',
userId,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
return false;
}
}
/**
* 发送消息到指定Stream/Topic
*
* 功能描述:
* 使用用户的Zulip客户端发送消息到指定的Stream和Topic
*
* @param userId 用户ID
* @param stream 目标Stream名称
* @param topic 目标Topic名称
* @param content 消息内容
* @returns Promise<SendMessageResult> 发送结果
*/
async sendMessage(
userId: string,
stream: string,
topic: string,
content: string
): Promise<SendMessageResult> {
this.logger.log('发送消息到Zulip', {
operation: 'sendMessage',
userId,
stream,
topic,
contentLength: content.length,
timestamp: new Date().toISOString(),
});
try {
const userInfo = this.clientPool.get(userId);
if (!userInfo || !userInfo.clientInstance.isValid) {
return {
success: false,
error: '用户Zulip客户端不存在或无效',
};
}
const result = await this.zulipClientService.sendMessage(
userInfo.clientInstance,
stream,
topic,
content
);
this.logger.log('消息发送完成', {
operation: 'sendMessage',
userId,
stream,
topic,
success: result.success,
messageId: result.messageId,
timestamp: new Date().toISOString(),
});
return result;
} catch (error) {
const err = error as Error;
this.logger.error('发送消息失败', {
operation: 'sendMessage',
userId,
stream,
topic,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
return {
success: false,
error: err.message,
};
}
}
/**
* 开始事件轮询
*
* 功能描述:
* 启动异步监听器轮询Zulip事件队列获取新消息
*
* @param userId 用户ID
* @param callback 事件处理回调函数
* @param intervalMs 轮询间隔毫秒默认5000ms
*/
startEventPolling(
userId: string,
callback: (events: any[]) => void,
intervalMs: number = 5000
): void {
this.logger.log('开始用户事件轮询', {
operation: 'startEventPolling',
userId,
intervalMs,
timestamp: new Date().toISOString(),
});
const userInfo = this.clientPool.get(userId);
if (!userInfo || !userInfo.clientInstance.isValid) {
this.logger.warn('无法启动事件轮询:客户端不存在或无效', {
operation: 'startEventPolling',
userId,
});
return;
}
// 停止现有轮询
this.stopEventPolling(userId);
// 保存回调
userInfo.eventCallback = callback;
userInfo.eventPollingActive = true;
// 启动轮询
const pollEvents = async () => {
if (!userInfo.eventPollingActive) {
return;
}
try {
const result = await this.zulipClientService.getEvents(
userInfo.clientInstance,
true // 不阻塞
);
if (result.success && result.events && result.events.length > 0) {
this.logger.debug('收到Zulip事件', {
operation: 'pollEvents',
userId,
eventCount: result.events.length,
});
if (userInfo.eventCallback) {
userInfo.eventCallback(result.events);
}
}
} catch (error) {
const err = error as Error;
this.logger.error('事件轮询异常', {
operation: 'pollEvents',
userId,
error: err.message,
});
}
};
// 立即执行一次
pollEvents();
// 设置定时轮询
const interval = setInterval(pollEvents, intervalMs);
this.pollingIntervals.set(userId, interval);
this.logger.log('用户事件轮询已启动', {
operation: 'startEventPolling',
userId,
timestamp: new Date().toISOString(),
});
}
/**
* 停止事件轮询
*
* @param userId 用户ID
*/
stopEventPolling(userId: string): void {
const interval = this.pollingIntervals.get(userId);
if (interval) {
clearInterval(interval);
this.pollingIntervals.delete(userId);
this.logger.log('用户事件轮询已停止', {
operation: 'stopEventPolling',
userId,
timestamp: new Date().toISOString(),
});
}
const userInfo = this.clientPool.get(userId);
if (userInfo) {
userInfo.eventPollingActive = false;
userInfo.eventCallback = undefined;
}
}
/**
* 注销事件队列并清理客户端
*
* 功能描述:
* 注销用户的Zulip事件队列清理客户端实例和相关资源
*
* @param userId 用户ID
* @returns Promise<void>
*/
async destroyUserClient(userId: string): Promise<void> {
this.logger.log('开始销毁用户Zulip客户端', {
operation: 'destroyUserClient',
userId,
timestamp: new Date().toISOString(),
});
try {
// 1. 停止事件轮询
this.stopEventPolling(userId);
// 2. 获取客户端信息
const userInfo = this.clientPool.get(userId);
if (!userInfo) {
this.logger.log('用户Zulip客户端不存在跳过销毁', {
operation: 'destroyUserClient',
userId,
});
return;
}
// 3. 销毁客户端实例
await this.zulipClientService.destroyClient(userInfo.clientInstance);
// 4. 从池中移除
this.clientPool.delete(userId);
this.logger.log('用户Zulip客户端销毁成功', {
operation: 'destroyUserClient',
userId,
timestamp: new Date().toISOString(),
});
} catch (error) {
const err = error as Error;
this.logger.error('销毁用户Zulip客户端失败', {
operation: 'destroyUserClient',
userId,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
// 即使销毁失败也要从池中移除
this.clientPool.delete(userId);
}
}
/**
* 获取客户端池统计信息
*
* @returns PoolStats 客户端池统计信息
*/
getPoolStats(): PoolStats {
const now = new Date();
const fiveMinutesAgo = new Date(now.getTime() - 5 * 60 * 1000);
const clients = Array.from(this.clientPool.values());
const activeClients = clients.filter(
info => info.clientInstance.lastActivity > fiveMinutesAgo
);
const clientsWithQueues = clients.filter(
info => info.clientInstance.queueId !== undefined
);
return {
totalClients: this.clientPool.size,
activeClients: activeClients.length,
clientsWithQueues: clientsWithQueues.length,
clientIds: Array.from(this.clientPool.keys()),
};
}
/**
* 清理过期客户端
*
* 功能描述:
* 清理超过指定时间未活动的客户端
*
* @param maxIdleMinutes 最大空闲时间分钟默认30分钟
* @returns Promise<number> 清理的客户端数量
*/
async cleanupIdleClients(maxIdleMinutes: number = 30): Promise<number> {
this.logger.log('开始清理过期客户端', {
operation: 'cleanupIdleClients',
maxIdleMinutes,
totalClients: this.clientPool.size,
timestamp: new Date().toISOString(),
});
const now = new Date();
const cutoffTime = new Date(now.getTime() - maxIdleMinutes * 60 * 1000);
const expiredUserIds: string[] = [];
for (const [userId, userInfo] of this.clientPool) {
if (userInfo.clientInstance.lastActivity < cutoffTime) {
expiredUserIds.push(userId);
}
}
// 销毁过期客户端
for (const userId of expiredUserIds) {
await this.destroyUserClient(userId);
}
this.logger.log('过期客户端清理完成', {
operation: 'cleanupIdleClients',
cleanedCount: expiredUserIds.length,
remainingClients: this.clientPool.size,
timestamp: new Date().toISOString(),
});
return expiredUserIds.length;
}
}