范围: src/business/chat/ 涉及文件: - chat.module.ts - chat.service.ts 主要功能: - 添加ZulipAccountsModule依赖,支持查询用户Zulip账号 - 实现initializeZulipClientForUser方法,登录时自动初始化Zulip客户端 - 从数据库获取用户Zulip账号信息和API Key - 优化会话创建流程,使用已创建的Zulip客户端队列ID - 移除登出时的API Key删除逻辑,保持持久化 - 支持基于目标地图的消息发送(mapId参数) 技术改进: - 分离Zulip客户端初始化逻辑,提高代码可维护性 - 添加完整的错误处理和日志记录 - 支持用户没有Zulip账号的场景(优雅降级)
621 lines
19 KiB
TypeScript
621 lines
19 KiB
TypeScript
/**
|
||
* 聊天业务服务
|
||
*
|
||
* 功能描述:
|
||
* - 实现聊天相关的业务逻辑
|
||
* - 协调会话管理、消息过滤等子服务
|
||
* - 实现游戏内实时聊天 + Zulip 异步同步
|
||
*
|
||
* 架构层级:Business Layer(业务层)
|
||
*
|
||
* 核心优化:
|
||
* - 🚀 游戏内实时广播:后端直接广播给同区域用户
|
||
* - 🔄 Zulip异步同步:消息异步存储到Zulip
|
||
* - ⚡ 低延迟聊天体验
|
||
*
|
||
* 最近修改:
|
||
* - 2026-01-15: 功能完善 - WebSocket登录时自动初始化用户Zulip客户端 (修改者: AI)
|
||
* - 2026-01-14: 代码规范优化 - 提取魔法数字为常量 (修改者: moyin)
|
||
* - 2026-01-14: 代码规范优化 - 补充类级别JSDoc注释 (修改者: moyin)
|
||
* - 2026-01-14: 代码规范优化 - 补充接口定义的JSDoc注释 (修改者: moyin)
|
||
* - 2026-01-14: 代码规范优化 - 完善文件头注释和方法注释规范 (修改者: moyin)
|
||
*
|
||
* @author moyin
|
||
* @version 1.1.0
|
||
* @since 2026-01-14
|
||
* @lastModified 2026-01-15
|
||
*/
|
||
|
||
import { Injectable, Logger, Inject } from '@nestjs/common';
|
||
import { randomUUID } from 'crypto';
|
||
import { ChatSessionService } from './services/chat_session.service';
|
||
import { ChatFilterService } from './services/chat_filter.service';
|
||
import {
|
||
IZulipClientPoolService,
|
||
IApiKeySecurityService,
|
||
} from '../../core/zulip_core/zulip_core.interfaces';
|
||
import { LoginCoreService } from '../../core/login_core/login_core.service';
|
||
import { ZulipAccountsService } from '../../core/db/zulip_accounts/zulip_accounts.service';
|
||
import { ZulipAccountsMemoryService } from '../../core/db/zulip_accounts/zulip_accounts_memory.service';
|
||
|
||
// ========== 接口定义 ==========
|
||
|
||
/**
|
||
* 聊天消息请求接口
|
||
*/
|
||
export interface ChatMessageRequest {
|
||
/** WebSocket连接ID */
|
||
socketId: string;
|
||
/** 消息内容 */
|
||
content: string;
|
||
/** 消息范围:local(本地)、global(全局) */
|
||
scope: string;
|
||
/** 目标地图ID(可选,不传则使用会话当前地图) */
|
||
mapId?: string;
|
||
}
|
||
|
||
/**
|
||
* 聊天消息响应接口
|
||
*/
|
||
export interface ChatMessageResponse {
|
||
/** 是否成功 */
|
||
success: boolean;
|
||
/** 消息ID(成功时返回) */
|
||
messageId?: string;
|
||
/** 错误信息(失败时返回) */
|
||
error?: string;
|
||
}
|
||
|
||
/**
|
||
* 玩家登录请求接口
|
||
*/
|
||
export interface PlayerLoginRequest {
|
||
/** 认证Token */
|
||
token: string;
|
||
/** WebSocket连接ID */
|
||
socketId: string;
|
||
}
|
||
|
||
/**
|
||
* 登录响应接口
|
||
*/
|
||
export interface LoginResponse {
|
||
/** 是否成功 */
|
||
success: boolean;
|
||
/** 会话ID(成功时返回) */
|
||
sessionId?: string;
|
||
/** 用户ID(成功时返回) */
|
||
userId?: string;
|
||
/** 用户名(成功时返回) */
|
||
username?: string;
|
||
/** 当前地图ID(成功时返回) */
|
||
currentMap?: string;
|
||
/** 错误信息(失败时返回) */
|
||
error?: string;
|
||
}
|
||
|
||
/**
|
||
* 位置更新请求接口
|
||
*/
|
||
export interface PositionUpdateRequest {
|
||
/** WebSocket连接ID */
|
||
socketId: string;
|
||
/** X坐标 */
|
||
x: number;
|
||
/** Y坐标 */
|
||
y: number;
|
||
/** 地图ID */
|
||
mapId: string;
|
||
}
|
||
|
||
/**
|
||
* 游戏聊天消息格式(用于WebSocket广播)
|
||
*/
|
||
interface GameChatMessage {
|
||
/** 消息类型标识 */
|
||
t: 'chat_render';
|
||
/** 发送者用户名 */
|
||
from: string;
|
||
/** 消息文本内容 */
|
||
txt: string;
|
||
/** 是否显示气泡 */
|
||
bubble: boolean;
|
||
/** 时间戳(ISO格式) */
|
||
timestamp: string;
|
||
/** 消息ID */
|
||
messageId: string;
|
||
/** 地图ID */
|
||
mapId: string;
|
||
/** 消息范围 */
|
||
scope: string;
|
||
}
|
||
|
||
/**
|
||
* 聊天WebSocket网关接口
|
||
*/
|
||
interface IChatWebSocketGateway {
|
||
/**
|
||
* 向指定地图广播消息
|
||
* @param mapId 地图ID
|
||
* @param data 广播数据
|
||
* @param excludeId 排除的socketId(可选)
|
||
*/
|
||
broadcastToMap(mapId: string, data: any, excludeId?: string): void;
|
||
/**
|
||
* 向指定玩家发送消息
|
||
* @param socketId WebSocket连接ID
|
||
* @param data 发送数据
|
||
*/
|
||
sendToPlayer(socketId: string, data: any): void;
|
||
}
|
||
|
||
/**
|
||
* 聊天业务服务类
|
||
*
|
||
* 职责:
|
||
* - 处理玩家登录/登出的会话管理
|
||
* - 协调消息过滤和验证流程
|
||
* - 实现游戏内实时广播和Zulip异步同步
|
||
*
|
||
* 主要方法:
|
||
* - handlePlayerLogin() - 处理玩家登录认证和会话创建
|
||
* - handlePlayerLogout() - 处理玩家登出和资源清理
|
||
* - sendChatMessage() - 发送聊天消息并广播
|
||
* - updatePlayerPosition() - 更新玩家位置信息
|
||
*
|
||
* 使用场景:
|
||
* - 游戏客户端通过WebSocket连接后的聊天功能
|
||
* - 需要实时广播和持久化存储的聊天场景
|
||
*/
|
||
@Injectable()
|
||
export class ChatService {
|
||
private readonly logger = new Logger(ChatService.name);
|
||
private readonly DEFAULT_MAP = 'whale_port';
|
||
private readonly DEFAULT_POSITION = { x: 400, y: 300 };
|
||
private readonly DEFAULT_PAGE_SIZE = 50;
|
||
private readonly HISTORY_TIME_OFFSET_MS = 3600000; // 1小时
|
||
private websocketGateway: IChatWebSocketGateway;
|
||
|
||
constructor(
|
||
@Inject('ZULIP_CLIENT_POOL_SERVICE')
|
||
private readonly zulipClientPool: IZulipClientPoolService,
|
||
private readonly sessionService: ChatSessionService,
|
||
private readonly filterService: ChatFilterService,
|
||
@Inject('API_KEY_SECURITY_SERVICE')
|
||
private readonly apiKeySecurityService: IApiKeySecurityService,
|
||
private readonly loginCoreService: LoginCoreService,
|
||
@Inject('ZulipAccountsService')
|
||
private readonly zulipAccountsService: ZulipAccountsService | ZulipAccountsMemoryService,
|
||
) {
|
||
this.logger.log('ChatService初始化完成');
|
||
}
|
||
|
||
/**
|
||
* 设置WebSocket网关引用
|
||
* @param gateway WebSocket网关实例
|
||
*/
|
||
setWebSocketGateway(gateway: IChatWebSocketGateway): void {
|
||
this.websocketGateway = gateway;
|
||
this.logger.log('WebSocket网关引用设置完成');
|
||
}
|
||
|
||
/**
|
||
* 处理玩家登录
|
||
* @param request 登录请求,包含token和socketId
|
||
* @returns 登录响应,包含会话信息或错误信息
|
||
*/
|
||
async handlePlayerLogin(request: PlayerLoginRequest): Promise<LoginResponse> {
|
||
const startTime = Date.now();
|
||
|
||
this.logger.log('开始处理玩家登录', {
|
||
operation: 'handlePlayerLogin',
|
||
socketId: request.socketId,
|
||
});
|
||
|
||
try {
|
||
// 1. 验证参数
|
||
if (!request.token?.trim() || !request.socketId?.trim()) {
|
||
return { success: false, error: 'Token或socketId不能为空' };
|
||
}
|
||
|
||
// 2. 验证Token
|
||
const userInfo = await this.validateGameToken(request.token);
|
||
if (!userInfo) {
|
||
return { success: false, error: 'Token验证失败' };
|
||
}
|
||
|
||
// 3. 初始化用户的Zulip客户端(从数据库获取Zulip账号信息)
|
||
await this.initializeZulipClientForUser(userInfo.userId);
|
||
|
||
// 4. 创建会话
|
||
const sessionResult = await this.createUserSession(request.socketId, userInfo);
|
||
|
||
this.logger.log('玩家登录成功', {
|
||
operation: 'handlePlayerLogin',
|
||
socketId: request.socketId,
|
||
userId: userInfo.userId,
|
||
duration: Date.now() - startTime,
|
||
});
|
||
|
||
return {
|
||
success: true,
|
||
sessionId: sessionResult.sessionId,
|
||
userId: userInfo.userId,
|
||
username: userInfo.username,
|
||
currentMap: sessionResult.currentMap,
|
||
};
|
||
|
||
} catch (error) {
|
||
const err = error as Error;
|
||
this.logger.error('玩家登录失败', { error: err.message });
|
||
return { success: false, error: '登录失败,请稍后重试' };
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 处理玩家登出
|
||
* @param socketId WebSocket连接ID
|
||
* @param reason 登出原因:manual(手动)、timeout(超时)、disconnect(断开)
|
||
*/
|
||
async handlePlayerLogout(socketId: string, reason: 'manual' | 'timeout' | 'disconnect' = 'manual'): Promise<void> {
|
||
this.logger.log('开始处理玩家登出', { socketId, reason });
|
||
|
||
try {
|
||
const session = await this.sessionService.getSession(socketId);
|
||
if (!session) return;
|
||
|
||
const userId = session.userId;
|
||
|
||
// 清理Zulip客户端(注意:不删除Redis中的API Key,保持持久化)
|
||
if (userId) {
|
||
try {
|
||
await this.zulipClientPool.destroyUserClient(userId);
|
||
} catch (e) {
|
||
this.logger.warn('Zulip客户端清理失败', { error: (e as Error).message });
|
||
}
|
||
}
|
||
|
||
// 销毁会话
|
||
await this.sessionService.destroySession(socketId);
|
||
|
||
this.logger.log('玩家登出完成', { socketId, userId, reason });
|
||
|
||
} catch (error) {
|
||
this.logger.error('玩家登出失败', { error: (error as Error).message });
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 发送聊天消息
|
||
* @param request 聊天消息请求,包含socketId、content和scope
|
||
* @returns 发送结果,包含messageId或错误信息
|
||
*/
|
||
async sendChatMessage(request: ChatMessageRequest): Promise<ChatMessageResponse> {
|
||
const startTime = Date.now();
|
||
|
||
this.logger.log('开始处理聊天消息', {
|
||
operation: 'sendChatMessage',
|
||
socketId: request.socketId,
|
||
contentLength: request.content.length,
|
||
});
|
||
|
||
try {
|
||
// 1. 获取会话
|
||
const session = await this.sessionService.getSession(request.socketId);
|
||
if (!session) {
|
||
return { success: false, error: '会话不存在,请重新登录' };
|
||
}
|
||
|
||
// 2. 确定目标地图(优先使用请求中的mapId,否则使用会话当前地图)
|
||
const targetMapId = request.mapId || session.currentMap;
|
||
|
||
// 3. 获取上下文
|
||
const context = await this.sessionService.injectContext(request.socketId, targetMapId);
|
||
const targetStream = context.stream;
|
||
const targetTopic = context.topic || 'General';
|
||
|
||
// 4. 消息验证
|
||
const validationResult = await this.filterService.validateMessage(
|
||
session.userId,
|
||
request.content,
|
||
targetStream,
|
||
targetMapId,
|
||
);
|
||
|
||
if (!validationResult.allowed) {
|
||
return { success: false, error: validationResult.reason || '消息发送失败' };
|
||
}
|
||
|
||
const messageContent = validationResult.filteredContent || request.content;
|
||
const messageId = `game_${Date.now()}_${session.userId}`;
|
||
|
||
// 5. 🚀 立即广播给游戏内玩家(根据scope决定广播范围)
|
||
const gameMessage: GameChatMessage = {
|
||
t: 'chat_render',
|
||
from: session.username,
|
||
txt: messageContent,
|
||
bubble: true,
|
||
timestamp: new Date().toISOString(),
|
||
messageId,
|
||
mapId: targetMapId,
|
||
scope: request.scope,
|
||
};
|
||
|
||
// local: 只广播给目标地图的玩家; global: 广播给所有玩家(暂时也用地图广播)
|
||
this.broadcastToGamePlayers(targetMapId, gameMessage, request.socketId)
|
||
.catch(e => this.logger.warn('游戏内广播失败', { error: (e as Error).message }));
|
||
|
||
// 6. 🔄 异步同步到Zulip
|
||
this.syncToZulipAsync(session.userId, targetStream, targetTopic, messageContent, messageId)
|
||
.catch(e => this.logger.warn('Zulip同步失败', { error: (e as Error).message }));
|
||
|
||
this.logger.log('聊天消息发送完成', {
|
||
operation: 'sendChatMessage',
|
||
messageId,
|
||
duration: Date.now() - startTime,
|
||
});
|
||
|
||
return { success: true, messageId };
|
||
|
||
} catch (error) {
|
||
this.logger.error('聊天消息发送失败', { error: (error as Error).message });
|
||
return { success: false, error: '消息发送失败,请稍后重试' };
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 更新玩家位置
|
||
* @param request 位置更新请求,包含socketId、坐标和mapId
|
||
* @returns 更新是否成功
|
||
*/
|
||
async updatePlayerPosition(request: PositionUpdateRequest): Promise<boolean> {
|
||
try {
|
||
if (!request.socketId?.trim() || !request.mapId?.trim()) {
|
||
return false;
|
||
}
|
||
|
||
return await this.sessionService.updatePlayerPosition(
|
||
request.socketId,
|
||
request.mapId,
|
||
request.x,
|
||
request.y,
|
||
);
|
||
} catch (error) {
|
||
this.logger.error('更新位置失败', { error: (error as Error).message });
|
||
return false;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 获取聊天历史
|
||
* @param query 查询参数,包含mapId、limit和offset
|
||
* @returns 聊天历史记录列表
|
||
*/
|
||
async getChatHistory(query: { mapId?: string; limit?: number; offset?: number }) {
|
||
// 模拟数据,实际应从Zulip获取
|
||
const mockMessages = [
|
||
{
|
||
id: 1,
|
||
sender: 'Player_123',
|
||
content: '大家好!',
|
||
scope: 'local',
|
||
mapId: query.mapId || 'whale_port',
|
||
timestamp: new Date(Date.now() - this.HISTORY_TIME_OFFSET_MS).toISOString(),
|
||
streamName: 'Whale Port',
|
||
topicName: 'Game Chat',
|
||
},
|
||
];
|
||
|
||
const limit = query.limit || this.DEFAULT_PAGE_SIZE;
|
||
const offset = query.offset || 0;
|
||
|
||
return {
|
||
success: true,
|
||
messages: mockMessages.slice(offset, offset + limit),
|
||
total: mockMessages.length,
|
||
count: Math.min(mockMessages.length, limit),
|
||
};
|
||
}
|
||
|
||
/**
|
||
* 获取会话信息
|
||
* @param socketId WebSocket连接ID
|
||
* @returns 会话信息或null
|
||
*/
|
||
async getSession(socketId: string) {
|
||
return this.sessionService.getSession(socketId);
|
||
}
|
||
|
||
// ========== 私有方法 ==========
|
||
|
||
/**
|
||
* 初始化用户的Zulip客户端
|
||
*
|
||
* 功能描述:
|
||
* 1. 从数据库获取用户的Zulip账号信息
|
||
* 2. 检查Redis中是否已有API Key缓存
|
||
* 3. 如果Redis中没有,从数据库标记判断是否需要重新获取
|
||
* 4. 创建Zulip客户端实例
|
||
*
|
||
* @param userId 用户ID
|
||
*/
|
||
private async initializeZulipClientForUser(userId: string): Promise<void> {
|
||
this.logger.log('开始初始化用户Zulip客户端', {
|
||
operation: 'initializeZulipClientForUser',
|
||
userId,
|
||
});
|
||
|
||
try {
|
||
// 1. 从数据库获取用户的Zulip账号信息
|
||
const zulipAccount = await this.zulipAccountsService.findByGameUserId(userId);
|
||
|
||
if (!zulipAccount) {
|
||
this.logger.debug('用户没有关联的Zulip账号,跳过Zulip客户端初始化', {
|
||
operation: 'initializeZulipClientForUser',
|
||
userId,
|
||
});
|
||
return;
|
||
}
|
||
|
||
if (zulipAccount.status !== 'active') {
|
||
this.logger.warn('用户Zulip账号状态异常,跳过初始化', {
|
||
operation: 'initializeZulipClientForUser',
|
||
userId,
|
||
status: zulipAccount.status,
|
||
});
|
||
return;
|
||
}
|
||
|
||
// 2. 检查Redis中是否已有API Key
|
||
const existingApiKey = await this.apiKeySecurityService.getApiKey(userId);
|
||
|
||
if (existingApiKey.success && existingApiKey.apiKey) {
|
||
this.logger.log('Redis中已有API Key缓存,直接创建Zulip客户端', {
|
||
operation: 'initializeZulipClientForUser',
|
||
userId,
|
||
zulipEmail: zulipAccount.zulipEmail,
|
||
});
|
||
|
||
// 创建Zulip客户端
|
||
await this.createZulipClientWithApiKey(
|
||
userId,
|
||
zulipAccount.zulipEmail,
|
||
existingApiKey.apiKey
|
||
);
|
||
return;
|
||
}
|
||
|
||
// 3. Redis中没有API Key,记录警告
|
||
// 注意:由于登录时没有用户密码,无法重新生成API Key
|
||
// API Key应该在用户注册时存储到Redis,如果丢失需要用户重新绑定Zulip账号
|
||
this.logger.warn('Redis中没有用户的Zulip API Key缓存,无法创建Zulip客户端', {
|
||
operation: 'initializeZulipClientForUser',
|
||
userId,
|
||
zulipEmail: zulipAccount.zulipEmail,
|
||
hint: '用户可能需要重新绑定Zulip账号',
|
||
});
|
||
|
||
} catch (error) {
|
||
const err = error as Error;
|
||
this.logger.error('初始化用户Zulip客户端失败', {
|
||
operation: 'initializeZulipClientForUser',
|
||
userId,
|
||
error: err.message,
|
||
});
|
||
// 不抛出异常,允许用户继续登录(只是没有Zulip功能)
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 使用API Key创建Zulip客户端
|
||
*
|
||
* @param userId 用户ID
|
||
* @param zulipEmail Zulip邮箱
|
||
* @param apiKey API Key
|
||
*/
|
||
private async createZulipClientWithApiKey(
|
||
userId: string,
|
||
zulipEmail: string,
|
||
apiKey: string
|
||
): Promise<void> {
|
||
try {
|
||
const clientInstance = await this.zulipClientPool.createUserClient(userId, {
|
||
username: zulipEmail,
|
||
apiKey: apiKey,
|
||
realm: process.env.ZULIP_SERVER_URL || 'https://zulip.xinghangee.icu/',
|
||
});
|
||
|
||
this.logger.log('Zulip客户端创建成功', {
|
||
operation: 'createZulipClientWithApiKey',
|
||
userId,
|
||
zulipEmail,
|
||
queueId: clientInstance.queueId,
|
||
});
|
||
} catch (error) {
|
||
const err = error as Error;
|
||
this.logger.error('创建Zulip客户端失败', {
|
||
operation: 'createZulipClientWithApiKey',
|
||
userId,
|
||
zulipEmail,
|
||
error: err.message,
|
||
});
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
private async validateGameToken(token: string) {
|
||
try {
|
||
const payload = await this.loginCoreService.verifyToken(token, 'access');
|
||
if (!payload?.sub) return null;
|
||
|
||
return {
|
||
userId: payload.sub,
|
||
username: payload.username || `user_${payload.sub}`,
|
||
email: payload.email || `${payload.sub}@example.com`,
|
||
zulipEmail: undefined,
|
||
zulipApiKey: undefined,
|
||
};
|
||
} catch (error) {
|
||
this.logger.warn('Token验证失败', { error: (error as Error).message });
|
||
return null;
|
||
}
|
||
}
|
||
|
||
private async createUserSession(socketId: string, userInfo: any) {
|
||
const sessionId = randomUUID();
|
||
|
||
// 尝试获取已创建的Zulip客户端的队列ID
|
||
let zulipQueueId = `queue_${sessionId}`;
|
||
try {
|
||
const existingClient = await this.zulipClientPool.getUserClient(userInfo.userId);
|
||
if (existingClient?.queueId) {
|
||
zulipQueueId = existingClient.queueId;
|
||
}
|
||
} catch (e) {
|
||
this.logger.debug('获取Zulip客户端队列ID失败,使用默认值', {
|
||
error: (e as Error).message
|
||
});
|
||
}
|
||
|
||
const session = await this.sessionService.createSession(
|
||
socketId,
|
||
userInfo.userId,
|
||
zulipQueueId,
|
||
userInfo.username,
|
||
this.DEFAULT_MAP,
|
||
this.DEFAULT_POSITION,
|
||
);
|
||
|
||
return { sessionId, currentMap: session.currentMap };
|
||
}
|
||
|
||
private async broadcastToGamePlayers(mapId: string, message: GameChatMessage, excludeSocketId?: string) {
|
||
if (!this.websocketGateway) {
|
||
throw new Error('WebSocket网关未设置');
|
||
}
|
||
|
||
const sockets = await this.sessionService.getSocketsInMap(mapId);
|
||
const targetSockets = sockets.filter(id => id !== excludeSocketId);
|
||
|
||
for (const socketId of targetSockets) {
|
||
try {
|
||
this.websocketGateway.sendToPlayer(socketId, message);
|
||
} catch (e) {
|
||
this.logger.warn('发送消息失败', { socketId, error: (e as Error).message });
|
||
}
|
||
}
|
||
}
|
||
|
||
private async syncToZulipAsync(userId: string, stream: string, topic: string, content: string, gameMessageId: string) {
|
||
try {
|
||
const apiKeyResult = await this.apiKeySecurityService.getApiKey(userId);
|
||
if (!apiKeyResult.success || !apiKeyResult.apiKey) return;
|
||
|
||
const zulipContent = `${content}\n\n*[游戏消息ID: ${gameMessageId}]*`;
|
||
await this.zulipClientPool.sendMessage(userId, stream, topic, zulipContent);
|
||
} catch (error) {
|
||
this.logger.warn('Zulip同步异常', { error: (error as Error).message });
|
||
}
|
||
}
|
||
}
|