diff --git a/src/business/zulip/chat.controller.ts b/src/business/zulip/chat.controller.ts index ea546ce..dad654d 100644 --- a/src/business/zulip/chat.controller.ts +++ b/src/business/zulip/chat.controller.ts @@ -43,6 +43,7 @@ import { import { JwtAuthGuard } from '../auth/jwt_auth.guard'; import { ZulipService } from './zulip.service'; import { ZulipWebSocketGateway } from './zulip_websocket.gateway'; +import { CleanWebSocketGateway } from './clean_websocket.gateway'; import { SendChatMessageDto, ChatMessageResponseDto, @@ -58,7 +59,7 @@ export class ChatController { constructor( private readonly zulipService: ZulipService, - private readonly websocketGateway: ZulipWebSocketGateway, + private readonly websocketGateway: CleanWebSocketGateway, ) {} /** @@ -255,6 +256,7 @@ export class ChatController { // 获取 WebSocket 连接状态 const totalConnections = await this.websocketGateway.getConnectionCount(); const authenticatedConnections = await this.websocketGateway.getAuthenticatedConnectionCount(); + const mapPlayerCounts = await this.websocketGateway.getMapPlayerCounts(); // 获取内存使用情况 const memoryUsage = process.memoryUsage(); @@ -267,11 +269,7 @@ export class ChatController { totalConnections, authenticatedConnections, activeSessions: authenticatedConnections, // 简化处理 - mapPlayerCounts: { - 'whale_port': Math.floor(authenticatedConnections * 0.4), - 'pumpkin_valley': Math.floor(authenticatedConnections * 0.3), - 'novice_village': Math.floor(authenticatedConnections * 0.3), - }, + mapPlayerCounts: mapPlayerCounts, }, zulip: { serverConnected: true, // 需要实际检查 @@ -349,19 +347,21 @@ export class ChatController { }) async getWebSocketInfo() { return { - websocketUrl: 'ws://localhost:3000/game', - namespace: '/game', + websocketUrl: 'ws://localhost:3001', + namespace: '/', supportedEvents: [ 'login', // 用户登录 'chat', // 发送聊天消息 - 'position_update', // 位置更新 + 'position', // 位置更新 ], supportedResponses: [ + 'connected', // 连接确认 'login_success', // 登录成功 'login_error', // 登录失败 'chat_sent', // 消息发送成功 'chat_error', // 消息发送失败 'chat_render', // 接收到聊天消息 + 'error', // 通用错误 ], authRequired: true, tokenType: 'JWT', diff --git a/src/business/zulip/clean_websocket.gateway.ts b/src/business/zulip/clean_websocket.gateway.ts new file mode 100644 index 0000000..91e7524 --- /dev/null +++ b/src/business/zulip/clean_websocket.gateway.ts @@ -0,0 +1,346 @@ +/** + * 清洁的WebSocket网关 + * 使用原生WebSocket,不依赖NestJS的WebSocket装饰器 + */ + +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import * as WebSocket from 'ws'; +import { ZulipService } from './zulip.service'; +import { SessionManagerService } from './services/session_manager.service'; + +interface ExtendedWebSocket extends WebSocket { + id: string; + isAlive?: boolean; + authenticated?: boolean; + userId?: string; + username?: string; + sessionId?: string; + currentMap?: string; +} + +@Injectable() +export class CleanWebSocketGateway implements OnModuleInit, OnModuleDestroy { + private server: WebSocket.Server; + private readonly logger = new Logger(CleanWebSocketGateway.name); + private clients = new Map(); + private mapRooms = new Map>(); // mapId -> Set + + constructor( + private readonly zulipService: ZulipService, + private readonly sessionManager: SessionManagerService, + ) {} + + async onModuleInit() { + const port = 3001; + + this.server = new WebSocket.Server({ port }); + + this.server.on('connection', (ws: ExtendedWebSocket) => { + ws.id = this.generateClientId(); + ws.isAlive = true; + ws.authenticated = false; + + this.clients.set(ws.id, ws); + + this.logger.log(`新的WebSocket连接: ${ws.id}`); + + ws.on('message', (data) => { + try { + const message = JSON.parse(data.toString()); + this.handleMessage(ws, message); + } catch (error) { + this.logger.error('解析消息失败', error); + this.sendError(ws, '消息格式错误'); + } + }); + + ws.on('close', () => { + this.logger.log(`WebSocket连接关闭: ${ws.id}`); + this.cleanupClient(ws); + }); + + ws.on('error', (error) => { + this.logger.error(`WebSocket错误: ${ws.id}`, error); + }); + + // 发送连接确认 + this.sendMessage(ws, { + type: 'connected', + message: '连接成功', + socketId: ws.id + }); + }); + + this.logger.log(`WebSocket服务器启动成功,端口: ${port}`); + } + + async onModuleDestroy() { + if (this.server) { + this.server.close(); + this.logger.log('WebSocket服务器已关闭'); + } + } + + private async handleMessage(ws: ExtendedWebSocket, message: any) { + this.logger.log(`收到消息: ${ws.id}`, message); + + const messageType = message.type || message.t; + + this.logger.log(`消息类型: ${messageType}`, { type: message.type, t: message.t }); + + switch (messageType) { + case 'login': + await this.handleLogin(ws, message); + break; + case 'chat': + await this.handleChat(ws, message); + break; + case 'position': + await this.handlePositionUpdate(ws, message); + break; + default: + this.logger.warn(`未知消息类型: ${messageType}`, message); + this.sendError(ws, `未知消息类型: ${messageType}`); + } + } + + private async handleLogin(ws: ExtendedWebSocket, message: any) { + try { + if (!message.token) { + this.sendError(ws, 'Token不能为空'); + return; + } + + // 调用ZulipService进行登录 + const result = await this.zulipService.handlePlayerLogin({ + socketId: ws.id, + token: message.token + }); + + if (result.success) { + ws.authenticated = true; + ws.userId = result.userId; + ws.username = result.username; + ws.sessionId = result.sessionId; + ws.currentMap = 'whale_port'; // 默认地图 + + // 加入默认地图房间 + this.joinMapRoom(ws.id, ws.currentMap); + + this.sendMessage(ws, { + t: 'login_success', + sessionId: result.sessionId, + userId: result.userId, + username: result.username, + currentMap: ws.currentMap + }); + + this.logger.log(`用户登录成功: ${result.username} (${ws.id}) 进入地图: ${ws.currentMap}`); + } else { + this.sendMessage(ws, { + t: 'login_error', + message: result.error || '登录失败' + }); + } + } catch (error) { + this.logger.error('登录处理失败', error); + this.sendError(ws, '登录处理失败'); + } + } + + private async handleChat(ws: ExtendedWebSocket, message: any) { + try { + if (!ws.authenticated) { + this.sendError(ws, '请先登录'); + return; + } + + if (!message.content) { + this.sendError(ws, '消息内容不能为空'); + return; + } + + // 调用ZulipService发送消息 + const result = await this.zulipService.sendChatMessage({ + socketId: ws.id, + content: message.content, + scope: message.scope || 'local' + }); + + if (result.success) { + this.sendMessage(ws, { + t: 'chat_sent', + messageId: result.messageId, + message: '消息发送成功' + }); + + // 广播消息给其他用户(根据scope决定范围) + if (message.scope === 'global') { + // 全局消息:广播给所有已认证用户 + this.broadcastMessage({ + t: 'chat_render', + from: ws.username, + txt: message.content, + bubble: true, + scope: 'global' + }, ws.id); + } else { + // 本地消息:只广播给同一地图的用户 + this.broadcastToMap(ws.currentMap, { + t: 'chat_render', + from: ws.username, + txt: message.content, + bubble: true, + scope: 'local', + mapId: ws.currentMap + }, ws.id); + } + + this.logger.log(`消息发送成功: ${ws.username} -> ${message.content}`); + } else { + this.sendMessage(ws, { + t: 'chat_error', + message: result.error || '消息发送失败' + }); + } + } catch (error) { + this.logger.error('聊天处理失败', error); + this.sendError(ws, '聊天处理失败'); + } + } + + private async handlePositionUpdate(ws: ExtendedWebSocket, message: any) { + try { + if (!ws.authenticated) { + this.sendError(ws, '请先登录'); + return; + } + + // 简单的位置更新处理,这里可以添加更多逻辑 + this.logger.log(`位置更新: ${ws.username} -> (${message.x}, ${message.y}) 在 ${message.mapId}`); + + // 如果用户切换了地图,更新房间 + if (ws.currentMap !== message.mapId) { + this.leaveMapRoom(ws.id, ws.currentMap); + this.joinMapRoom(ws.id, message.mapId); + ws.currentMap = message.mapId; + + this.logger.log(`用户 ${ws.username} 切换到地图: ${message.mapId}`); + } + + // 广播位置更新给同一地图的其他用户 + this.broadcastToMap(message.mapId, { + t: 'position_update', + userId: ws.userId, + username: ws.username, + x: message.x, + y: message.y, + mapId: message.mapId + }, ws.id); + + } catch (error) { + this.logger.error('位置更新处理失败', error); + this.sendError(ws, '位置更新处理失败'); + } + } + + private sendMessage(ws: ExtendedWebSocket, data: any) { + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify(data)); + } + } + + private sendError(ws: ExtendedWebSocket, message: string) { + this.sendMessage(ws, { + type: 'error', + message: message + }); + } + + private broadcastMessage(data: any, excludeId?: string) { + this.clients.forEach((client, id) => { + if (id !== excludeId && client.authenticated) { + this.sendMessage(client, data); + } + }); + } + + private broadcastToMap(mapId: string, data: any, excludeId?: string) { + const room = this.mapRooms.get(mapId); + if (!room) return; + + room.forEach(clientId => { + if (clientId !== excludeId) { + const client = this.clients.get(clientId); + if (client && client.authenticated) { + this.sendMessage(client, data); + } + } + }); + } + + private joinMapRoom(clientId: string, mapId: string) { + if (!this.mapRooms.has(mapId)) { + this.mapRooms.set(mapId, new Set()); + } + this.mapRooms.get(mapId).add(clientId); + + this.logger.log(`客户端 ${clientId} 加入地图房间: ${mapId}`); + } + + private leaveMapRoom(clientId: string, mapId: string) { + const room = this.mapRooms.get(mapId); + if (room) { + room.delete(clientId); + if (room.size === 0) { + this.mapRooms.delete(mapId); + } + this.logger.log(`客户端 ${clientId} 离开地图房间: ${mapId}`); + } + } + + private cleanupClient(ws: ExtendedWebSocket) { + // 从地图房间中移除 + if (ws.currentMap) { + this.leaveMapRoom(ws.id, ws.currentMap); + } + + // 从客户端列表中移除 + this.clients.delete(ws.id); + } + + private generateClientId(): string { + return `ws_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; + } + + // 公共方法供其他服务调用 + public getConnectionCount(): number { + return this.clients.size; + } + + public getAuthenticatedConnectionCount(): number { + return Array.from(this.clients.values()).filter(client => client.authenticated).length; + } + + public getMapPlayerCounts(): Record { + const counts: Record = {}; + this.mapRooms.forEach((clients, mapId) => { + counts[mapId] = clients.size; + }); + return counts; + } + + public getMapPlayers(mapId: string): string[] { + const room = this.mapRooms.get(mapId); + if (!room) return []; + + const players: string[] = []; + room.forEach(clientId => { + const client = this.clients.get(clientId); + if (client && client.authenticated && client.username) { + players.push(client.username); + } + }); + return players; + } +} \ No newline at end of file diff --git a/src/business/zulip/zulip.module.ts b/src/business/zulip/zulip.module.ts index c146c57..091129c 100644 --- a/src/business/zulip/zulip.module.ts +++ b/src/business/zulip/zulip.module.ts @@ -44,6 +44,7 @@ import { Module } from '@nestjs/common'; import { ZulipWebSocketGateway } from './zulip_websocket.gateway'; +import { CleanWebSocketGateway } from './clean_websocket.gateway'; import { ZulipService } from './zulip.service'; import { SessionManagerService } from './services/session_manager.service'; import { MessageFilterService } from './services/message_filter.service'; @@ -86,7 +87,7 @@ import { AuthModule } from '../auth/auth.module'; // 会话清理服务 - 定时清理过期会话 SessionCleanupService, // WebSocket网关 - 处理游戏客户端WebSocket连接 - ZulipWebSocketGateway, + CleanWebSocketGateway, ], controllers: [ // 聊天相关的REST API控制器 @@ -108,7 +109,7 @@ import { AuthModule } from '../auth/auth.module'; // 导出会话清理服务 SessionCleanupService, // 导出WebSocket网关 - ZulipWebSocketGateway, + CleanWebSocketGateway, ], }) export class ZulipModule {} \ No newline at end of file diff --git a/src/business/zulip/zulip_websocket.gateway.ts b/src/business/zulip/zulip_websocket.gateway.ts index a6bc187..1facdcc 100644 --- a/src/business/zulip/zulip_websocket.gateway.ts +++ b/src/business/zulip/zulip_websocket.gateway.ts @@ -25,28 +25,28 @@ * - 连接状态管理和权限验证 * * 最近修改: - * - 2026-01-07: 代码规范优化 - 完善文件头注释和修改记录 (修改者: moyin) + * - 2026-01-09: 重构为原生WebSocket - 移除Socket.IO依赖,使用原生WebSocket (修改者: moyin) * * @author angjustinl - * @version 1.0.1 + * @version 2.0.0 * @since 2025-12-25 - * @lastModified 2026-01-07 + * @lastModified 2026-01-09 */ -import { - WebSocketGateway, - WebSocketServer, - SubscribeMessage, - OnGatewayConnection, - OnGatewayDisconnect, - MessageBody, - ConnectedSocket, -} from '@nestjs/websockets'; -import { Server, Socket } from 'socket.io'; -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import * as WebSocket from 'ws'; import { ZulipService } from './zulip.service'; import { SessionManagerService } from './services/session_manager.service'; +/** + * 扩展的WebSocket接口,包含客户端数据 + */ +interface ExtendedWebSocket extends WebSocket { + id: string; + data?: ClientData; + isAlive?: boolean; +} + /** * 登录消息接口 - 按guide.md格式 */ @@ -130,15 +130,14 @@ interface ClientData { * - 实时消息推送和广播 */ @Injectable() -@WebSocketGateway({ - cors: { origin: '*' }, - namespace: '/game', -}) -export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisconnect { - @WebSocketServer() - server: Server; - +export class ZulipWebSocketGateway implements OnModuleInit, OnModuleDestroy { + private server: WebSocket.Server; private readonly logger = new Logger(ZulipWebSocketGateway.name); + private clients = new Map(); + private mapRooms = new Map>(); // mapId -> Set + + /** 心跳间隔(毫秒) */ + private static readonly HEARTBEAT_INTERVAL = 30000; constructor( private readonly zulipService: ZulipService, @@ -146,12 +145,43 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc ) { this.logger.log('ZulipWebSocketGateway初始化完成', { gateway: 'ZulipWebSocketGateway', - namespace: '/game', + path: '/game', timestamp: new Date().toISOString(), }); + } + + /** + * 模块初始化 - 启动WebSocket服务器 + */ + async onModuleInit() { + const port = process.env.WEBSOCKET_PORT ? parseInt(process.env.WEBSOCKET_PORT) : 3001; + + this.server = new WebSocket.Server({ + port: port, + path: '/game' + }); + + this.server.on('connection', (client: ExtendedWebSocket) => { + this.handleConnection(client); + }); + + this.logger.log(`WebSocket服务器启动成功,监听端口: ${port}`); // 设置消息分发器,使ZulipEventProcessorService能够向客户端发送消息 this.setupMessageDistributor(); + + // 设置心跳检测 + this.setupHeartbeat(); + } + + /** + * 模块销毁 - 关闭WebSocket服务器 + */ + async onModuleDestroy() { + if (this.server) { + this.server.close(); + this.logger.log('WebSocket服务器已关闭'); + } } /** @@ -167,11 +197,16 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc * * @param client WebSocket客户端连接对象 */ - async handleConnection(client: Socket): Promise { + async handleConnection(client: ExtendedWebSocket): Promise { + // 生成唯一ID + client.id = this.generateClientId(); + client.isAlive = true; + + this.clients.set(client.id, client); + this.logger.log('新的WebSocket连接建立', { operation: 'handleConnection', socketId: client.id, - remoteAddress: client.handshake.address, timestamp: new Date().toISOString(), }); @@ -184,6 +219,24 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc connectedAt: new Date(), }; client.data = clientData; + + // 设置消息处理 + client.on('message', (data) => { + try { + const message = JSON.parse(data.toString()); + this.handleMessage(client, message); + } catch (error) { + this.logger.error('解析消息失败', { + socketId: client.id, + error: error instanceof Error ? error.message : String(error), + }); + } + }); + + // 设置pong响应 + client.on('pong', () => { + client.isAlive = true; + }); } /** @@ -200,8 +253,8 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc * * @param client WebSocket客户端连接对象 */ - async handleDisconnect(client: Socket): Promise { - const clientData = client.data as ClientData | undefined; + async handleDisconnect(client: ExtendedWebSocket): Promise { + const clientData = client.data; const connectionDuration = clientData?.connectedAt ? Date.now() - clientData.connectedAt.getTime() : 0; @@ -235,6 +288,45 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc }, err.stack); } } + + // 从客户端列表中移除 + this.clients.delete(client.id); + + // 从地图房间中移除 + for (const [mapId, room] of this.mapRooms.entries()) { + if (room.has(client.id)) { + room.delete(client.id); + if (room.size === 0) { + this.mapRooms.delete(mapId); + } + } + } + } + + /** + * 处理消息路由 + */ + private async handleMessage(client: ExtendedWebSocket, message: any) { + // 直接处理消息类型,不需要event包装 + const messageType = message.type || message.t; + + switch (messageType) { + case 'login': + await this.handleLogin(client, message); + break; + case 'chat': + await this.handleChat(client, message); + break; + case 'position': + await this.handlePositionUpdate(client, message); + break; + default: + this.logger.warn('未知消息类型', { + socketId: client.id, + messageType, + message, + }); + } } /** @@ -252,11 +344,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc * @param client WebSocket客户端连接对象 * @param data 登录消息数据 */ - @SubscribeMessage('login') - async handleLogin( - @ConnectedSocket() client: Socket, - @MessageBody() data: LoginMessage, - ): Promise { + private async handleLogin(client: ExtendedWebSocket, data: LoginMessage): Promise { this.logger.log('收到登录请求', { operation: 'handleLogin', socketId: client.id, @@ -273,7 +361,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc data, }); - client.emit('login_error', { + this.sendMessage(client, 'login_error', { t: 'login_error', message: '登录请求格式无效', }); @@ -281,7 +369,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc } // 检查是否已经登录 - const clientData = client.data as ClientData; + const clientData = client.data; if (clientData?.authenticated) { this.logger.warn('用户已登录,拒绝重复登录', { operation: 'handleLogin', @@ -289,7 +377,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc userId: clientData.userId, }); - client.emit('login_error', { + this.sendMessage(client, 'login_error', { t: 'login_error', message: '您已经登录', }); @@ -322,7 +410,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc currentMap: result.currentMap || 'novice_village', }; - client.emit('login_success', loginSuccess); + this.sendMessage(client, 'login_success', loginSuccess); this.logger.log('登录处理成功', { operation: 'handleLogin', @@ -335,7 +423,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc }); } else { // 发送登录失败消息 - client.emit('login_error', { + this.sendMessage(client, 'login_error', { t: 'login_error', message: result.error || '登录失败', }); @@ -357,7 +445,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc timestamp: new Date().toISOString(), }, err.stack); - client.emit('login_error', { + this.sendMessage(client, 'login_error', { t: 'login_error', message: '系统错误,请稍后重试', }); @@ -379,12 +467,8 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc * @param client WebSocket客户端连接对象 * @param data 聊天消息数据 */ - @SubscribeMessage('chat') - async handleChat( - @ConnectedSocket() client: Socket, - @MessageBody() data: ChatMessage, - ): Promise { - const clientData = client.data as ClientData | undefined; + private async handleChat(client: ExtendedWebSocket, data: ChatMessage): Promise { + const clientData = client.data; console.log('🔍 DEBUG: handleChat 被调用了!', { socketId: client.id, @@ -410,7 +494,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc socketId: client.id, }); - client.emit('chat_error', { + this.sendMessage(client, 'chat_error', { t: 'chat_error', message: '请先登录', }); @@ -425,7 +509,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc data, }); - client.emit('chat_error', { + this.sendMessage(client, 'chat_error', { t: 'chat_error', message: '消息格式无效', }); @@ -439,7 +523,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc socketId: client.id, }); - client.emit('chat_error', { + this.sendMessage(client, 'chat_error', { t: 'chat_error', message: '消息内容不能为空', }); @@ -455,7 +539,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc if (result.success) { // 发送成功确认 - client.emit('chat_sent', { + this.sendMessage(client, 'chat_sent', { t: 'chat_sent', messageId: result.messageId, message: '消息发送成功', @@ -470,7 +554,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc }); } else { // 发送失败通知 - client.emit('chat_error', { + this.sendMessage(client, 'chat_error', { t: 'chat_error', message: result.error || '消息发送失败', }); @@ -493,7 +577,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc timestamp: new Date().toISOString(), }, err.stack); - client.emit('chat_error', { + this.sendMessage(client, 'chat_error', { t: 'chat_error', message: '系统错误,请稍后重试', }); @@ -509,12 +593,8 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc * @param client WebSocket客户端连接对象 * @param data 位置更新数据 */ - @SubscribeMessage('position_update') - async handlePositionUpdate( - @ConnectedSocket() client: Socket, - @MessageBody() data: PositionMessage, - ): Promise { - const clientData = client.data as ClientData | undefined; + private async handlePositionUpdate(client: ExtendedWebSocket, data: PositionMessage): Promise { + const clientData = client.data; this.logger.debug('收到位置更新', { operation: 'handlePositionUpdate', @@ -602,7 +682,10 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc bubble, }; - this.server.to(socketId).emit('chat_render', message); + const client = this.clients.get(socketId); + if (client) { + this.sendMessage(client, 'chat_render', message); + } this.logger.debug('发送聊天渲染消息', { operation: 'sendChatRender', @@ -646,7 +729,10 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc // 向每个Socket发送消息 for (const socketId of socketIds) { - this.server.to(socketId).emit(event, data); + const client = this.clients.get(socketId); + if (client) { + this.sendMessage(client, event, data); + } } this.logger.log('地图广播完成', { @@ -678,7 +764,10 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc * @param data 消息数据 */ sendToPlayer(socketId: string, event: string, data: any): void { - this.server.to(socketId).emit(event, data); + const client = this.clients.get(socketId); + if (client) { + this.sendMessage(client, event, data); + } this.logger.debug('发送消息给玩家', { operation: 'sendToPlayer', @@ -697,16 +786,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc * @returns Promise 连接数 */ async getConnectionCount(): Promise { - try { - const sockets = await this.server.fetchSockets(); - return sockets.length; - } catch (error) { - this.logger.error('获取连接数失败', { - operation: 'getConnectionCount', - error: (error as Error).message, - }); - return 0; - } + return this.clients.size; } /** @@ -718,19 +798,13 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc * @returns Promise 已认证连接数 */ async getAuthenticatedConnectionCount(): Promise { - try { - const sockets = await this.server.fetchSockets(); - return sockets.filter(socket => { - const data = socket.data as ClientData | undefined; - return data?.authenticated === true; - }).length; - } catch (error) { - this.logger.error('获取已认证连接数失败', { - operation: 'getAuthenticatedConnectionCount', - error: (error as Error).message, - }); - return 0; + let count = 0; + for (const client of this.clients.values()) { + if (client.data?.authenticated === true) { + count++; + } } + return count; } /** @@ -743,33 +817,63 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc * @param reason 断开原因 */ async disconnectClient(socketId: string, reason?: string): Promise { - try { - const sockets = await this.server.fetchSockets(); - const targetSocket = sockets.find(s => s.id === socketId); + const client = this.clients.get(socketId); + + if (client) { + client.close(); - if (targetSocket) { - targetSocket.disconnect(true); - - this.logger.log('客户端连接已断开', { - operation: 'disconnectClient', - socketId, - reason, - }); - } else { - this.logger.warn('未找到目标客户端', { - operation: 'disconnectClient', - socketId, - }); - } - } catch (error) { - this.logger.error('断开客户端连接失败', { + this.logger.log('客户端连接已断开', { + operation: 'disconnectClient', + socketId, + reason, + }); + } else { + this.logger.warn('未找到目标客户端', { operation: 'disconnectClient', socketId, - error: (error as Error).message, }); } } + /** + * 发送消息给客户端 + */ + private sendMessage(client: ExtendedWebSocket, event: string, data: any) { + if (client.readyState === WebSocket.OPEN) { + // 直接发送数据,不包装在event中 + client.send(JSON.stringify(data)); + } + } + + /** + * 生成客户端ID + */ + private generateClientId(): string { + return `ws_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; + } + + /** + * 设置心跳检测 + */ + private setupHeartbeat() { + setInterval(() => { + this.clients.forEach((client) => { + if (!client.isAlive) { + this.logger.warn('客户端心跳超时,断开连接', { + socketId: client.id, + }); + client.close(); + return; + } + + client.isAlive = false; + if (client.readyState === WebSocket.OPEN) { + client.ping(); + } + }); + }, ZulipWebSocketGateway.HEARTBEAT_INTERVAL); + } + /** * 设置消息分发器 *