Files
whale-town-end/src/business/zulip/zulip_websocket.gateway.ts
moyin e9dc887c59 feat: 移除Socket.IO依赖,实现原生WebSocket支持
- 移除所有Socket.IO相关装饰器和依赖
- 创建CleanWebSocketGateway使用原生WebSocket Server
- 实现完整的多客户端实时同步功能
- 支持地图房间分组管理
- 支持本地和全局消息广播
- 支持位置更新实时同步
- 更新API文档和连接信息
- 完成多客户端同步测试验证

技术改进:
- 使用原生ws库替代Socket.IO,减少依赖
- 实现更高效的消息路由和广播机制
- 添加地图房间自动管理功能
- 提供实时连接统计和监控接口

测试验证:
-  多客户端连接和认证
-  聊天消息实时同步
-  位置更新广播
-  地图房间分组
-  系统状态监控
2026-01-09 17:00:23 +08:00

914 lines
24 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* Zulip WebSocket网关
*
* 功能描述:
* - 处理所有Godot游戏客户端的WebSocket连接
* - 实现游戏协议到Zulip协议的转换
* - 提供统一的消息路由和权限控制
*
* 职责分离:
* - 连接管理处理WebSocket连接的建立、维护和断开
* - 协议转换:在游戏客户端协议和内部业务协议之间转换
* - 权限控制:验证用户身份和消息发送权限
* - 消息路由:将消息分发到正确的业务处理服务
*
* 主要方法:
* - handleConnection(): 处理客户端连接建立
* - handleDisconnect(): 处理客户端连接断开
* - handleLogin(): 处理登录消息
* - handleChat(): 处理聊天消息
* - handlePositionUpdate(): 处理位置更新
*
* 使用场景:
* - 游戏客户端WebSocket通信的统一入口
* - 消息协议转换和路由分发
* - 连接状态管理和权限验证
*
* 最近修改:
* - 2026-01-09: 重构为原生WebSocket - 移除Socket.IO依赖使用原生WebSocket (修改者: moyin)
*
* @author angjustinl
* @version 2.0.0
* @since 2025-12-25
* @lastModified 2026-01-09
*/
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import * as WebSocket from 'ws';
import { ZulipService } from './zulip.service';
import { SessionManagerService } from './services/session_manager.service';
/**
* 扩展的WebSocket接口包含客户端数据
*/
interface ExtendedWebSocket extends WebSocket {
id: string;
data?: ClientData;
isAlive?: boolean;
}
/**
* 登录消息接口 - 按guide.md格式
*/
interface LoginMessage {
type: 'login';
token: string;
}
/**
* 聊天消息接口 - 按guide.md格式
*/
interface ChatMessage {
t: 'chat';
content: string;
scope: string; // "local" 或 topic名称
}
/**
* 位置更新消息接口
*/
interface PositionMessage {
t: 'position';
x: number;
y: number;
mapId: string;
}
/**
* 聊天渲染消息接口 - 发送给客户端
*/
interface ChatRenderMessage {
t: 'chat_render';
from: string;
txt: string;
bubble: boolean;
}
/**
* 登录成功消息接口 - 发送给客户端
*/
interface LoginSuccessMessage {
t: 'login_success';
sessionId: string;
userId: string;
username: string;
currentMap: string;
}
/**
* 客户端数据接口
*/
interface ClientData {
authenticated: boolean;
userId: string | null;
sessionId: string | null;
username: string | null;
connectedAt: Date;
}
/**
* Zulip WebSocket网关类
*
* 职责:
* - 处理所有Godot游戏客户端的WebSocket连接
* - 实现游戏协议到Zulip协议的转换
* - 提供统一的消息路由和权限控制
* - 管理客户端连接状态和会话
*
* 主要方法:
* - handleConnection(): 处理客户端连接建立
* - handleDisconnect(): 处理客户端连接断开
* - handleLogin(): 处理登录消息
* - handleChat(): 处理聊天消息
* - handlePositionUpdate(): 处理位置更新
* - sendChatRender(): 向客户端发送聊天渲染消息
*
* 使用场景:
* - 游戏客户端WebSocket通信的统一入口
* - 消息协议转换和路由分发
* - 连接状态管理和权限验证
* - 实时消息推送和广播
*/
@Injectable()
export class ZulipWebSocketGateway implements OnModuleInit, OnModuleDestroy {
private server: WebSocket.Server;
private readonly logger = new Logger(ZulipWebSocketGateway.name);
private clients = new Map<string, ExtendedWebSocket>();
private mapRooms = new Map<string, Set<string>>(); // mapId -> Set<clientId>
/** 心跳间隔(毫秒) */
private static readonly HEARTBEAT_INTERVAL = 30000;
constructor(
private readonly zulipService: ZulipService,
private readonly sessionManager: SessionManagerService,
) {
this.logger.log('ZulipWebSocketGateway初始化完成', {
gateway: 'ZulipWebSocketGateway',
path: '/game',
timestamp: new Date().toISOString(),
});
}
/**
* 模块初始化 - 启动WebSocket服务器
*/
async onModuleInit() {
const port = process.env.WEBSOCKET_PORT ? parseInt(process.env.WEBSOCKET_PORT) : 3001;
this.server = new WebSocket.Server({
port: port,
path: '/game'
});
this.server.on('connection', (client: ExtendedWebSocket) => {
this.handleConnection(client);
});
this.logger.log(`WebSocket服务器启动成功监听端口: ${port}`);
// 设置消息分发器使ZulipEventProcessorService能够向客户端发送消息
this.setupMessageDistributor();
// 设置心跳检测
this.setupHeartbeat();
}
/**
* 模块销毁 - 关闭WebSocket服务器
*/
async onModuleDestroy() {
if (this.server) {
this.server.close();
this.logger.log('WebSocket服务器已关闭');
}
}
/**
* 处理客户端连接建立
*
* 功能描述:
* 当游戏客户端建立WebSocket连接时调用记录连接信息
*
* 业务逻辑:
* 1. 记录新连接的建立
* 2. 为连接分配唯一标识
* 3. 初始化连接状态
*
* @param client WebSocket客户端连接对象
*/
async handleConnection(client: ExtendedWebSocket): Promise<void> {
// 生成唯一ID
client.id = this.generateClientId();
client.isAlive = true;
this.clients.set(client.id, client);
this.logger.log('新的WebSocket连接建立', {
operation: 'handleConnection',
socketId: client.id,
timestamp: new Date().toISOString(),
});
// 设置连接的初始状态
const clientData: ClientData = {
authenticated: false,
userId: null,
sessionId: null,
username: null,
connectedAt: new Date(),
};
client.data = clientData;
// 设置消息处理
client.on('message', (data) => {
try {
const message = JSON.parse(data.toString());
this.handleMessage(client, message);
} catch (error) {
this.logger.error('解析消息失败', {
socketId: client.id,
error: error instanceof Error ? error.message : String(error),
});
}
});
// 设置pong响应
client.on('pong', () => {
client.isAlive = true;
});
}
/**
* 处理客户端连接断开
*
* 功能描述:
* 当游戏客户端断开WebSocket连接时调用清理相关资源
*
* 业务逻辑:
* 1. 记录连接断开信息
* 2. 清理会话数据
* 3. 注销Zulip事件队列
* 4. 释放相关资源
*
* @param client WebSocket客户端连接对象
*/
async handleDisconnect(client: ExtendedWebSocket): Promise<void> {
const clientData = client.data;
const connectionDuration = clientData?.connectedAt
? Date.now() - clientData.connectedAt.getTime()
: 0;
this.logger.log('WebSocket连接断开', {
operation: 'handleDisconnect',
socketId: client.id,
userId: clientData?.userId,
authenticated: clientData?.authenticated,
connectionDuration,
timestamp: new Date().toISOString(),
});
// 如果用户已认证,处理登出逻辑
if (clientData?.authenticated) {
try {
await this.zulipService.handlePlayerLogout(client.id);
this.logger.log('玩家登出处理完成', {
operation: 'handleDisconnect',
socketId: client.id,
userId: clientData.userId,
});
} catch (error) {
const err = error as Error;
this.logger.error('处理玩家登出时发生错误', {
operation: 'handleDisconnect',
socketId: client.id,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
}
}
// 从客户端列表中移除
this.clients.delete(client.id);
// 从地图房间中移除
for (const [mapId, room] of this.mapRooms.entries()) {
if (room.has(client.id)) {
room.delete(client.id);
if (room.size === 0) {
this.mapRooms.delete(mapId);
}
}
}
}
/**
* 处理消息路由
*/
private async handleMessage(client: ExtendedWebSocket, message: any) {
// 直接处理消息类型不需要event包装
const messageType = message.type || message.t;
switch (messageType) {
case 'login':
await this.handleLogin(client, message);
break;
case 'chat':
await this.handleChat(client, message);
break;
case 'position':
await this.handlePositionUpdate(client, message);
break;
default:
this.logger.warn('未知消息类型', {
socketId: client.id,
messageType,
message,
});
}
}
/**
* 处理登录消息 - 按guide.md格式
*
* 功能描述:
* 处理游戏客户端发送的登录请求验证Token并建立会话
*
* 业务逻辑:
* 1. 验证消息格式
* 2. 调用ZulipService处理登录逻辑
* 3. 更新连接状态
* 4. 返回登录结果
*
* @param client WebSocket客户端连接对象
* @param data 登录消息数据
*/
private async handleLogin(client: ExtendedWebSocket, data: LoginMessage): Promise<void> {
this.logger.log('收到登录请求', {
operation: 'handleLogin',
socketId: client.id,
messageType: data?.type,
timestamp: new Date().toISOString(),
});
try {
// 验证消息格式
if (!data || data.type !== 'login' || !data.token) {
this.logger.warn('登录请求格式无效', {
operation: 'handleLogin',
socketId: client.id,
data,
});
this.sendMessage(client, 'login_error', {
t: 'login_error',
message: '登录请求格式无效',
});
return;
}
// 检查是否已经登录
const clientData = client.data;
if (clientData?.authenticated) {
this.logger.warn('用户已登录,拒绝重复登录', {
operation: 'handleLogin',
socketId: client.id,
userId: clientData.userId,
});
this.sendMessage(client, 'login_error', {
t: 'login_error',
message: '您已经登录',
});
return;
}
// 调用ZulipService处理登录
const result = await this.zulipService.handlePlayerLogin({
token: data.token,
socketId: client.id,
});
if (result.success && result.sessionId) {
// 更新连接状态
const updatedClientData: ClientData = {
authenticated: true,
sessionId: result.sessionId,
userId: result.userId || null,
username: result.username || null,
connectedAt: clientData?.connectedAt || new Date(),
};
client.data = updatedClientData;
// 发送登录成功消息
const loginSuccess: LoginSuccessMessage = {
t: 'login_success',
sessionId: result.sessionId,
userId: result.userId || '',
username: result.username || '',
currentMap: result.currentMap || 'novice_village',
};
this.sendMessage(client, 'login_success', loginSuccess);
this.logger.log('登录处理成功', {
operation: 'handleLogin',
socketId: client.id,
sessionId: result.sessionId,
userId: result.userId,
username: result.username,
currentMap: result.currentMap,
timestamp: new Date().toISOString(),
});
} else {
// 发送登录失败消息
this.sendMessage(client, 'login_error', {
t: 'login_error',
message: result.error || '登录失败',
});
this.logger.warn('登录处理失败', {
operation: 'handleLogin',
socketId: client.id,
error: result.error,
timestamp: new Date().toISOString(),
});
}
} catch (error) {
const err = error as Error;
this.logger.error('登录处理异常', {
operation: 'handleLogin',
socketId: client.id,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
this.sendMessage(client, 'login_error', {
t: 'login_error',
message: '系统错误,请稍后重试',
});
}
}
/**
* 处理聊天消息 - 按guide.md格式
*
* 功能描述:
* 处理游戏客户端发送的聊天消息转发到Zulip对应的Stream/Topic
*
* 业务逻辑:
* 1. 验证用户认证状态
* 2. 验证消息格式
* 3. 调用ZulipService处理消息发送
* 4. 返回发送结果确认
*
* @param client WebSocket客户端连接对象
* @param data 聊天消息数据
*/
private async handleChat(client: ExtendedWebSocket, data: ChatMessage): Promise<void> {
const clientData = client.data;
console.log('🔍 DEBUG: handleChat 被调用了!', {
socketId: client.id,
data: data,
clientData: clientData,
timestamp: new Date().toISOString(),
});
this.logger.log('收到聊天消息', {
operation: 'handleChat',
socketId: client.id,
messageType: data?.t,
contentLength: data?.content?.length,
scope: data?.scope,
timestamp: new Date().toISOString(),
});
try {
// 验证用户认证状态
if (!clientData?.authenticated) {
this.logger.warn('未认证用户尝试发送聊天消息', {
operation: 'handleChat',
socketId: client.id,
});
this.sendMessage(client, 'chat_error', {
t: 'chat_error',
message: '请先登录',
});
return;
}
// 验证消息格式
if (!data || data.t !== 'chat' || !data.content || !data.scope) {
this.logger.warn('聊天消息格式无效', {
operation: 'handleChat',
socketId: client.id,
data,
});
this.sendMessage(client, 'chat_error', {
t: 'chat_error',
message: '消息格式无效',
});
return;
}
// 验证消息内容不为空
if (!data.content.trim()) {
this.logger.warn('聊天消息内容为空', {
operation: 'handleChat',
socketId: client.id,
});
this.sendMessage(client, 'chat_error', {
t: 'chat_error',
message: '消息内容不能为空',
});
return;
}
// 调用ZulipService处理消息发送
const result = await this.zulipService.sendChatMessage({
socketId: client.id,
content: data.content,
scope: data.scope,
});
if (result.success) {
// 发送成功确认
this.sendMessage(client, 'chat_sent', {
t: 'chat_sent',
messageId: result.messageId,
message: '消息发送成功',
});
this.logger.log('聊天消息发送成功', {
operation: 'handleChat',
socketId: client.id,
userId: clientData.userId,
messageId: result.messageId,
timestamp: new Date().toISOString(),
});
} else {
// 发送失败通知
this.sendMessage(client, 'chat_error', {
t: 'chat_error',
message: result.error || '消息发送失败',
});
this.logger.warn('聊天消息发送失败', {
operation: 'handleChat',
socketId: client.id,
userId: clientData.userId,
error: result.error,
timestamp: new Date().toISOString(),
});
}
} catch (error) {
const err = error as Error;
this.logger.error('聊天消息处理异常', {
operation: 'handleChat',
socketId: client.id,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
this.sendMessage(client, 'chat_error', {
t: 'chat_error',
message: '系统错误,请稍后重试',
});
}
}
/**
* 处理位置更新消息
*
* 功能描述:
* 处理游戏客户端发送的位置更新,用于消息路由和上下文注入
*
* @param client WebSocket客户端连接对象
* @param data 位置更新数据
*/
private async handlePositionUpdate(client: ExtendedWebSocket, data: PositionMessage): Promise<void> {
const clientData = client.data;
this.logger.debug('收到位置更新', {
operation: 'handlePositionUpdate',
socketId: client.id,
mapId: data?.mapId,
position: data ? { x: data.x, y: data.y } : null,
timestamp: new Date().toISOString(),
});
try {
// 验证用户认证状态
if (!clientData?.authenticated) {
this.logger.debug('未认证用户发送位置更新,忽略', {
operation: 'handlePositionUpdate',
socketId: client.id,
});
return;
}
// 验证消息格式
if (!data || data.t !== 'position' || !data.mapId ||
typeof data.x !== 'number' || typeof data.y !== 'number') {
this.logger.warn('位置更新消息格式无效', {
operation: 'handlePositionUpdate',
socketId: client.id,
data,
});
return;
}
// 验证坐标有效性
if (!Number.isFinite(data.x) || !Number.isFinite(data.y)) {
this.logger.warn('位置坐标无效', {
operation: 'handlePositionUpdate',
socketId: client.id,
x: data.x,
y: data.y,
});
return;
}
// 调用ZulipService更新位置
const success = await this.zulipService.updatePlayerPosition({
socketId: client.id,
x: data.x,
y: data.y,
mapId: data.mapId,
});
if (success) {
this.logger.debug('位置更新成功', {
operation: 'handlePositionUpdate',
socketId: client.id,
mapId: data.mapId,
});
}
} catch (error) {
const err = error as Error;
this.logger.error('位置更新处理异常', {
operation: 'handlePositionUpdate',
socketId: client.id,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
}
}
/**
* 向指定客户端发送聊天渲染消息
*
* 功能描述:
* 向游戏客户端发送格式化的聊天消息,用于显示气泡或聊天框
*
* @param socketId 目标客户端Socket ID
* @param from 发送者名称
* @param txt 消息文本
* @param bubble 是否显示气泡
*/
sendChatRender(socketId: string, from: string, txt: string, bubble: boolean): void {
const message: ChatRenderMessage = {
t: 'chat_render',
from,
txt,
bubble,
};
const client = this.clients.get(socketId);
if (client) {
this.sendMessage(client, 'chat_render', message);
}
this.logger.debug('发送聊天渲染消息', {
operation: 'sendChatRender',
socketId,
from,
textLength: txt.length,
bubble,
timestamp: new Date().toISOString(),
});
}
/**
* 向指定地图的所有客户端广播消息
*
* 功能描述:
* 向指定地图区域内的所有在线玩家广播消息
*
* @param mapId 地图ID
* @param event 事件名称
* @param data 消息数据
*/
async broadcastToMap(mapId: string, event: string, data: any): Promise<void> {
this.logger.debug('向地图广播消息', {
operation: 'broadcastToMap',
mapId,
event,
timestamp: new Date().toISOString(),
});
try {
// 从SessionManager获取指定地图的所有Socket ID
const socketIds = await this.sessionManager.getSocketsInMap(mapId);
if (socketIds.length === 0) {
this.logger.debug('地图中没有在线玩家', {
operation: 'broadcastToMap',
mapId,
});
return;
}
// 向每个Socket发送消息
for (const socketId of socketIds) {
const client = this.clients.get(socketId);
if (client) {
this.sendMessage(client, event, data);
}
}
this.logger.log('地图广播完成', {
operation: 'broadcastToMap',
mapId,
event,
recipientCount: socketIds.length,
});
} catch (error) {
const err = error as Error;
this.logger.error('地图广播失败', {
operation: 'broadcastToMap',
mapId,
event,
error: err.message,
}, err.stack);
}
}
/**
* 向指定客户端发送消息
*
* 功能描述:
* 向指定的WebSocket客户端发送消息
*
* @param socketId 目标客户端Socket ID
* @param event 事件名称
* @param data 消息数据
*/
sendToPlayer(socketId: string, event: string, data: any): void {
const client = this.clients.get(socketId);
if (client) {
this.sendMessage(client, event, data);
}
this.logger.debug('发送消息给玩家', {
operation: 'sendToPlayer',
socketId,
event,
timestamp: new Date().toISOString(),
});
}
/**
* 获取当前连接数
*
* 功能描述:
* 获取当前WebSocket网关的连接数量
*
* @returns Promise<number> 连接数
*/
async getConnectionCount(): Promise<number> {
return this.clients.size;
}
/**
* 获取已认证的连接数
*
* 功能描述:
* 获取当前已认证的WebSocket连接数量
*
* @returns Promise<number> 已认证连接数
*/
async getAuthenticatedConnectionCount(): Promise<number> {
let count = 0;
for (const client of this.clients.values()) {
if (client.data?.authenticated === true) {
count++;
}
}
return count;
}
/**
* 断开指定客户端连接
*
* 功能描述:
* 强制断开指定的WebSocket客户端连接
*
* @param socketId 目标客户端Socket ID
* @param reason 断开原因
*/
async disconnectClient(socketId: string, reason?: string): Promise<void> {
const client = this.clients.get(socketId);
if (client) {
client.close();
this.logger.log('客户端连接已断开', {
operation: 'disconnectClient',
socketId,
reason,
});
} else {
this.logger.warn('未找到目标客户端', {
operation: 'disconnectClient',
socketId,
});
}
}
/**
* 发送消息给客户端
*/
private sendMessage(client: ExtendedWebSocket, event: string, data: any) {
if (client.readyState === WebSocket.OPEN) {
// 直接发送数据不包装在event中
client.send(JSON.stringify(data));
}
}
/**
* 生成客户端ID
*/
private generateClientId(): string {
return `ws_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
/**
* 设置心跳检测
*/
private setupHeartbeat() {
setInterval(() => {
this.clients.forEach((client) => {
if (!client.isAlive) {
this.logger.warn('客户端心跳超时,断开连接', {
socketId: client.id,
});
client.close();
return;
}
client.isAlive = false;
if (client.readyState === WebSocket.OPEN) {
client.ping();
}
});
}, ZulipWebSocketGateway.HEARTBEAT_INTERVAL);
}
/**
* 设置消息分发器
*
* 功能描述:
* 将当前WebSocket网关设置为ZulipEventProcessorService的消息分发器
* 使其能够接收从Zulip返回的消息并转发给游戏客户端
*
* @private
*/
private setupMessageDistributor(): void {
try {
// 获取ZulipEventProcessorService实例
const eventProcessor = this.zulipService.getEventProcessor();
if (eventProcessor) {
// 设置消息分发器
eventProcessor.setMessageDistributor(this);
this.logger.log('消息分发器设置完成', {
operation: 'setupMessageDistributor',
timestamp: new Date().toISOString(),
});
} else {
this.logger.warn('无法获取ZulipEventProcessorService实例', {
operation: 'setupMessageDistributor',
});
}
} catch (error) {
const err = error as Error;
this.logger.error('设置消息分发器失败', {
operation: 'setupMessageDistributor',
error: err.message,
}, err.stack);
}
}
}