Files
whale-town-end/src/business/zulip/zulip_websocket.gateway.ts
moyin d8b7143f60 websocket:增强Zulip WebSocket网关的调试和监控功能
- 添加详细的连接和断开日志记录
- 增强错误处理和异常捕获机制
- 完善客户端状态管理和会话跟踪
- 优化消息处理的调试输出

提升WebSocket连接问题的诊断能力
2026-01-05 11:14:04 +08:00

800 lines
21 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* Zulip WebSocket网关
*
* 功能描述:
* - 处理所有Godot游戏客户端的WebSocket连接
* - 实现游戏协议到Zulip协议的转换
* - 提供统一的消息路由和权限控制
*
* 主要方法:
* - handleConnection(): 处理客户端连接建立
* - handleDisconnect(): 处理客户端连接断开
* - handleLogin(): 处理登录消息
* - handleChat(): 处理聊天消息
* - handlePositionUpdate(): 处理位置更新
*
* 使用场景:
* - 游戏客户端WebSocket通信的统一入口
* - 消息协议转换和路由分发
* - 连接状态管理和权限验证
*
* @author angjustinl
* @version 1.0.0
* @since 2025-12-25
*/
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
OnGatewayConnection,
OnGatewayDisconnect,
MessageBody,
ConnectedSocket,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Injectable, Logger } from '@nestjs/common';
import { ZulipService } from './zulip.service';
import { SessionManagerService } from './services/session_manager.service';
/**
* 登录消息接口 - 按guide.md格式
*/
interface LoginMessage {
type: 'login';
token: string;
}
/**
* 聊天消息接口 - 按guide.md格式
*/
interface ChatMessage {
t: 'chat';
content: string;
scope: string; // "local" 或 topic名称
}
/**
* 位置更新消息接口
*/
interface PositionMessage {
t: 'position';
x: number;
y: number;
mapId: string;
}
/**
* 聊天渲染消息接口 - 发送给客户端
*/
interface ChatRenderMessage {
t: 'chat_render';
from: string;
txt: string;
bubble: boolean;
}
/**
* 登录成功消息接口 - 发送给客户端
*/
interface LoginSuccessMessage {
t: 'login_success';
sessionId: string;
userId: string;
username: string;
currentMap: string;
}
/**
* 客户端数据接口
*/
interface ClientData {
authenticated: boolean;
userId: string | null;
sessionId: string | null;
username: string | null;
connectedAt: Date;
}
/**
* Zulip WebSocket网关类
*
* 职责:
* - 处理所有Godot游戏客户端的WebSocket连接
* - 实现游戏协议到Zulip协议的转换
* - 提供统一的消息路由和权限控制
* - 管理客户端连接状态和会话
*
* 主要方法:
* - handleConnection(): 处理客户端连接建立
* - handleDisconnect(): 处理客户端连接断开
* - handleLogin(): 处理登录消息
* - handleChat(): 处理聊天消息
* - handlePositionUpdate(): 处理位置更新
* - sendChatRender(): 向客户端发送聊天渲染消息
*
* 使用场景:
* - 游戏客户端WebSocket通信的统一入口
* - 消息协议转换和路由分发
* - 连接状态管理和权限验证
* - 实时消息推送和广播
*/
@Injectable()
@WebSocketGateway({
cors: { origin: '*' },
namespace: '/game',
})
export class ZulipWebSocketGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
private readonly logger = new Logger(ZulipWebSocketGateway.name);
constructor(
private readonly zulipService: ZulipService,
private readonly sessionManager: SessionManagerService,
) {
this.logger.log('ZulipWebSocketGateway初始化完成', {
gateway: 'ZulipWebSocketGateway',
namespace: '/game',
timestamp: new Date().toISOString(),
});
// 设置消息分发器使ZulipEventProcessorService能够向客户端发送消息
this.setupMessageDistributor();
}
/**
* 处理客户端连接建立
*
* 功能描述:
* 当游戏客户端建立WebSocket连接时调用记录连接信息
*
* 业务逻辑:
* 1. 记录新连接的建立
* 2. 为连接分配唯一标识
* 3. 初始化连接状态
*
* @param client WebSocket客户端连接对象
*/
async handleConnection(client: Socket): Promise<void> {
this.logger.log('新的WebSocket连接建立', {
operation: 'handleConnection',
socketId: client.id,
remoteAddress: client.handshake.address,
timestamp: new Date().toISOString(),
});
// 设置连接的初始状态
const clientData: ClientData = {
authenticated: false,
userId: null,
sessionId: null,
username: null,
connectedAt: new Date(),
};
client.data = clientData;
}
/**
* 处理客户端连接断开
*
* 功能描述:
* 当游戏客户端断开WebSocket连接时调用清理相关资源
*
* 业务逻辑:
* 1. 记录连接断开信息
* 2. 清理会话数据
* 3. 注销Zulip事件队列
* 4. 释放相关资源
*
* @param client WebSocket客户端连接对象
*/
async handleDisconnect(client: Socket): Promise<void> {
const clientData = client.data as ClientData | undefined;
const connectionDuration = clientData?.connectedAt
? Date.now() - clientData.connectedAt.getTime()
: 0;
this.logger.log('WebSocket连接断开', {
operation: 'handleDisconnect',
socketId: client.id,
userId: clientData?.userId,
authenticated: clientData?.authenticated,
connectionDuration,
timestamp: new Date().toISOString(),
});
// 如果用户已认证,处理登出逻辑
if (clientData?.authenticated) {
try {
await this.zulipService.handlePlayerLogout(client.id);
this.logger.log('玩家登出处理完成', {
operation: 'handleDisconnect',
socketId: client.id,
userId: clientData.userId,
});
} catch (error) {
const err = error as Error;
this.logger.error('处理玩家登出时发生错误', {
operation: 'handleDisconnect',
socketId: client.id,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
}
}
}
/**
* 处理登录消息 - 按guide.md格式
*
* 功能描述:
* 处理游戏客户端发送的登录请求验证Token并建立会话
*
* 业务逻辑:
* 1. 验证消息格式
* 2. 调用ZulipService处理登录逻辑
* 3. 更新连接状态
* 4. 返回登录结果
*
* @param client WebSocket客户端连接对象
* @param data 登录消息数据
*/
@SubscribeMessage('login')
async handleLogin(
@ConnectedSocket() client: Socket,
@MessageBody() data: LoginMessage,
): Promise<void> {
this.logger.log('收到登录请求', {
operation: 'handleLogin',
socketId: client.id,
messageType: data?.type,
timestamp: new Date().toISOString(),
});
try {
// 验证消息格式
if (!data || data.type !== 'login' || !data.token) {
this.logger.warn('登录请求格式无效', {
operation: 'handleLogin',
socketId: client.id,
data,
});
client.emit('login_error', {
t: 'login_error',
message: '登录请求格式无效',
});
return;
}
// 检查是否已经登录
const clientData = client.data as ClientData;
if (clientData?.authenticated) {
this.logger.warn('用户已登录,拒绝重复登录', {
operation: 'handleLogin',
socketId: client.id,
userId: clientData.userId,
});
client.emit('login_error', {
t: 'login_error',
message: '您已经登录',
});
return;
}
// 调用ZulipService处理登录
const result = await this.zulipService.handlePlayerLogin({
token: data.token,
socketId: client.id,
});
if (result.success && result.sessionId) {
// 更新连接状态
const updatedClientData: ClientData = {
authenticated: true,
sessionId: result.sessionId,
userId: result.userId || null,
username: result.username || null,
connectedAt: clientData?.connectedAt || new Date(),
};
client.data = updatedClientData;
// 发送登录成功消息
const loginSuccess: LoginSuccessMessage = {
t: 'login_success',
sessionId: result.sessionId,
userId: result.userId || '',
username: result.username || '',
currentMap: result.currentMap || 'novice_village',
};
client.emit('login_success', loginSuccess);
this.logger.log('登录处理成功', {
operation: 'handleLogin',
socketId: client.id,
sessionId: result.sessionId,
userId: result.userId,
username: result.username,
currentMap: result.currentMap,
timestamp: new Date().toISOString(),
});
} else {
// 发送登录失败消息
client.emit('login_error', {
t: 'login_error',
message: result.error || '登录失败',
});
this.logger.warn('登录处理失败', {
operation: 'handleLogin',
socketId: client.id,
error: result.error,
timestamp: new Date().toISOString(),
});
}
} catch (error) {
const err = error as Error;
this.logger.error('登录处理异常', {
operation: 'handleLogin',
socketId: client.id,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
client.emit('login_error', {
t: 'login_error',
message: '系统错误,请稍后重试',
});
}
}
/**
* 处理聊天消息 - 按guide.md格式
*
* 功能描述:
* 处理游戏客户端发送的聊天消息转发到Zulip对应的Stream/Topic
*
* 业务逻辑:
* 1. 验证用户认证状态
* 2. 验证消息格式
* 3. 调用ZulipService处理消息发送
* 4. 返回发送结果确认
*
* @param client WebSocket客户端连接对象
* @param data 聊天消息数据
*/
@SubscribeMessage('chat')
async handleChat(
@ConnectedSocket() client: Socket,
@MessageBody() data: ChatMessage,
): Promise<void> {
const clientData = client.data as ClientData | undefined;
console.log('🔍 DEBUG: handleChat 被调用了!', {
socketId: client.id,
data: data,
clientData: clientData,
timestamp: new Date().toISOString(),
});
this.logger.log('收到聊天消息', {
operation: 'handleChat',
socketId: client.id,
messageType: data?.t,
contentLength: data?.content?.length,
scope: data?.scope,
timestamp: new Date().toISOString(),
});
try {
// 验证用户认证状态
if (!clientData?.authenticated) {
this.logger.warn('未认证用户尝试发送聊天消息', {
operation: 'handleChat',
socketId: client.id,
});
client.emit('chat_error', {
t: 'chat_error',
message: '请先登录',
});
return;
}
// 验证消息格式
if (!data || data.t !== 'chat' || !data.content || !data.scope) {
this.logger.warn('聊天消息格式无效', {
operation: 'handleChat',
socketId: client.id,
data,
});
client.emit('chat_error', {
t: 'chat_error',
message: '消息格式无效',
});
return;
}
// 验证消息内容不为空
if (!data.content.trim()) {
this.logger.warn('聊天消息内容为空', {
operation: 'handleChat',
socketId: client.id,
});
client.emit('chat_error', {
t: 'chat_error',
message: '消息内容不能为空',
});
return;
}
// 调用ZulipService处理消息发送
const result = await this.zulipService.sendChatMessage({
socketId: client.id,
content: data.content,
scope: data.scope,
});
if (result.success) {
// 发送成功确认
client.emit('chat_sent', {
t: 'chat_sent',
messageId: result.messageId,
message: '消息发送成功',
});
this.logger.log('聊天消息发送成功', {
operation: 'handleChat',
socketId: client.id,
userId: clientData.userId,
messageId: result.messageId,
timestamp: new Date().toISOString(),
});
} else {
// 发送失败通知
client.emit('chat_error', {
t: 'chat_error',
message: result.error || '消息发送失败',
});
this.logger.warn('聊天消息发送失败', {
operation: 'handleChat',
socketId: client.id,
userId: clientData.userId,
error: result.error,
timestamp: new Date().toISOString(),
});
}
} catch (error) {
const err = error as Error;
this.logger.error('聊天消息处理异常', {
operation: 'handleChat',
socketId: client.id,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
client.emit('chat_error', {
t: 'chat_error',
message: '系统错误,请稍后重试',
});
}
}
/**
* 处理位置更新消息
*
* 功能描述:
* 处理游戏客户端发送的位置更新,用于消息路由和上下文注入
*
* @param client WebSocket客户端连接对象
* @param data 位置更新数据
*/
@SubscribeMessage('position_update')
async handlePositionUpdate(
@ConnectedSocket() client: Socket,
@MessageBody() data: PositionMessage,
): Promise<void> {
const clientData = client.data as ClientData | undefined;
this.logger.debug('收到位置更新', {
operation: 'handlePositionUpdate',
socketId: client.id,
mapId: data?.mapId,
position: data ? { x: data.x, y: data.y } : null,
timestamp: new Date().toISOString(),
});
try {
// 验证用户认证状态
if (!clientData?.authenticated) {
this.logger.debug('未认证用户发送位置更新,忽略', {
operation: 'handlePositionUpdate',
socketId: client.id,
});
return;
}
// 验证消息格式
if (!data || data.t !== 'position' || !data.mapId ||
typeof data.x !== 'number' || typeof data.y !== 'number') {
this.logger.warn('位置更新消息格式无效', {
operation: 'handlePositionUpdate',
socketId: client.id,
data,
});
return;
}
// 验证坐标有效性
if (!Number.isFinite(data.x) || !Number.isFinite(data.y)) {
this.logger.warn('位置坐标无效', {
operation: 'handlePositionUpdate',
socketId: client.id,
x: data.x,
y: data.y,
});
return;
}
// 调用ZulipService更新位置
const success = await this.zulipService.updatePlayerPosition({
socketId: client.id,
x: data.x,
y: data.y,
mapId: data.mapId,
});
if (success) {
this.logger.debug('位置更新成功', {
operation: 'handlePositionUpdate',
socketId: client.id,
mapId: data.mapId,
});
}
} catch (error) {
const err = error as Error;
this.logger.error('位置更新处理异常', {
operation: 'handlePositionUpdate',
socketId: client.id,
error: err.message,
timestamp: new Date().toISOString(),
}, err.stack);
}
}
/**
* 向指定客户端发送聊天渲染消息
*
* 功能描述:
* 向游戏客户端发送格式化的聊天消息,用于显示气泡或聊天框
*
* @param socketId 目标客户端Socket ID
* @param from 发送者名称
* @param txt 消息文本
* @param bubble 是否显示气泡
*/
sendChatRender(socketId: string, from: string, txt: string, bubble: boolean): void {
const message: ChatRenderMessage = {
t: 'chat_render',
from,
txt,
bubble,
};
this.server.to(socketId).emit('chat_render', message);
this.logger.debug('发送聊天渲染消息', {
operation: 'sendChatRender',
socketId,
from,
textLength: txt.length,
bubble,
timestamp: new Date().toISOString(),
});
}
/**
* 向指定地图的所有客户端广播消息
*
* 功能描述:
* 向指定地图区域内的所有在线玩家广播消息
*
* @param mapId 地图ID
* @param event 事件名称
* @param data 消息数据
*/
async broadcastToMap(mapId: string, event: string, data: any): Promise<void> {
this.logger.debug('向地图广播消息', {
operation: 'broadcastToMap',
mapId,
event,
timestamp: new Date().toISOString(),
});
try {
// 从SessionManager获取指定地图的所有Socket ID
const socketIds = await this.sessionManager.getSocketsInMap(mapId);
if (socketIds.length === 0) {
this.logger.debug('地图中没有在线玩家', {
operation: 'broadcastToMap',
mapId,
});
return;
}
// 向每个Socket发送消息
for (const socketId of socketIds) {
this.server.to(socketId).emit(event, data);
}
this.logger.log('地图广播完成', {
operation: 'broadcastToMap',
mapId,
event,
recipientCount: socketIds.length,
});
} catch (error) {
const err = error as Error;
this.logger.error('地图广播失败', {
operation: 'broadcastToMap',
mapId,
event,
error: err.message,
}, err.stack);
}
}
/**
* 向指定客户端发送消息
*
* 功能描述:
* 向指定的WebSocket客户端发送消息
*
* @param socketId 目标客户端Socket ID
* @param event 事件名称
* @param data 消息数据
*/
sendToPlayer(socketId: string, event: string, data: any): void {
this.server.to(socketId).emit(event, data);
this.logger.debug('发送消息给玩家', {
operation: 'sendToPlayer',
socketId,
event,
timestamp: new Date().toISOString(),
});
}
/**
* 获取当前连接数
*
* 功能描述:
* 获取当前WebSocket网关的连接数量
*
* @returns Promise<number> 连接数
*/
async getConnectionCount(): Promise<number> {
try {
const sockets = await this.server.fetchSockets();
return sockets.length;
} catch (error) {
this.logger.error('获取连接数失败', {
operation: 'getConnectionCount',
error: (error as Error).message,
});
return 0;
}
}
/**
* 获取已认证的连接数
*
* 功能描述:
* 获取当前已认证的WebSocket连接数量
*
* @returns Promise<number> 已认证连接数
*/
async getAuthenticatedConnectionCount(): Promise<number> {
try {
const sockets = await this.server.fetchSockets();
return sockets.filter(socket => {
const data = socket.data as ClientData | undefined;
return data?.authenticated === true;
}).length;
} catch (error) {
this.logger.error('获取已认证连接数失败', {
operation: 'getAuthenticatedConnectionCount',
error: (error as Error).message,
});
return 0;
}
}
/**
* 断开指定客户端连接
*
* 功能描述:
* 强制断开指定的WebSocket客户端连接
*
* @param socketId 目标客户端Socket ID
* @param reason 断开原因
*/
async disconnectClient(socketId: string, reason?: string): Promise<void> {
try {
const sockets = await this.server.fetchSockets();
const targetSocket = sockets.find(s => s.id === socketId);
if (targetSocket) {
targetSocket.disconnect(true);
this.logger.log('客户端连接已断开', {
operation: 'disconnectClient',
socketId,
reason,
});
} else {
this.logger.warn('未找到目标客户端', {
operation: 'disconnectClient',
socketId,
});
}
} catch (error) {
this.logger.error('断开客户端连接失败', {
operation: 'disconnectClient',
socketId,
error: (error as Error).message,
});
}
}
/**
* 设置消息分发器
*
* 功能描述:
* 将当前WebSocket网关设置为ZulipEventProcessorService的消息分发器
* 使其能够接收从Zulip返回的消息并转发给游戏客户端
*
* @private
*/
private setupMessageDistributor(): void {
try {
// 获取ZulipEventProcessorService实例
const eventProcessor = this.zulipService.getEventProcessor();
if (eventProcessor) {
// 设置消息分发器
eventProcessor.setMessageDistributor(this);
this.logger.log('消息分发器设置完成', {
operation: 'setupMessageDistributor',
timestamp: new Date().toISOString(),
});
} else {
this.logger.warn('无法获取ZulipEventProcessorService实例', {
operation: 'setupMessageDistributor',
});
}
} catch (error) {
const err = error as Error;
this.logger.error('设置消息分发器失败', {
operation: 'setupMessageDistributor',
error: err.message,
}, err.stack);
}
}
}