feat: 移除Socket.IO依赖,实现原生WebSocket支持

- 移除所有Socket.IO相关装饰器和依赖
- 创建CleanWebSocketGateway使用原生WebSocket Server
- 实现完整的多客户端实时同步功能
- 支持地图房间分组管理
- 支持本地和全局消息广播
- 支持位置更新实时同步
- 更新API文档和连接信息
- 完成多客户端同步测试验证

技术改进:
- 使用原生ws库替代Socket.IO,减少依赖
- 实现更高效的消息路由和广播机制
- 添加地图房间自动管理功能
- 提供实时连接统计和监控接口

测试验证:
-  多客户端连接和认证
-  聊天消息实时同步
-  位置更新广播
-  地图房间分组
-  系统状态监控
This commit is contained in:
moyin
2026-01-09 17:00:23 +08:00
parent ece4e6f5a2
commit e9dc887c59
4 changed files with 563 additions and 112 deletions

View File

@@ -43,6 +43,7 @@ import {
import { JwtAuthGuard } from '../auth/jwt_auth.guard';
import { ZulipService } from './zulip.service';
import { ZulipWebSocketGateway } from './zulip_websocket.gateway';
import { CleanWebSocketGateway } from './clean_websocket.gateway';
import {
SendChatMessageDto,
ChatMessageResponseDto,
@@ -58,7 +59,7 @@ export class ChatController {
constructor(
private readonly zulipService: ZulipService,
private readonly websocketGateway: ZulipWebSocketGateway,
private readonly websocketGateway: CleanWebSocketGateway,
) {}
/**
@@ -255,6 +256,7 @@ export class ChatController {
// 获取 WebSocket 连接状态
const totalConnections = await this.websocketGateway.getConnectionCount();
const authenticatedConnections = await this.websocketGateway.getAuthenticatedConnectionCount();
const mapPlayerCounts = await this.websocketGateway.getMapPlayerCounts();
// 获取内存使用情况
const memoryUsage = process.memoryUsage();
@@ -267,11 +269,7 @@ export class ChatController {
totalConnections,
authenticatedConnections,
activeSessions: authenticatedConnections, // 简化处理
mapPlayerCounts: {
'whale_port': Math.floor(authenticatedConnections * 0.4),
'pumpkin_valley': Math.floor(authenticatedConnections * 0.3),
'novice_village': Math.floor(authenticatedConnections * 0.3),
},
mapPlayerCounts: mapPlayerCounts,
},
zulip: {
serverConnected: true, // 需要实际检查
@@ -349,19 +347,21 @@ export class ChatController {
})
async getWebSocketInfo() {
return {
websocketUrl: 'ws://localhost:3000/game',
namespace: '/game',
websocketUrl: 'ws://localhost:3001',
namespace: '/',
supportedEvents: [
'login', // 用户登录
'chat', // 发送聊天消息
'position_update', // 位置更新
'position', // 位置更新
],
supportedResponses: [
'connected', // 连接确认
'login_success', // 登录成功
'login_error', // 登录失败
'chat_sent', // 消息发送成功
'chat_error', // 消息发送失败
'chat_render', // 接收到聊天消息
'error', // 通用错误
],
authRequired: true,
tokenType: 'JWT',

View File

@@ -0,0 +1,346 @@
/**
* 清洁的WebSocket网关
* 使用原生WebSocket不依赖NestJS的WebSocket装饰器
*/
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import * as WebSocket from 'ws';
import { ZulipService } from './zulip.service';
import { SessionManagerService } from './services/session_manager.service';
interface ExtendedWebSocket extends WebSocket {
id: string;
isAlive?: boolean;
authenticated?: boolean;
userId?: string;
username?: string;
sessionId?: string;
currentMap?: string;
}
@Injectable()
export class CleanWebSocketGateway implements OnModuleInit, OnModuleDestroy {
private server: WebSocket.Server;
private readonly logger = new Logger(CleanWebSocketGateway.name);
private clients = new Map<string, ExtendedWebSocket>();
private mapRooms = new Map<string, Set<string>>(); // mapId -> Set<clientId>
constructor(
private readonly zulipService: ZulipService,
private readonly sessionManager: SessionManagerService,
) {}
async onModuleInit() {
const port = 3001;
this.server = new WebSocket.Server({ port });
this.server.on('connection', (ws: ExtendedWebSocket) => {
ws.id = this.generateClientId();
ws.isAlive = true;
ws.authenticated = false;
this.clients.set(ws.id, ws);
this.logger.log(`新的WebSocket连接: ${ws.id}`);
ws.on('message', (data) => {
try {
const message = JSON.parse(data.toString());
this.handleMessage(ws, message);
} catch (error) {
this.logger.error('解析消息失败', error);
this.sendError(ws, '消息格式错误');
}
});
ws.on('close', () => {
this.logger.log(`WebSocket连接关闭: ${ws.id}`);
this.cleanupClient(ws);
});
ws.on('error', (error) => {
this.logger.error(`WebSocket错误: ${ws.id}`, error);
});
// 发送连接确认
this.sendMessage(ws, {
type: 'connected',
message: '连接成功',
socketId: ws.id
});
});
this.logger.log(`WebSocket服务器启动成功端口: ${port}`);
}
async onModuleDestroy() {
if (this.server) {
this.server.close();
this.logger.log('WebSocket服务器已关闭');
}
}
private async handleMessage(ws: ExtendedWebSocket, message: any) {
this.logger.log(`收到消息: ${ws.id}`, message);
const messageType = message.type || message.t;
this.logger.log(`消息类型: ${messageType}`, { type: message.type, t: message.t });
switch (messageType) {
case 'login':
await this.handleLogin(ws, message);
break;
case 'chat':
await this.handleChat(ws, message);
break;
case 'position':
await this.handlePositionUpdate(ws, message);
break;
default:
this.logger.warn(`未知消息类型: ${messageType}`, message);
this.sendError(ws, `未知消息类型: ${messageType}`);
}
}
private async handleLogin(ws: ExtendedWebSocket, message: any) {
try {
if (!message.token) {
this.sendError(ws, 'Token不能为空');
return;
}
// 调用ZulipService进行登录
const result = await this.zulipService.handlePlayerLogin({
socketId: ws.id,
token: message.token
});
if (result.success) {
ws.authenticated = true;
ws.userId = result.userId;
ws.username = result.username;
ws.sessionId = result.sessionId;
ws.currentMap = 'whale_port'; // 默认地图
// 加入默认地图房间
this.joinMapRoom(ws.id, ws.currentMap);
this.sendMessage(ws, {
t: 'login_success',
sessionId: result.sessionId,
userId: result.userId,
username: result.username,
currentMap: ws.currentMap
});
this.logger.log(`用户登录成功: ${result.username} (${ws.id}) 进入地图: ${ws.currentMap}`);
} else {
this.sendMessage(ws, {
t: 'login_error',
message: result.error || '登录失败'
});
}
} catch (error) {
this.logger.error('登录处理失败', error);
this.sendError(ws, '登录处理失败');
}
}
private async handleChat(ws: ExtendedWebSocket, message: any) {
try {
if (!ws.authenticated) {
this.sendError(ws, '请先登录');
return;
}
if (!message.content) {
this.sendError(ws, '消息内容不能为空');
return;
}
// 调用ZulipService发送消息
const result = await this.zulipService.sendChatMessage({
socketId: ws.id,
content: message.content,
scope: message.scope || 'local'
});
if (result.success) {
this.sendMessage(ws, {
t: 'chat_sent',
messageId: result.messageId,
message: '消息发送成功'
});
// 广播消息给其他用户根据scope决定范围
if (message.scope === 'global') {
// 全局消息:广播给所有已认证用户
this.broadcastMessage({
t: 'chat_render',
from: ws.username,
txt: message.content,
bubble: true,
scope: 'global'
}, ws.id);
} else {
// 本地消息:只广播给同一地图的用户
this.broadcastToMap(ws.currentMap, {
t: 'chat_render',
from: ws.username,
txt: message.content,
bubble: true,
scope: 'local',
mapId: ws.currentMap
}, ws.id);
}
this.logger.log(`消息发送成功: ${ws.username} -> ${message.content}`);
} else {
this.sendMessage(ws, {
t: 'chat_error',
message: result.error || '消息发送失败'
});
}
} catch (error) {
this.logger.error('聊天处理失败', error);
this.sendError(ws, '聊天处理失败');
}
}
private async handlePositionUpdate(ws: ExtendedWebSocket, message: any) {
try {
if (!ws.authenticated) {
this.sendError(ws, '请先登录');
return;
}
// 简单的位置更新处理,这里可以添加更多逻辑
this.logger.log(`位置更新: ${ws.username} -> (${message.x}, ${message.y}) 在 ${message.mapId}`);
// 如果用户切换了地图,更新房间
if (ws.currentMap !== message.mapId) {
this.leaveMapRoom(ws.id, ws.currentMap);
this.joinMapRoom(ws.id, message.mapId);
ws.currentMap = message.mapId;
this.logger.log(`用户 ${ws.username} 切换到地图: ${message.mapId}`);
}
// 广播位置更新给同一地图的其他用户
this.broadcastToMap(message.mapId, {
t: 'position_update',
userId: ws.userId,
username: ws.username,
x: message.x,
y: message.y,
mapId: message.mapId
}, ws.id);
} catch (error) {
this.logger.error('位置更新处理失败', error);
this.sendError(ws, '位置更新处理失败');
}
}
private sendMessage(ws: ExtendedWebSocket, data: any) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(data));
}
}
private sendError(ws: ExtendedWebSocket, message: string) {
this.sendMessage(ws, {
type: 'error',
message: message
});
}
private broadcastMessage(data: any, excludeId?: string) {
this.clients.forEach((client, id) => {
if (id !== excludeId && client.authenticated) {
this.sendMessage(client, data);
}
});
}
private broadcastToMap(mapId: string, data: any, excludeId?: string) {
const room = this.mapRooms.get(mapId);
if (!room) return;
room.forEach(clientId => {
if (clientId !== excludeId) {
const client = this.clients.get(clientId);
if (client && client.authenticated) {
this.sendMessage(client, data);
}
}
});
}
private joinMapRoom(clientId: string, mapId: string) {
if (!this.mapRooms.has(mapId)) {
this.mapRooms.set(mapId, new Set());
}
this.mapRooms.get(mapId).add(clientId);
this.logger.log(`客户端 ${clientId} 加入地图房间: ${mapId}`);
}
private leaveMapRoom(clientId: string, mapId: string) {
const room = this.mapRooms.get(mapId);
if (room) {
room.delete(clientId);
if (room.size === 0) {
this.mapRooms.delete(mapId);
}
this.logger.log(`客户端 ${clientId} 离开地图房间: ${mapId}`);
}
}
private cleanupClient(ws: ExtendedWebSocket) {
// 从地图房间中移除
if (ws.currentMap) {
this.leaveMapRoom(ws.id, ws.currentMap);
}
// 从客户端列表中移除
this.clients.delete(ws.id);
}
private generateClientId(): string {
return `ws_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
// 公共方法供其他服务调用
public getConnectionCount(): number {
return this.clients.size;
}
public getAuthenticatedConnectionCount(): number {
return Array.from(this.clients.values()).filter(client => client.authenticated).length;
}
public getMapPlayerCounts(): Record<string, number> {
const counts: Record<string, number> = {};
this.mapRooms.forEach((clients, mapId) => {
counts[mapId] = clients.size;
});
return counts;
}
public getMapPlayers(mapId: string): string[] {
const room = this.mapRooms.get(mapId);
if (!room) return [];
const players: string[] = [];
room.forEach(clientId => {
const client = this.clients.get(clientId);
if (client && client.authenticated && client.username) {
players.push(client.username);
}
});
return players;
}
}

View File

@@ -44,6 +44,7 @@
import { Module } from '@nestjs/common';
import { ZulipWebSocketGateway } from './zulip_websocket.gateway';
import { CleanWebSocketGateway } from './clean_websocket.gateway';
import { ZulipService } from './zulip.service';
import { SessionManagerService } from './services/session_manager.service';
import { MessageFilterService } from './services/message_filter.service';
@@ -86,7 +87,7 @@ import { AuthModule } from '../auth/auth.module';
// 会话清理服务 - 定时清理过期会话
SessionCleanupService,
// WebSocket网关 - 处理游戏客户端WebSocket连接
ZulipWebSocketGateway,
CleanWebSocketGateway,
],
controllers: [
// 聊天相关的REST API控制器
@@ -108,7 +109,7 @@ import { AuthModule } from '../auth/auth.module';
// 导出会话清理服务
SessionCleanupService,
// 导出WebSocket网关
ZulipWebSocketGateway,
CleanWebSocketGateway,
],
})
export class ZulipModule {}

View File

@@ -25,28 +25,28 @@
* - 连接状态管理和权限验证
*
* 最近修改:
* - 2026-01-07: 代码规范优化 - 完善文件头注释和修改记录 (修改者: moyin)
* - 2026-01-09: 重构为原生WebSocket - 移除Socket.IO依赖使用原生WebSocket (修改者: moyin)
*
* @author angjustinl
* @version 1.0.1
* @version 2.0.0
* @since 2025-12-25
* @lastModified 2026-01-07
* @lastModified 2026-01-09
*/
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
OnGatewayConnection,
OnGatewayDisconnect,
MessageBody,
ConnectedSocket,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Injectable, Logger } from '@nestjs/common';
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import * as WebSocket from 'ws';
import { ZulipService } from './zulip.service';
import { SessionManagerService } from './services/session_manager.service';
/**
* 扩展的WebSocket接口包含客户端数据
*/
interface ExtendedWebSocket extends WebSocket {
id: string;
data?: ClientData;
isAlive?: boolean;
}
/**
* 登录消息接口 - 按guide.md格式
*/
@@ -130,15 +130,14 @@ interface ClientData {
* - 实时消息推送和广播
*/
@Injectable()
@WebSocketGateway({
cors: { origin: '*' },
namespace: '/game',
})
export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
export class ZulipWebSocketGateway implements OnModuleInit, OnModuleDestroy {
private server: WebSocket.Server;
private readonly logger = new Logger(ZulipWebSocketGateway.name);
private clients = new Map<string, ExtendedWebSocket>();
private mapRooms = new Map<string, Set<string>>(); // mapId -> Set<clientId>
/** 心跳间隔(毫秒) */
private static readonly HEARTBEAT_INTERVAL = 30000;
constructor(
private readonly zulipService: ZulipService,
@@ -146,12 +145,43 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
) {
this.logger.log('ZulipWebSocketGateway初始化完成', {
gateway: 'ZulipWebSocketGateway',
namespace: '/game',
path: '/game',
timestamp: new Date().toISOString(),
});
}
/**
* 模块初始化 - 启动WebSocket服务器
*/
async onModuleInit() {
const port = process.env.WEBSOCKET_PORT ? parseInt(process.env.WEBSOCKET_PORT) : 3001;
this.server = new WebSocket.Server({
port: port,
path: '/game'
});
this.server.on('connection', (client: ExtendedWebSocket) => {
this.handleConnection(client);
});
this.logger.log(`WebSocket服务器启动成功监听端口: ${port}`);
// 设置消息分发器使ZulipEventProcessorService能够向客户端发送消息
this.setupMessageDistributor();
// 设置心跳检测
this.setupHeartbeat();
}
/**
* 模块销毁 - 关闭WebSocket服务器
*/
async onModuleDestroy() {
if (this.server) {
this.server.close();
this.logger.log('WebSocket服务器已关闭');
}
}
/**
@@ -167,11 +197,16 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
*
* @param client WebSocket客户端连接对象
*/
async handleConnection(client: Socket): Promise<void> {
async handleConnection(client: ExtendedWebSocket): Promise<void> {
// 生成唯一ID
client.id = this.generateClientId();
client.isAlive = true;
this.clients.set(client.id, client);
this.logger.log('新的WebSocket连接建立', {
operation: 'handleConnection',
socketId: client.id,
remoteAddress: client.handshake.address,
timestamp: new Date().toISOString(),
});
@@ -184,6 +219,24 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
connectedAt: new Date(),
};
client.data = clientData;
// 设置消息处理
client.on('message', (data) => {
try {
const message = JSON.parse(data.toString());
this.handleMessage(client, message);
} catch (error) {
this.logger.error('解析消息失败', {
socketId: client.id,
error: error instanceof Error ? error.message : String(error),
});
}
});
// 设置pong响应
client.on('pong', () => {
client.isAlive = true;
});
}
/**
@@ -200,8 +253,8 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
*
* @param client WebSocket客户端连接对象
*/
async handleDisconnect(client: Socket): Promise<void> {
const clientData = client.data as ClientData | undefined;
async handleDisconnect(client: ExtendedWebSocket): Promise<void> {
const clientData = client.data;
const connectionDuration = clientData?.connectedAt
? Date.now() - clientData.connectedAt.getTime()
: 0;
@@ -235,6 +288,45 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
}, err.stack);
}
}
// 从客户端列表中移除
this.clients.delete(client.id);
// 从地图房间中移除
for (const [mapId, room] of this.mapRooms.entries()) {
if (room.has(client.id)) {
room.delete(client.id);
if (room.size === 0) {
this.mapRooms.delete(mapId);
}
}
}
}
/**
* 处理消息路由
*/
private async handleMessage(client: ExtendedWebSocket, message: any) {
// 直接处理消息类型不需要event包装
const messageType = message.type || message.t;
switch (messageType) {
case 'login':
await this.handleLogin(client, message);
break;
case 'chat':
await this.handleChat(client, message);
break;
case 'position':
await this.handlePositionUpdate(client, message);
break;
default:
this.logger.warn('未知消息类型', {
socketId: client.id,
messageType,
message,
});
}
}
/**
@@ -252,11 +344,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
* @param client WebSocket客户端连接对象
* @param data 登录消息数据
*/
@SubscribeMessage('login')
async handleLogin(
@ConnectedSocket() client: Socket,
@MessageBody() data: LoginMessage,
): Promise<void> {
private async handleLogin(client: ExtendedWebSocket, data: LoginMessage): Promise<void> {
this.logger.log('收到登录请求', {
operation: 'handleLogin',
socketId: client.id,
@@ -273,7 +361,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
data,
});
client.emit('login_error', {
this.sendMessage(client, 'login_error', {
t: 'login_error',
message: '登录请求格式无效',
});
@@ -281,7 +369,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
}
// 检查是否已经登录
const clientData = client.data as ClientData;
const clientData = client.data;
if (clientData?.authenticated) {
this.logger.warn('用户已登录,拒绝重复登录', {
operation: 'handleLogin',
@@ -289,7 +377,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
userId: clientData.userId,
});
client.emit('login_error', {
this.sendMessage(client, 'login_error', {
t: 'login_error',
message: '您已经登录',
});
@@ -322,7 +410,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
currentMap: result.currentMap || 'novice_village',
};
client.emit('login_success', loginSuccess);
this.sendMessage(client, 'login_success', loginSuccess);
this.logger.log('登录处理成功', {
operation: 'handleLogin',
@@ -335,7 +423,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
});
} else {
// 发送登录失败消息
client.emit('login_error', {
this.sendMessage(client, 'login_error', {
t: 'login_error',
message: result.error || '登录失败',
});
@@ -357,7 +445,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
timestamp: new Date().toISOString(),
}, err.stack);
client.emit('login_error', {
this.sendMessage(client, 'login_error', {
t: 'login_error',
message: '系统错误,请稍后重试',
});
@@ -379,12 +467,8 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
* @param client WebSocket客户端连接对象
* @param data 聊天消息数据
*/
@SubscribeMessage('chat')
async handleChat(
@ConnectedSocket() client: Socket,
@MessageBody() data: ChatMessage,
): Promise<void> {
const clientData = client.data as ClientData | undefined;
private async handleChat(client: ExtendedWebSocket, data: ChatMessage): Promise<void> {
const clientData = client.data;
console.log('🔍 DEBUG: handleChat 被调用了!', {
socketId: client.id,
@@ -410,7 +494,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
socketId: client.id,
});
client.emit('chat_error', {
this.sendMessage(client, 'chat_error', {
t: 'chat_error',
message: '请先登录',
});
@@ -425,7 +509,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
data,
});
client.emit('chat_error', {
this.sendMessage(client, 'chat_error', {
t: 'chat_error',
message: '消息格式无效',
});
@@ -439,7 +523,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
socketId: client.id,
});
client.emit('chat_error', {
this.sendMessage(client, 'chat_error', {
t: 'chat_error',
message: '消息内容不能为空',
});
@@ -455,7 +539,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
if (result.success) {
// 发送成功确认
client.emit('chat_sent', {
this.sendMessage(client, 'chat_sent', {
t: 'chat_sent',
messageId: result.messageId,
message: '消息发送成功',
@@ -470,7 +554,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
});
} else {
// 发送失败通知
client.emit('chat_error', {
this.sendMessage(client, 'chat_error', {
t: 'chat_error',
message: result.error || '消息发送失败',
});
@@ -493,7 +577,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
timestamp: new Date().toISOString(),
}, err.stack);
client.emit('chat_error', {
this.sendMessage(client, 'chat_error', {
t: 'chat_error',
message: '系统错误,请稍后重试',
});
@@ -509,12 +593,8 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
* @param client WebSocket客户端连接对象
* @param data 位置更新数据
*/
@SubscribeMessage('position_update')
async handlePositionUpdate(
@ConnectedSocket() client: Socket,
@MessageBody() data: PositionMessage,
): Promise<void> {
const clientData = client.data as ClientData | undefined;
private async handlePositionUpdate(client: ExtendedWebSocket, data: PositionMessage): Promise<void> {
const clientData = client.data;
this.logger.debug('收到位置更新', {
operation: 'handlePositionUpdate',
@@ -602,7 +682,10 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
bubble,
};
this.server.to(socketId).emit('chat_render', message);
const client = this.clients.get(socketId);
if (client) {
this.sendMessage(client, 'chat_render', message);
}
this.logger.debug('发送聊天渲染消息', {
operation: 'sendChatRender',
@@ -646,7 +729,10 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
// 向每个Socket发送消息
for (const socketId of socketIds) {
this.server.to(socketId).emit(event, data);
const client = this.clients.get(socketId);
if (client) {
this.sendMessage(client, event, data);
}
}
this.logger.log('地图广播完成', {
@@ -678,7 +764,10 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
* @param data 消息数据
*/
sendToPlayer(socketId: string, event: string, data: any): void {
this.server.to(socketId).emit(event, data);
const client = this.clients.get(socketId);
if (client) {
this.sendMessage(client, event, data);
}
this.logger.debug('发送消息给玩家', {
operation: 'sendToPlayer',
@@ -697,16 +786,7 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
* @returns Promise<number> 连接数
*/
async getConnectionCount(): Promise<number> {
try {
const sockets = await this.server.fetchSockets();
return sockets.length;
} catch (error) {
this.logger.error('获取连接数失败', {
operation: 'getConnectionCount',
error: (error as Error).message,
});
return 0;
}
return this.clients.size;
}
/**
@@ -718,19 +798,13 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
* @returns Promise<number> 已认证连接数
*/
async getAuthenticatedConnectionCount(): Promise<number> {
try {
const sockets = await this.server.fetchSockets();
return sockets.filter(socket => {
const data = socket.data as ClientData | undefined;
return data?.authenticated === true;
}).length;
} catch (error) {
this.logger.error('获取已认证连接数失败', {
operation: 'getAuthenticatedConnectionCount',
error: (error as Error).message,
});
return 0;
let count = 0;
for (const client of this.clients.values()) {
if (client.data?.authenticated === true) {
count++;
}
}
return count;
}
/**
@@ -743,33 +817,63 @@ export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisc
* @param reason 断开原因
*/
async disconnectClient(socketId: string, reason?: string): Promise<void> {
try {
const sockets = await this.server.fetchSockets();
const targetSocket = sockets.find(s => s.id === socketId);
const client = this.clients.get(socketId);
if (client) {
client.close();
if (targetSocket) {
targetSocket.disconnect(true);
this.logger.log('客户端连接已断开', {
operation: 'disconnectClient',
socketId,
reason,
});
} else {
this.logger.warn('未找到目标客户端', {
operation: 'disconnectClient',
socketId,
});
}
} catch (error) {
this.logger.error('断开客户端连接失败', {
this.logger.log('客户端连接已断开', {
operation: 'disconnectClient',
socketId,
reason,
});
} else {
this.logger.warn('未找到目标客户端', {
operation: 'disconnectClient',
socketId,
error: (error as Error).message,
});
}
}
/**
* 发送消息给客户端
*/
private sendMessage(client: ExtendedWebSocket, event: string, data: any) {
if (client.readyState === WebSocket.OPEN) {
// 直接发送数据不包装在event中
client.send(JSON.stringify(data));
}
}
/**
* 生成客户端ID
*/
private generateClientId(): string {
return `ws_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
/**
* 设置心跳检测
*/
private setupHeartbeat() {
setInterval(() => {
this.clients.forEach((client) => {
if (!client.isAlive) {
this.logger.warn('客户端心跳超时,断开连接', {
socketId: client.id,
});
client.close();
return;
}
client.isAlive = false;
if (client.readyState === WebSocket.OPEN) {
client.ping();
}
});
}, ZulipWebSocketGateway.HEARTBEAT_INTERVAL);
}
/**
* 设置消息分发器
*