forked from datawhale/whale-town-end
- 将技术实现服务从business层迁移到core层 - 创建src/core/zulip/核心服务模块,包含API客户端、连接池等技术服务 - 保留src/business/zulip/业务逻辑,专注游戏相关的业务规则 - 通过依赖注入实现业务层与核心层的解耦 - 更新模块导入关系,确保架构分层清晰 重构后的架构符合单一职责原则,提高了代码的可维护性和可测试性
354 lines
9.9 KiB
TypeScript
354 lines
9.9 KiB
TypeScript
/**
|
||
* Stream初始化服务
|
||
*
|
||
* 功能描述:
|
||
* - 在系统启动时检查并创建所有地图对应的Zulip Streams
|
||
* - 确保所有配置的Streams在Zulip服务器上存在
|
||
* - 提供Stream创建和验证功能
|
||
*
|
||
* 主要方法:
|
||
* - initializeStreams(): 初始化所有Streams
|
||
* - checkStreamExists(): 检查Stream是否存在
|
||
* - createStream(): 创建Stream
|
||
*
|
||
* 使用场景:
|
||
* - 系统启动时自动初始化
|
||
* - 配置更新后重新初始化
|
||
*
|
||
* @author angjustinl
|
||
* @version 1.0.0
|
||
* @since 2025-12-25
|
||
*/
|
||
|
||
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||
import { ConfigManagerService } from './config_manager.service';
|
||
|
||
/**
|
||
* Stream初始化服务类
|
||
*
|
||
* 职责:
|
||
* - 系统启动时自动检查并创建Zulip Streams
|
||
* - 确保所有地图对应的Stream都存在
|
||
* - 验证Stream配置的完整性
|
||
* - 提供Stream初始化状态监控
|
||
*
|
||
* 主要方法:
|
||
* - onModuleInit(): 模块初始化时自动执行
|
||
* - initializeStreams(): 初始化所有必需的Streams
|
||
* - createStreamIfNotExists(): 检查并创建单个Stream
|
||
* - validateStreamConfig(): 验证Stream配置
|
||
* - getInitializationStatus(): 获取初始化状态
|
||
*
|
||
* 使用场景:
|
||
* - 系统启动时自动初始化Streams
|
||
* - 确保消息路由的目标Stream存在
|
||
* - 新增地图时自动创建对应Stream
|
||
* - 系统部署和配置验证
|
||
*/
|
||
@Injectable()
|
||
export class StreamInitializerService implements OnModuleInit {
|
||
private readonly logger = new Logger(StreamInitializerService.name);
|
||
private initializationComplete = false;
|
||
|
||
constructor(
|
||
private readonly configManager: ConfigManagerService,
|
||
) {
|
||
this.logger.log('StreamInitializerService初始化完成');
|
||
}
|
||
|
||
/**
|
||
* 模块初始化时自动执行
|
||
*/
|
||
async onModuleInit(): Promise<void> {
|
||
// 延迟5秒执行,确保其他服务已初始化
|
||
setTimeout(async () => {
|
||
await this.initializeStreams();
|
||
}, 5000);
|
||
}
|
||
|
||
/**
|
||
* 初始化所有Streams
|
||
*
|
||
* 功能描述:
|
||
* 检查配置中的所有Streams是否存在,不存在则创建
|
||
*
|
||
* @returns Promise<{success: boolean, created: string[], existing: string[], failed: string[]}>
|
||
*/
|
||
async initializeStreams(): Promise<{
|
||
success: boolean;
|
||
created: string[];
|
||
existing: string[];
|
||
failed: string[];
|
||
}> {
|
||
this.logger.log('开始初始化Zulip Streams', {
|
||
operation: 'initializeStreams',
|
||
timestamp: new Date().toISOString(),
|
||
});
|
||
|
||
const created: string[] = [];
|
||
const existing: string[] = [];
|
||
const failed: string[] = [];
|
||
|
||
try {
|
||
// 获取所有地图配置
|
||
const mapConfigs = this.configManager.getAllMapConfigs();
|
||
|
||
if (mapConfigs.length === 0) {
|
||
this.logger.warn('没有找到地图配置,跳过Stream初始化', {
|
||
operation: 'initializeStreams',
|
||
});
|
||
return { success: true, created, existing, failed };
|
||
}
|
||
|
||
// 获取所有唯一的Stream名称
|
||
const streamNames = new Set<string>();
|
||
mapConfigs.forEach(config => {
|
||
streamNames.add(config.zulipStream);
|
||
});
|
||
|
||
this.logger.log(`找到 ${streamNames.size} 个需要检查的Streams`, {
|
||
operation: 'initializeStreams',
|
||
streamCount: streamNames.size,
|
||
streams: Array.from(streamNames),
|
||
});
|
||
|
||
// 检查并创建每个Stream
|
||
for (const streamName of streamNames) {
|
||
try {
|
||
const exists = await this.checkStreamExists(streamName);
|
||
|
||
if (exists) {
|
||
existing.push(streamName);
|
||
this.logger.log(`Stream已存在: ${streamName}`, {
|
||
operation: 'initializeStreams',
|
||
streamName,
|
||
});
|
||
} else {
|
||
const createResult = await this.createStream(streamName);
|
||
|
||
if (createResult) {
|
||
created.push(streamName);
|
||
this.logger.log(`Stream创建成功: ${streamName}`, {
|
||
operation: 'initializeStreams',
|
||
streamName,
|
||
});
|
||
} else {
|
||
failed.push(streamName);
|
||
this.logger.warn(`Stream创建失败: ${streamName}`, {
|
||
operation: 'initializeStreams',
|
||
streamName,
|
||
});
|
||
}
|
||
}
|
||
} catch (error) {
|
||
const err = error as Error;
|
||
failed.push(streamName);
|
||
this.logger.error(`处理Stream失败: ${streamName}`, {
|
||
operation: 'initializeStreams',
|
||
streamName,
|
||
error: err.message,
|
||
});
|
||
}
|
||
}
|
||
|
||
this.initializationComplete = true;
|
||
|
||
const success = failed.length === 0;
|
||
|
||
this.logger.log('Stream初始化完成', {
|
||
operation: 'initializeStreams',
|
||
success,
|
||
totalStreams: streamNames.size,
|
||
created: created.length,
|
||
existing: existing.length,
|
||
failed: failed.length,
|
||
timestamp: new Date().toISOString(),
|
||
});
|
||
|
||
return { success, created, existing, failed };
|
||
|
||
} catch (error) {
|
||
const err = error as Error;
|
||
this.logger.error('Stream初始化失败', {
|
||
operation: 'initializeStreams',
|
||
error: err.message,
|
||
timestamp: new Date().toISOString(),
|
||
}, err.stack);
|
||
|
||
return { success: false, created, existing, failed };
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 检查Stream是否存在
|
||
*
|
||
* 功能描述:
|
||
* 使用Bot API Key检查指定的Stream是否在Zulip服务器上存在
|
||
*
|
||
* @param streamName Stream名称
|
||
* @returns Promise<boolean> 是否存在
|
||
*/
|
||
private async checkStreamExists(streamName: string): Promise<boolean> {
|
||
try {
|
||
// 获取Zulip配置
|
||
const zulipConfig = this.configManager.getZulipConfig();
|
||
|
||
if (!zulipConfig.zulipBotApiKey) {
|
||
this.logger.warn('Bot API Key未配置,跳过Stream检查', {
|
||
operation: 'checkStreamExists',
|
||
streamName,
|
||
});
|
||
return false;
|
||
}
|
||
|
||
// 动态导入zulip-js
|
||
const zulipModule: any = await import('zulip-js');
|
||
const zulipFactory = zulipModule.default || zulipModule;
|
||
|
||
// 创建Bot客户端
|
||
const client = await zulipFactory({
|
||
username: zulipConfig.zulipBotEmail,
|
||
apiKey: zulipConfig.zulipBotApiKey,
|
||
realm: zulipConfig.zulipServerUrl,
|
||
});
|
||
|
||
// 获取所有Streams
|
||
const result = await client.streams.retrieve();
|
||
|
||
if (result.result === 'success' && result.streams) {
|
||
const exists = result.streams.some(
|
||
(stream: any) => stream.name.toLowerCase() === streamName.toLowerCase()
|
||
);
|
||
return exists;
|
||
}
|
||
|
||
return false;
|
||
|
||
} catch (error) {
|
||
const err = error as Error;
|
||
this.logger.error('检查Stream失败', {
|
||
operation: 'checkStreamExists',
|
||
streamName,
|
||
error: err.message,
|
||
});
|
||
return false;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 创建Stream
|
||
*
|
||
* 功能描述:
|
||
* 使用Bot API Key在Zulip服务器上创建新的Stream
|
||
*
|
||
* @param streamName Stream名称
|
||
* @param description Stream描述(可选)
|
||
* @returns Promise<boolean> 是否创建成功
|
||
*/
|
||
private async createStream(
|
||
streamName: string,
|
||
description?: string
|
||
): Promise<boolean> {
|
||
try {
|
||
// 获取Zulip配置
|
||
const zulipConfig = this.configManager.getZulipConfig();
|
||
|
||
if (!zulipConfig.zulipBotApiKey) {
|
||
this.logger.warn('Bot API Key未配置,无法创建Stream', {
|
||
operation: 'createStream',
|
||
streamName,
|
||
});
|
||
return false;
|
||
}
|
||
|
||
// 动态导入zulip-js
|
||
const zulipModule: any = await import('zulip-js');
|
||
const zulipFactory = zulipModule.default || zulipModule;
|
||
|
||
// 创建Bot客户端
|
||
const client = await zulipFactory({
|
||
username: zulipConfig.zulipBotEmail,
|
||
apiKey: zulipConfig.zulipBotApiKey,
|
||
realm: zulipConfig.zulipServerUrl,
|
||
});
|
||
|
||
// 查找对应的地图配置以获取描述
|
||
const mapConfig = this.configManager.getMapConfigByStream(streamName);
|
||
const streamDescription = description ||
|
||
(mapConfig ? `${mapConfig.mapName} - ${mapConfig.description || 'Game chat channel'}` :
|
||
`Game chat channel for ${streamName}`);
|
||
|
||
// 使用callEndpoint创建Stream
|
||
const result = await client.callEndpoint(
|
||
'/users/me/subscriptions',
|
||
'POST',
|
||
{
|
||
subscriptions: JSON.stringify([
|
||
{
|
||
name: streamName,
|
||
description: streamDescription
|
||
}
|
||
])
|
||
}
|
||
);
|
||
|
||
if (result.result === 'success') {
|
||
this.logger.log('Stream创建成功', {
|
||
operation: 'createStream',
|
||
streamName,
|
||
description: streamDescription,
|
||
});
|
||
return true;
|
||
} else {
|
||
this.logger.warn('Stream创建失败', {
|
||
operation: 'createStream',
|
||
streamName,
|
||
error: result.msg,
|
||
});
|
||
return false;
|
||
}
|
||
|
||
} catch (error) {
|
||
const err = error as Error;
|
||
this.logger.error('创建Stream异常', {
|
||
operation: 'createStream',
|
||
streamName,
|
||
error: err.message,
|
||
}, err.stack);
|
||
return false;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 检查初始化是否完成
|
||
*
|
||
* @returns boolean 是否完成
|
||
*/
|
||
isInitializationComplete(): boolean {
|
||
return this.initializationComplete;
|
||
}
|
||
|
||
/**
|
||
* 手动触发Stream初始化
|
||
*
|
||
* 功能描述:
|
||
* 允许手动触发Stream初始化,用于配置更新后重新初始化
|
||
*
|
||
* @returns Promise<{success: boolean, created: string[], existing: string[], failed: string[]}>
|
||
*/
|
||
async reinitializeStreams(): Promise<{
|
||
success: boolean;
|
||
created: string[];
|
||
existing: string[];
|
||
failed: string[];
|
||
}> {
|
||
this.logger.log('手动触发Stream重新初始化', {
|
||
operation: 'reinitializeStreams',
|
||
timestamp: new Date().toISOString(),
|
||
});
|
||
|
||
this.initializationComplete = false;
|
||
return await this.initializeStreams();
|
||
}
|
||
}
|