feat(gateway/chat): 新增聊天网关模块

范围:src/gateway/chat/
- 新增 ChatWebSocketGateway WebSocket 网关,处理实时聊天通信
- 新增 ChatController HTTP 控制器,提供聊天历史和系统状态接口
- 新增 ChatGatewayModule 模块配置,整合网关层组件
- 新增请求/响应 DTO 定义,提供数据验证和类型约束
- 新增完整的单元测试覆盖
- 新增模块 README 文档,包含接口说明、核心特性和风险评估
This commit is contained in:
moyin
2026-01-14 19:11:25 +08:00
parent 3f3c29354e
commit 5bcf3cb678
8 changed files with 1702 additions and 0 deletions

View File

@@ -0,0 +1,461 @@
/**
* 聊天 WebSocket 网关
*
* 功能描述:
* - 处理 WebSocket 协议连接和消息
* - 只做协议转换,不包含业务逻辑
* - 将消息路由到 Business 层处理
*
* 架构层级Gateway Layer网关层
*
* 职责:
* - WebSocket 连接管理
* - 消息协议解析
* - 路由到业务层
* - 错误转换
*
* WebSocket 事件:
* - connection: 客户端连接事件
* - message: 消息接收事件login/logout/chat/position
* - close: 客户端断开事件
* - error: 错误处理事件
*
* 最近修改:
* - 2026-01-14: 代码规范优化 - 提取常量、替换弃用API (修改者: moyin)
* - 2026-01-14: 代码规范优化 - 完善注释规范 (修改者: moyin)
*
* @author moyin
* @version 1.0.2
* @since 2026-01-14
* @lastModified 2026-01-14
*/
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import * as WebSocket from 'ws';
import { ChatService } from '../../business/chat/chat.service';
/** WebSocket 服务器默认端口 */
const DEFAULT_WEBSOCKET_PORT = 3001;
/** 默认地图 ID */
const DEFAULT_MAP_ID = 'whale_port';
/**
* 扩展的 WebSocket 接口
*/
interface ExtendedWebSocket extends WebSocket {
id: string;
isAlive?: boolean;
authenticated?: boolean;
userId?: string;
username?: string;
sessionId?: string;
currentMap?: string;
}
/**
* WebSocket 网关接口 - 供业务层调用
*/
export interface IChatWebSocketGateway {
sendToPlayer(socketId: string, data: any): void;
broadcastToMap(mapId: string, data: any, excludeId?: string): void;
getConnectionCount(): number;
getAuthenticatedConnectionCount(): number;
getMapPlayerCounts(): Record<string, number>;
getMapPlayers(mapId: string): string[];
}
@Injectable()
/**
* 聊天 WebSocket 网关类
*
* 职责:
* - 管理 WebSocket 客户端连接
* - 解析和路由 WebSocket 消息
* - 管理地图房间和玩家广播
*
* 主要方法:
* - sendToPlayer() - 向指定玩家发送消息
* - broadcastToMap() - 向地图内所有玩家广播
* - getConnectionCount() - 获取连接数统计
*
* 使用场景:
* - 游戏内实时聊天通信
* - 玩家位置同步广播
*/
export class ChatWebSocketGateway implements OnModuleInit, OnModuleDestroy, IChatWebSocketGateway {
private server: WebSocket.Server;
private readonly logger = new Logger(ChatWebSocketGateway.name);
private clients = new Map<string, ExtendedWebSocket>();
private mapRooms = new Map<string, Set<string>>();
constructor(private readonly chatService: ChatService) {}
async onModuleInit() {
const port = process.env.WEBSOCKET_PORT ? parseInt(process.env.WEBSOCKET_PORT) : DEFAULT_WEBSOCKET_PORT;
this.server = new WebSocket.Server({
port,
path: '/game'
});
this.server.on('connection', (ws: ExtendedWebSocket) => {
ws.id = this.generateClientId();
ws.isAlive = true;
ws.authenticated = false;
this.clients.set(ws.id, ws);
this.logger.log(`新的WebSocket连接: ${ws.id}`);
ws.on('message', (data) => this.handleRawMessage(ws, data));
ws.on('close', (code, reason) => this.handleClose(ws, code, reason));
ws.on('error', (error) => this.handleError(ws, error));
this.sendMessage(ws, {
type: 'connected',
message: '连接成功',
socketId: ws.id
});
});
// 设置网关引用到业务层
this.chatService.setWebSocketGateway(this);
this.logger.log(`WebSocket服务器启动成功端口: ${port},路径: /game`);
}
async onModuleDestroy() {
if (this.server) {
this.server.close();
this.logger.log('WebSocket服务器已关闭');
}
}
/**
* 处理原始消息 - 协议解析
*
* @param ws WebSocket 连接实例
* @param data 原始消息数据
*/
private handleRawMessage(ws: ExtendedWebSocket, data: WebSocket.RawData) {
try {
const message = JSON.parse(data.toString());
this.routeMessage(ws, message);
} catch (error) {
this.logger.error('解析消息失败', error);
this.sendError(ws, '消息格式错误');
}
}
/**
* 消息路由 - 根据类型分发到业务层
*
* @param ws WebSocket 连接实例
* @param message 解析后的消息对象
*/
private async routeMessage(ws: ExtendedWebSocket, message: any) {
const messageType = message.type || message.t;
this.logger.log(`收到消息: ${ws.id}, 类型: ${messageType}`);
switch (messageType) {
case 'login':
await this.handleLogin(ws, message);
break;
case 'logout':
await this.handleLogout(ws);
break;
case 'chat':
await this.handleChat(ws, message);
break;
case 'position':
await this.handlePosition(ws, message);
break;
default:
this.logger.warn(`未知消息类型: ${messageType}`);
this.sendError(ws, `未知消息类型: ${messageType}`);
}
}
/**
* 处理登录 - 协议转换后调用业务层
*
* @param ws WebSocket 连接实例
* @param message 登录消息(包含 token
*/
private async handleLogin(ws: ExtendedWebSocket, message: any) {
if (!message.token) {
this.sendError(ws, 'Token不能为空');
return;
}
try {
const result = await this.chatService.handlePlayerLogin({
socketId: ws.id,
token: message.token
});
if (result.success) {
ws.authenticated = true;
ws.userId = result.userId;
ws.username = result.username;
ws.sessionId = result.sessionId;
ws.currentMap = result.currentMap || DEFAULT_MAP_ID;
this.joinMapRoom(ws.id, ws.currentMap);
this.sendMessage(ws, {
t: 'login_success',
sessionId: result.sessionId,
userId: result.userId,
username: result.username,
currentMap: ws.currentMap
});
this.logger.log(`用户登录成功: ${result.username} (${ws.id})`);
} else {
this.sendMessage(ws, {
t: 'login_error',
message: result.error || '登录失败'
});
}
} catch (error) {
this.logger.error('登录处理失败', error);
this.sendError(ws, '登录处理失败');
}
}
/**
* 处理登出
*
* @param ws WebSocket 连接实例
*/
private async handleLogout(ws: ExtendedWebSocket) {
if (!ws.authenticated) {
this.sendError(ws, '用户未登录');
return;
}
try {
await this.chatService.handlePlayerLogout(ws.id, 'manual');
this.cleanupClient(ws);
this.sendMessage(ws, {
t: 'logout_success',
message: '登出成功'
});
ws.close(1000, '用户主动登出');
} catch (error) {
this.logger.error('登出处理失败', error);
this.sendError(ws, '登出处理失败');
}
}
/**
* 处理聊天消息
*
* @param ws WebSocket 连接实例
* @param message 聊天消息(包含 content, scope
*/
private async handleChat(ws: ExtendedWebSocket, message: any) {
if (!ws.authenticated) {
this.sendError(ws, '请先登录');
return;
}
if (!message.content) {
this.sendError(ws, '消息内容不能为空');
return;
}
try {
const result = await this.chatService.sendChatMessage({
socketId: ws.id,
content: message.content,
scope: message.scope || 'local'
});
if (result.success) {
this.sendMessage(ws, {
t: 'chat_sent',
messageId: result.messageId,
message: '消息发送成功'
});
} else {
this.sendMessage(ws, {
t: 'chat_error',
message: result.error || '消息发送失败'
});
}
} catch (error) {
this.logger.error('聊天处理失败', error);
this.sendError(ws, '聊天处理失败');
}
}
/**
* 处理位置更新
*
* @param ws WebSocket 连接实例
* @param message 位置消息(包含 x, y, mapId
*/
private async handlePosition(ws: ExtendedWebSocket, message: any) {
if (!ws.authenticated) {
this.sendError(ws, '请先登录');
return;
}
try {
// 如果切换地图,更新房间
if (ws.currentMap !== message.mapId) {
this.leaveMapRoom(ws.id, ws.currentMap);
this.joinMapRoom(ws.id, message.mapId);
ws.currentMap = message.mapId;
}
await this.chatService.updatePlayerPosition({
socketId: ws.id,
x: message.x,
y: message.y,
mapId: message.mapId
});
// 广播位置更新
this.broadcastToMap(message.mapId, {
t: 'position_update',
userId: ws.userId,
username: ws.username,
x: message.x,
y: message.y,
mapId: message.mapId
}, ws.id);
} catch (error) {
this.logger.error('位置更新处理失败', error);
this.sendError(ws, '位置更新处理失败');
}
}
/**
* 处理连接关闭
*
* @param ws WebSocket 连接实例
* @param code 关闭状态码
* @param reason 关闭原因
*/
private handleClose(ws: ExtendedWebSocket, code: number, reason: Buffer) {
this.logger.log(`WebSocket连接关闭: ${ws.id}`, { code, reason: reason?.toString() });
let logoutReason: 'manual' | 'timeout' | 'disconnect' = 'disconnect';
if (code === 1000) logoutReason = 'manual';
this.cleanupClient(ws, logoutReason);
}
/**
* 处理错误
*
* @param ws WebSocket 连接实例
* @param error 错误对象
*/
private handleError(ws: ExtendedWebSocket, error: Error) {
this.logger.error(`WebSocket错误: ${ws.id}`, error);
}
// ========== IChatWebSocketGateway 接口实现 ==========
public sendToPlayer(socketId: string, data: any): void {
const client = this.clients.get(socketId);
if (client && client.readyState === WebSocket.OPEN) {
this.sendMessage(client, data);
}
}
public broadcastToMap(mapId: string, data: any, excludeId?: string): void {
const room = this.mapRooms.get(mapId);
if (!room) return;
room.forEach(clientId => {
if (clientId !== excludeId) {
const client = this.clients.get(clientId);
if (client && client.authenticated && client.readyState === WebSocket.OPEN) {
this.sendMessage(client, data);
}
}
});
}
public getConnectionCount(): number {
return this.clients.size;
}
public getAuthenticatedConnectionCount(): number {
return Array.from(this.clients.values()).filter(c => c.authenticated).length;
}
public getMapPlayerCounts(): Record<string, number> {
const counts: Record<string, number> = {};
this.mapRooms.forEach((clients, mapId) => {
counts[mapId] = clients.size;
});
return counts;
}
public getMapPlayers(mapId: string): string[] {
const room = this.mapRooms.get(mapId);
if (!room) return [];
const players: string[] = [];
room.forEach(clientId => {
const client = this.clients.get(clientId);
if (client?.authenticated && client.username) {
players.push(client.username);
}
});
return players;
}
// ========== 私有辅助方法 ==========
private sendMessage(ws: ExtendedWebSocket, data: any) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(data));
}
}
private sendError(ws: ExtendedWebSocket, message: string) {
this.sendMessage(ws, { type: 'error', message });
}
private joinMapRoom(clientId: string, mapId: string) {
if (!this.mapRooms.has(mapId)) {
this.mapRooms.set(mapId, new Set());
}
this.mapRooms.get(mapId).add(clientId);
}
private leaveMapRoom(clientId: string, mapId: string) {
const room = this.mapRooms.get(mapId);
if (room) {
room.delete(clientId);
if (room.size === 0) this.mapRooms.delete(mapId);
}
}
private async cleanupClient(ws: ExtendedWebSocket, reason: 'manual' | 'timeout' | 'disconnect' = 'disconnect') {
try {
if (ws.authenticated && ws.id) {
await this.chatService.handlePlayerLogout(ws.id, reason);
}
if (ws.currentMap) {
this.leaveMapRoom(ws.id, ws.currentMap);
}
this.clients.delete(ws.id);
} catch (error) {
this.logger.error(`清理客户端失败: ${ws.id}`, error);
}
}
private generateClientId(): string {
return `ws_${Date.now()}_${Math.random().toString(36).substring(2, 11)}`;
}
}