Files
whale-town-end/src/business/location_broadcast/location_broadcast.gateway.ts
moyin cbf4120ddd refactor: 更新WebSocket相关测试和location_broadcast模块
- 更新location_broadcast网关以支持原生WebSocket
- 修改WebSocket认证守卫和中间件
- 更新相关的测试文件和规范
- 添加WebSocket测试工具
- 完善Zulip服务的测试覆盖

技术改进:
- 统一WebSocket实现架构
- 优化性能监控和限流中间件
- 更新测试用例以适配新的WebSocket实现
2026-01-09 17:02:43 +08:00

876 lines
25 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* 位置广播WebSocket网关
*
* 功能描述:
* - 处理WebSocket连接和断开事件
* - 管理用户会话的加入和离开
* - 实时广播用户位置更新
* - 提供心跳检测和连接状态管理
*
* 职责分离:
* - WebSocket连接管理：处理连接建立、断开和错误
* - 消息路由：根据消息类型分发到对应的处理器
* - 认证集成：使用JWT认证守卫保护WebSocket事件
* - 实时广播:向会话中的其他用户广播位置更新
*
* 技术实现:
* - 原生WebSocket：提供WebSocket通信能力
* - JWT认证：保护需要认证的WebSocket事件
* - 核心服务集成：调用位置广播核心服务处理业务逻辑
* - 异常处理：统一的WebSocket异常处理和错误响应
*
* 最近修改:
* - 2026-01-09: 重构为原生WebSocket - 移除Socket.IO依赖，使用原生WebSocket (修改者: moyin)
*
* @author moyin
* @version 2.0.0
* @since 2026-01-08
* @lastModified 2026-01-09
*/
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
ConnectedSocket,
MessageBody,
OnGatewayConnection,
OnGatewayDisconnect,
OnGatewayInit,
WsException,
} from '@nestjs/websockets';
import { Server } from 'ws';
import * as WebSocket from 'ws';
import { Logger, UseFilters, UseGuards, UsePipes, ValidationPipe, ArgumentsHost, Inject } from '@nestjs/common';
import { BaseWsExceptionFilter } from '@nestjs/websockets';
// 导入中间件
import { RateLimitMiddleware } from './rate_limit.middleware';
import { PerformanceMonitorMiddleware } from './performance_monitor.middleware';
// 导入DTO和守卫
import { WebSocketAuthGuard, AuthenticatedSocket } from './websocket_auth.guard';
import {
JoinSessionMessage,
LeaveSessionMessage,
PositionUpdateMessage,
HeartbeatMessage,
} from './dto/websocket_message.dto';
import {
SessionJoinedResponse,
UserJoinedNotification,
UserLeftNotification,
PositionBroadcast,
HeartbeatResponse,
ErrorResponse,
SuccessResponse,
} from './dto/websocket_response.dto';
// 导入核心服务接口
import { Position } from '../../core/location_broadcast_core/position.interface';
/**
 * A `ws` WebSocket extended with the per-connection state this gateway keeps
 * directly on the socket object.
 */
interface ExtendedWebSocket extends WebSocket {
// Gateway-generated connection id; assigned in handleConnection and used as
// the key of the gateway's client map and as the member id inside rooms.
id: string;
// Id of the authenticated user. Handlers treat a missing value as
// "not authenticated".
// NOTE(review): nothing in this file assigns it — presumably an auth
// guard/middleware populates it; confirm.
userId?: string;
// Ids of the sessions (rooms) this connection has joined.
sessionIds?: Set<string>;
// Handle of the pending idle-timeout timer, so it can be cleared or re-armed.
connectionTimeout?: NodeJS.Timeout;
// Liveness flag for ping/pong: cleared before each ping, set again on `pong`.
isAlive?: boolean;
}
/**
 * WebSocket exception filter.
 *
 * Responsibilities:
 * - catch exceptions raised while handling WebSocket messages
 * - normalise them into an `ErrorResponse`
 * - log the failure and send the response to the originating client as an
 *   `error` event
 */
class WebSocketExceptionFilter extends BaseWsExceptionFilter {
  private readonly logger = new Logger(WebSocketExceptionFilter.name);

  /**
   * Converts a thrown value into an `ErrorResponse` and delivers it.
   *
   * @param exception The thrown value. May be a `WsException`, a plain
   *   `Error`, an arbitrary object, a string, or even `null`/`undefined`.
   * @param host Execution context used to obtain the client socket.
   */
  catch(exception: any, host: ArgumentsHost) {
    const client = host.switchToWs().getClient<ExtendedWebSocket>();

    // A WsException keeps its structured payload behind getError(); reading
    // `code` / `details` / `originalMessage` off the exception object itself
    // always yields undefined and would collapse every WsException into
    // INTERNAL_ERROR.
    const source: any = exception instanceof WsException ? exception.getError() : exception;
    const fields: Record<string, any> =
      source !== null && typeof source === 'object' ? source : {};

    // Fallback text for string payloads and for errors without a payload message.
    let fallbackText: string | undefined;
    if (exception instanceof Error) {
      fallbackText = exception.message;
    } else if (typeof source === 'string') {
      fallbackText = source;
    }

    const code = fields.code || 'INTERNAL_ERROR';
    const message = fields.message || fallbackText || '服务器内部错误';

    const error: ErrorResponse = {
      type: 'error',
      code,
      message,
      details: fields.details,
      originalMessage: fields.originalMessage,
      timestamp: Date.now(),
    };

    this.logger.error('WebSocket异常', {
      socketId: client.id,
      error: message,
      code,
      timestamp: new Date().toISOString(),
    });

    this.sendMessage(client, 'error', error);
  }

  /**
   * Sends `{ event, data }` as JSON, but only while the socket is open.
   */
  private sendMessage(client: ExtendedWebSocket, event: string, data: any) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(JSON.stringify({ event, data }));
    }
  }
}
/**
 * Location-broadcast gateway served on the `/location-broadcast` path.
 *
 * Keeps every connected socket in `clients` (keyed by the gateway-generated
 * connection id) and every session's member ids in `sessionRooms`; session and
 * position persistence is delegated to the injected core service.
 */
@WebSocketGateway({
cors: {
origin: '*', // should be restricted to specific domains in production
methods: ['GET', 'POST'],
credentials: true,
},
path: '/location-broadcast', // WebSocket path
})
@UseFilters(new WebSocketExceptionFilter())
export class LocationBroadcastGateway
implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
// Underlying `ws` server instance, injected by the framework.
@WebSocketServer()
server: Server;
private readonly logger = new Logger(LocationBroadcastGateway.name);
// connection id -> socket
private clients = new Map<string, ExtendedWebSocket>();
private sessionRooms = new Map<string, Set<string>>(); // sessionId -> Set<clientId>
/** Idle connection timeout, in minutes. */
private static readonly CONNECTION_TIMEOUT_MINUTES = 30;
/** Conversion factor from minutes to milliseconds. */
private static readonly MILLISECONDS_PER_MINUTE = 60 * 1000;
/** Interval between ping sweeps, in milliseconds. */
private static readonly HEARTBEAT_INTERVAL = 30000;
// Middleware instances (constructed directly rather than injected).
private readonly rateLimitMiddleware = new RateLimitMiddleware();
private readonly performanceMonitor = new PerformanceMonitorMiddleware();
constructor(
@Inject('ILocationBroadcastCore')
private readonly locationBroadcastCore: any, // core service obtained through dependency injection
) {}
/**
 * Lifecycle hook invoked once the WebSocket server is initialised.
 *
 * Logs the initialisation and starts the periodic ping/pong liveness sweep.
 *
 * @param server The initialised server instance (not used here).
 */
afterInit(server: Server) {
  const initContext = {
    path: '/location-broadcast',
    timestamp: new Date().toISOString(),
  };
  this.logger.log('位置广播WebSocket服务器初始化完成', initContext);

  // Start the heartbeat sweep.
  this.setupHeartbeat();
}
/**
 * Handles a newly connected client.
 *
 * Assigns a connection id, registers the socket, sends the `welcome` event,
 * arms the idle timeout and wires the `message` / `pong` listeners.
 *
 * @param client The socket that just connected.
 */
handleConnection(client: ExtendedWebSocket) {
  // Per-connection state lives directly on the socket object.
  client.id = this.generateClientId();
  client.sessionIds = new Set();
  client.isAlive = true;
  this.clients.set(client.id, client);

  this.logger.log('WebSocket客户端连接', {
    socketId: client.id,
    timestamp: new Date().toISOString(),
  });

  // Record the connection with the performance monitor.
  this.performanceMonitor.recordConnection(client as any, true);

  // Confirm the connection to the client.
  const welcomeMessage = {
    type: 'connection_established',
    message: '连接已建立',
    socketId: client.id,
    timestamp: Date.now(),
  };
  this.sendMessage(client, 'welcome', welcomeMessage);

  // Arm the idle timeout.
  this.setConnectionTimeout(client);

  // Incoming frames: parse, then route.
  client.on('message', (data) => {
    let message: unknown;
    try {
      message = JSON.parse(data.toString());
    } catch (error) {
      this.logger.error('解析消息失败', {
        socketId: client.id,
        error: error instanceof Error ? error.message : String(error),
      });
      return;
    }

    // handleMessage is async: a synchronous try/catch cannot observe its
    // failure, so the rejection is handled explicitly instead of being left
    // as an unhandled promise rejection.
    this.handleMessage(client, message).catch((error: unknown) => {
      this.logger.error('处理消息失败', {
        socketId: client.id,
        error: error instanceof Error ? error.message : String(error),
      });
    });
  });

  // A pong proves the peer is still alive for the next heartbeat sweep.
  client.on('pong', () => {
    client.isAlive = true;
  });
}
/**
 * Handles a client disconnect.
 *
 * Cancels the idle timeout, runs the user-level cleanup for authenticated
 * connections, then removes the socket from the client map and from every
 * room it belonged to. Failures are logged, never rethrown.
 *
 * @param client The socket that disconnected.
 */
async handleDisconnect(client: ExtendedWebSocket) {
  const startTime = Date.now();

  this.logger.log('WebSocket客户端断开连接', {
    socketId: client.id,
    timestamp: new Date().toISOString(),
  });

  // Record the disconnect with the performance monitor.
  this.performanceMonitor.recordConnection(client as any, false);

  try {
    // Cancel the pending idle timeout, if any.
    const pendingTimeout = client.connectionTimeout;
    if (pendingTimeout) {
      clearTimeout(pendingTimeout);
    }

    // Authenticated connections also need their session membership cleaned up.
    if (client.userId) {
      await this.handleUserDisconnection(client, 'connection_lost');
    }

    // Drop the socket from the registry.
    this.clients.delete(client.id);

    // Drop the socket from every room, discarding rooms that become empty.
    const joinedSessions = client.sessionIds;
    if (joinedSessions) {
      joinedSessions.forEach((sessionId) => {
        const members = this.sessionRooms.get(sessionId);
        if (!members) {
          return;
        }
        members.delete(client.id);
        if (members.size === 0) {
          this.sessionRooms.delete(sessionId);
        }
      });
    }

    const duration = Date.now() - startTime;
    this.logger.log('客户端断开连接处理完成', {
      socketId: client.id,
      userId: client.userId || 'unknown',
      duration,
      timestamp: new Date().toISOString(),
    });
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    this.logger.error('处理客户端断开连接时发生错误', {
      socketId: client.id,
      error: reason,
      timestamp: new Date().toISOString(),
    });
  }
}
/**
 * Routes a parsed client message to the handler matching its `event`.
 *
 * The message comes straight from `JSON.parse` of client input, so it is
 * treated as untrusted: non-object envelopes are rejected, and any error that
 * escapes a handler is logged and reported to the client rather than
 * rejecting the returned promise.
 *
 * @param client The socket the message arrived on.
 * @param message Parsed envelope, expected to look like `{ event, data }`.
 */
private async handleMessage(client: ExtendedWebSocket, message: any) {
  // JSON.parse may legitimately yield null, a number, a string, ...;
  // destructuring null/undefined would throw.
  if (message === null || typeof message !== 'object') {
    this.logger.warn('消息格式无效', {
      socketId: client.id,
    });
    return;
  }

  const { event, data } = message;

  try {
    switch (event) {
      case 'join_session':
        await this.handleJoinSession(client, data);
        break;
      case 'leave_session':
        await this.handleLeaveSession(client, data);
        break;
      case 'position_update':
        await this.handlePositionUpdate(client, data);
        break;
      case 'heartbeat':
        await this.handleHeartbeat(client, data);
        break;
      default:
        this.logger.warn('未知消息类型', {
          socketId: client.id,
          event,
        });
    }
  } catch (error) {
    // Safety net: handlers report their own failures, so reaching this point
    // means something threw outside their error handling.
    const reason = error instanceof Error ? error.message : String(error);
    this.logger.error('处理消息失败', {
      socketId: client.id,
      event,
      error: reason,
    });
    const errorResponse: ErrorResponse = {
      type: 'error',
      code: 'INTERNAL_ERROR',
      message: '服务器内部错误',
      details: { reason },
      originalMessage: message,
      timestamp: Date.now(),
    };
    this.sendMessage(client, 'error', errorResponse);
  }
}
/**
 * Handles a `join_session` request.
 *
 * On success the client receives `session_joined` with the session's users
 * and positions, the other room members receive `user_joined`, and the socket
 * is added to the room. On any failure the client receives an `error` event
 * with code `JOIN_SESSION_FAILED`; nothing is thrown to the caller.
 *
 * @param client The requesting socket.
 * @param message Request payload; must carry a non-empty string `sessionId`,
 *   may carry an `initialPosition`.
 */
async handleJoinSession(client: ExtendedWebSocket, message: JoinSessionMessage) {
  const startTime = Date.now();

  // The payload is untrusted client input and may be missing entirely.
  // Validate it before anything dereferences it.
  if (!message || typeof message.sessionId !== 'string' || message.sessionId.length === 0) {
    this.logger.warn('加入会话请求参数无效', {
      operation: 'join_session',
      socketId: client.id,
      userId: client.userId,
      timestamp: new Date().toISOString(),
    });
    const invalidResponse: ErrorResponse = {
      type: 'error',
      code: 'JOIN_SESSION_FAILED',
      message: '加入会话失败',
      details: {
        reason: '缺少有效的sessionId',
      },
      originalMessage: message,
      timestamp: Date.now(),
    };
    this.sendMessage(client, 'error', invalidResponse);
    return;
  }

  this.logger.log('处理加入会话请求', {
    operation: 'join_session',
    socketId: client.id,
    userId: client.userId,
    sessionId: message.sessionId,
    timestamp: new Date().toISOString(),
  });

  try {
    // Only authenticated connections may join.
    if (!client.userId) {
      throw new WsException({
        type: 'error',
        code: 'UNAUTHORIZED',
        message: '用户未认证',
        timestamp: Date.now(),
      });
    }

    // 1. Register the user in the session.
    await this.locationBroadcastCore.addUserToSession(
      message.sessionId,
      client.userId,
      client.id,
    );

    // 2. Store the initial position when one was supplied.
    if (message.initialPosition) {
      const position: Position = {
        userId: client.userId,
        x: message.initialPosition.x,
        y: message.initialPosition.y,
        mapId: message.initialPosition.mapId,
        timestamp: Date.now(),
        metadata: {},
      };
      await this.locationBroadcastCore.setUserPosition(client.userId, position);
    }

    // 3. Fetch the session's users and positions (independent, so in parallel).
    const [sessionUsers, sessionPositions] = await Promise.all([
      this.locationBroadcastCore.getSessionUsers(message.sessionId),
      this.locationBroadcastCore.getSessionPositions(message.sessionId),
    ]);

    // 4. Tell the requester it has joined.
    const joinResponse: SessionJoinedResponse = {
      type: 'session_joined',
      sessionId: message.sessionId,
      users: sessionUsers.map(user => ({
        userId: user.userId,
        socketId: user.socketId,
        joinedAt: user.joinedAt,
        lastSeen: user.lastSeen,
        status: user.status,
        position: user.position ? {
          x: user.position.x,
          y: user.position.y,
          mapId: user.position.mapId,
          timestamp: user.position.timestamp,
        } : undefined,
      })),
      positions: sessionPositions.map(pos => ({
        userId: pos.userId,
        x: pos.x,
        y: pos.y,
        mapId: pos.mapId,
        timestamp: pos.timestamp,
        metadata: pos.metadata,
      })),
      timestamp: Date.now(),
    };
    this.sendMessage(client, 'session_joined', joinResponse);

    // 5. Tell the other members that a user joined.
    const userJoinedNotification: UserJoinedNotification = {
      type: 'user_joined',
      user: {
        userId: client.userId,
        socketId: client.id,
        joinedAt: Date.now(),
        status: 'online',
      },
      position: message.initialPosition ? {
        x: message.initialPosition.x,
        y: message.initialPosition.y,
        mapId: message.initialPosition.mapId,
        timestamp: Date.now(),
      } : undefined,
      sessionId: message.sessionId,
      timestamp: Date.now(),
    };
    // Broadcast to everyone in the room except the joining socket.
    this.broadcastToSession(message.sessionId, 'user_joined', userJoinedNotification, client.id);

    // Add the socket to the room.
    this.joinRoom(client, message.sessionId);

    const duration = Date.now() - startTime;
    this.logger.log('用户成功加入会话', {
      operation: 'join_session',
      socketId: client.id,
      userId: client.userId,
      sessionId: message.sessionId,
      userCount: sessionUsers.length,
      duration,
      timestamp: new Date().toISOString(),
    });
  } catch (error) {
    const duration = Date.now() - startTime;
    this.logger.error('加入会话失败', {
      operation: 'join_session',
      socketId: client.id,
      userId: client.userId,
      sessionId: message.sessionId,
      error: error instanceof Error ? error.message : String(error),
      duration,
      timestamp: new Date().toISOString(),
    });
    const errorResponse: ErrorResponse = {
      type: 'error',
      code: 'JOIN_SESSION_FAILED',
      message: '加入会话失败',
      details: {
        sessionId: message.sessionId,
        reason: error instanceof Error ? error.message : String(error),
      },
      originalMessage: message,
      timestamp: Date.now(),
    };
    this.sendMessage(client, 'error', errorResponse);
  }
}
/**
 * Handles a `leave_session` request.
 *
 * On success the other room members receive `user_left`, the socket is
 * removed from the room and the requester receives `leave_session_success`.
 * On any failure the client receives an `error` event with code
 * `LEAVE_SESSION_FAILED`; nothing is thrown to the caller.
 *
 * @param client The requesting socket.
 * @param message Request payload; must carry a non-empty string `sessionId`,
 *   may carry a `reason`.
 */
async handleLeaveSession(client: ExtendedWebSocket, message: LeaveSessionMessage) {
  const startTime = Date.now();

  // The payload is untrusted client input and may be missing entirely.
  // Validate it before anything dereferences it.
  if (!message || typeof message.sessionId !== 'string' || message.sessionId.length === 0) {
    this.logger.warn('离开会话请求参数无效', {
      operation: 'leave_session',
      socketId: client.id,
      userId: client.userId,
      timestamp: new Date().toISOString(),
    });
    const invalidResponse: ErrorResponse = {
      type: 'error',
      code: 'LEAVE_SESSION_FAILED',
      message: '离开会话失败',
      details: {
        reason: '缺少有效的sessionId',
      },
      originalMessage: message,
      timestamp: Date.now(),
    };
    this.sendMessage(client, 'error', invalidResponse);
    return;
  }

  this.logger.log('处理离开会话请求', {
    operation: 'leave_session',
    socketId: client.id,
    userId: client.userId,
    sessionId: message.sessionId,
    reason: message.reason,
    timestamp: new Date().toISOString(),
  });

  try {
    // Only authenticated connections may leave a session.
    if (!client.userId) {
      throw new WsException({
        type: 'error',
        code: 'UNAUTHORIZED',
        message: '用户未认证',
        timestamp: Date.now(),
      });
    }

    // 1. Remove the user from the session.
    await this.locationBroadcastCore.removeUserFromSession(
      message.sessionId,
      client.userId,
    );

    // 2. Tell the other members that the user left.
    const userLeftNotification: UserLeftNotification = {
      type: 'user_left',
      userId: client.userId,
      reason: message.reason || 'user_left',
      sessionId: message.sessionId,
      timestamp: Date.now(),
    };
    this.broadcastToSession(message.sessionId, 'user_left', userLeftNotification, client.id);

    // 3. Remove the socket from the room.
    this.leaveRoom(client, message.sessionId);

    // 4. Confirm to the requester.
    const successResponse: SuccessResponse = {
      type: 'success',
      message: '成功离开会话',
      operation: 'leave_session',
      data: {
        sessionId: message.sessionId,
        reason: message.reason || 'user_left',
      },
      timestamp: Date.now(),
    };
    this.sendMessage(client, 'leave_session_success', successResponse);

    const duration = Date.now() - startTime;
    this.logger.log('用户成功离开会话', {
      operation: 'leave_session',
      socketId: client.id,
      userId: client.userId,
      sessionId: message.sessionId,
      reason: message.reason,
      duration,
      timestamp: new Date().toISOString(),
    });
  } catch (error) {
    const duration = Date.now() - startTime;
    this.logger.error('离开会话失败', {
      operation: 'leave_session',
      socketId: client.id,
      userId: client.userId,
      sessionId: message.sessionId,
      error: error instanceof Error ? error.message : String(error),
      duration,
      timestamp: new Date().toISOString(),
    });
    const errorResponse: ErrorResponse = {
      type: 'error',
      code: 'LEAVE_SESSION_FAILED',
      message: '离开会话失败',
      details: {
        sessionId: message.sessionId,
        reason: error instanceof Error ? error.message : String(error),
      },
      originalMessage: message,
      timestamp: Date.now(),
    };
    this.sendMessage(client, 'error', errorResponse);
  }
}
/**
 * Handles a `position_update` request.
 *
 * After the rate-limit check and payload validation, the position is stored
 * through the core service, broadcast as `position_update` to every room the
 * socket has joined (excluding the sender), and acknowledged with
 * `position_update_success`. Failures produce an `error` event with code
 * `POSITION_UPDATE_FAILED`. Every path that starts performance monitoring
 * also ends it.
 *
 * @param client The requesting socket.
 * @param message Request payload; `x` and `y` must be finite numbers.
 */
async handlePositionUpdate(client: ExtendedWebSocket, message: PositionUpdateMessage) {
  // Start performance monitoring.
  const perfContext = this.performanceMonitor.startMonitoring('position_update', client as any);

  // Rate limiting.
  const rateLimitAllowed = this.rateLimitMiddleware.checkRateLimit(client.userId || '', client.id);
  if (!rateLimitAllowed) {
    this.rateLimitMiddleware.handleRateLimit(client as any, client.userId || '');
    this.performanceMonitor.endMonitoring(perfContext, false, 'Rate limit exceeded');
    return;
  }

  // The payload is untrusted client input: it may be missing, and the
  // coordinates are relayed to other clients, so reject anything that is not
  // a finite number before it is dereferenced or broadcast.
  if (
    !message ||
    typeof message.x !== 'number' || !Number.isFinite(message.x) ||
    typeof message.y !== 'number' || !Number.isFinite(message.y)
  ) {
    this.logger.warn('位置更新请求参数无效', {
      operation: 'position_update',
      socketId: client.id,
      userId: client.userId,
      timestamp: new Date().toISOString(),
    });
    this.performanceMonitor.endMonitoring(perfContext, false, 'Invalid position payload');
    const invalidResponse: ErrorResponse = {
      type: 'error',
      code: 'POSITION_UPDATE_FAILED',
      message: '位置更新失败',
      details: {
        reason: '位置数据无效',
      },
      originalMessage: message,
      timestamp: Date.now(),
    };
    this.sendMessage(client, 'error', invalidResponse);
    return;
  }

  const startTime = Date.now();
  this.logger.debug('处理位置更新请求', {
    operation: 'position_update',
    socketId: client.id,
    userId: client.userId,
    mapId: message.mapId,
    x: message.x,
    y: message.y,
    timestamp: new Date().toISOString(),
  });

  try {
    // Only authenticated connections may publish positions.
    if (!client.userId) {
      throw new WsException({
        type: 'error',
        code: 'UNAUTHORIZED',
        message: '用户未认证',
        timestamp: Date.now(),
      });
    }

    // 1. Build the position record.
    const position: Position = {
      userId: client.userId,
      x: message.x,
      y: message.y,
      mapId: message.mapId,
      timestamp: message.timestamp || Date.now(),
      metadata: message.metadata || {},
    };

    // 2. Store it.
    await this.locationBroadcastCore.setUserPosition(client.userId, position);

    // 3. Broadcast to every session this socket has joined.
    if (client.sessionIds) {
      for (const sessionId of client.sessionIds) {
        const positionBroadcast: PositionBroadcast = {
          type: 'position_broadcast',
          userId: client.userId,
          position: {
            x: position.x,
            y: position.y,
            mapId: position.mapId,
            timestamp: position.timestamp,
            metadata: position.metadata,
          },
          sessionId,
          timestamp: Date.now(),
        };
        this.broadcastToSession(sessionId, 'position_update', positionBroadcast, client.id);
      }
    }

    // 4. Acknowledge to the sender.
    const successResponse: SuccessResponse = {
      type: 'success',
      message: '位置更新成功',
      operation: 'position_update',
      data: {
        x: position.x,
        y: position.y,
        mapId: position.mapId,
        timestamp: position.timestamp,
      },
      timestamp: Date.now(),
    };
    this.sendMessage(client, 'position_update_success', successResponse);

    const duration = Date.now() - startTime;
    this.logger.debug('位置更新处理完成', {
      operation: 'position_update',
      socketId: client.id,
      userId: client.userId,
      mapId: message.mapId,
      duration,
      timestamp: new Date().toISOString(),
    });

    // End performance monitoring (success).
    this.performanceMonitor.endMonitoring(perfContext, true);
  } catch (error) {
    const duration = Date.now() - startTime;
    this.logger.error('位置更新失败', {
      operation: 'position_update',
      socketId: client.id,
      userId: client.userId,
      mapId: message.mapId,
      error: error instanceof Error ? error.message : String(error),
      duration,
      timestamp: new Date().toISOString(),
    });

    // End performance monitoring (failure).
    this.performanceMonitor.endMonitoring(perfContext, false, error instanceof Error ? error.message : String(error));

    const errorResponse: ErrorResponse = {
      type: 'error',
      code: 'POSITION_UPDATE_FAILED',
      message: '位置更新失败',
      details: {
        mapId: message.mapId,
        reason: error instanceof Error ? error.message : String(error),
      },
      originalMessage: message,
      timestamp: Date.now(),
    };
    this.sendMessage(client, 'error', errorResponse);
  }
}
/**
 * Handles an application-level `heartbeat` message.
 *
 * Re-arms the idle timeout and echoes the client's timestamp and sequence
 * back in a `heartbeat_response` together with the server time. Failures are
 * logged, never thrown.
 *
 * @param client The socket that sent the heartbeat.
 * @param message Heartbeat payload carrying `timestamp` and `sequence`.
 */
async handleHeartbeat(client: ExtendedWebSocket, message: HeartbeatMessage) {
  // The payload is untrusted and may be missing. The peer is clearly alive,
  // so still re-arm the idle timeout, but there is nothing to echo back.
  if (message === null || typeof message !== 'object') {
    this.setConnectionTimeout(client);
    this.logger.warn('心跳消息无效', {
      operation: 'heartbeat',
      socketId: client.id,
    });
    return;
  }

  this.logger.debug('处理心跳请求', {
    operation: 'heartbeat',
    socketId: client.id,
    clientTimestamp: message.timestamp,
    sequence: message.sequence,
  });

  try {
    // 1. Re-arm the idle timeout.
    this.setConnectionTimeout(client);

    // 2. Build the response.
    const heartbeatResponse: HeartbeatResponse = {
      type: 'heartbeat_response',
      clientTimestamp: message.timestamp,
      serverTimestamp: Date.now(),
      sequence: message.sequence,
    };

    // 3. Send it.
    this.sendMessage(client, 'heartbeat_response', heartbeatResponse);
  } catch (error) {
    this.logger.error('心跳处理失败', {
      operation: 'heartbeat',
      socketId: client.id,
      error: error instanceof Error ? error.message : String(error),
    });
  }
}
/**
 * Session-level cleanup for an authenticated connection that went away.
 *
 * For each session the socket had joined, the user is removed through the
 * core service and the remaining members are notified with `user_left`; a
 * failure in one session is logged and does not stop the others. Afterwards
 * the core service is asked to clean up the user's data. Never throws.
 *
 * @param client The disconnected socket; callers only invoke this when
 *   `client.userId` is set.
 * @param reason Reason string forwarded in the `user_left` notification.
 */
private async handleUserDisconnection(client: ExtendedWebSocket, reason: string): Promise<void> {
  try {
    // 1. Snapshot the joined sessions.
    const sessionIds = Array.from(client.sessionIds || []);

    // 2. Leave each session in turn and notify its remaining members.
    for (const sessionId of sessionIds) {
      try {
        await this.locationBroadcastCore.removeUserFromSession(sessionId, client.userId!);

        const departure: UserLeftNotification = {
          type: 'user_left',
          userId: client.userId!,
          reason,
          sessionId,
          timestamp: Date.now(),
        };
        this.broadcastToSession(sessionId, 'user_left', departure, client.id);
      } catch (removalError) {
        const detail = removalError instanceof Error ? removalError.message : String(removalError);
        this.logger.error('从会话中移除用户失败', {
          socketId: client.id,
          userId: client.userId,
          sessionId,
          error: detail,
        });
      }
    }

    // 3. Clean up everything stored for the user.
    await this.locationBroadcastCore.cleanupUserData(client.userId!);

    const summary = {
      socketId: client.id,
      userId: client.userId,
      reason,
      sessionCount: sessionIds.length,
      timestamp: new Date().toISOString(),
    };
    this.logger.log('用户断开连接清理完成', summary);
  } catch (cleanupError) {
    const detail = cleanupError instanceof Error ? cleanupError.message : String(cleanupError);
    this.logger.error('用户断开连接清理失败', {
      socketId: client.id,
      userId: client.userId,
      reason,
      error: detail,
    });
  }
}
/**
 * Sends an `{ event, data }` envelope to one client as JSON.
 *
 * Does nothing when the socket is not in the OPEN state.
 *
 * @param client Target socket.
 * @param event Event name placed in the envelope.
 * @param data Payload placed in the envelope.
 */
private sendMessage(client: ExtendedWebSocket, event: string, data: any) {
  const isWritable = client.readyState === WebSocket.OPEN;
  if (!isWritable) {
    return;
  }
  const frame = JSON.stringify({ event, data });
  client.send(frame);
}
/**
 * Sends an event to every member of a session room.
 *
 * Unknown sessions are ignored, as are member ids that no longer map to a
 * registered socket.
 *
 * @param sessionId Room to broadcast to.
 * @param event Event name.
 * @param data Event payload.
 * @param excludeClientId Optional connection id to skip (typically the sender).
 */
private broadcastToSession(sessionId: string, event: string, data: any, excludeClientId?: string) {
  const memberIds = this.sessionRooms.get(sessionId);
  if (!memberIds) {
    return;
  }

  memberIds.forEach((memberId) => {
    const isExcluded = Boolean(excludeClientId) && memberId === excludeClientId;
    if (isExcluded) {
      return;
    }
    const recipient = this.clients.get(memberId);
    if (recipient) {
      this.sendMessage(recipient, event, data);
    }
  });
}
/**
 * Adds a client to a session room, creating the room on first use, and
 * records the membership on the socket itself.
 *
 * @param client Socket joining the room.
 * @param sessionId Room to join.
 */
private joinRoom(client: ExtendedWebSocket, sessionId: string) {
  let room = this.sessionRooms.get(sessionId);
  if (!room) {
    room = new Set();
    this.sessionRooms.set(sessionId, room);
  }
  room.add(client.id);

  // `sessionIds` is optional on the socket type. Create it on demand instead
  // of asserting it exists, so the room map and the socket cannot end up out
  // of sync through a TypeError thrown halfway.
  if (!client.sessionIds) {
    client.sessionIds = new Set();
  }
  client.sessionIds.add(sessionId);
}
/**
 * Removes a client from a session room, discarding the room once it is empty,
 * and clears the membership recorded on the socket.
 *
 * @param client Socket leaving the room.
 * @param sessionId Room to leave.
 */
private leaveRoom(client: ExtendedWebSocket, sessionId: string) {
  const room = this.sessionRooms.get(sessionId);
  if (room) {
    room.delete(client.id);
    if (room.size === 0) {
      this.sessionRooms.delete(sessionId);
    }
  }

  // `sessionIds` is optional on the socket type; a socket without it has
  // nothing to forget, so guard instead of asserting.
  if (client.sessionIds) {
    client.sessionIds.delete(sessionId);
  }
}
/**
 * Generates a connection id of the form `ws_<epoch ms>_<random base-36 suffix>`.
 *
 * The suffix is up to nine characters taken from `Math.random()`; it is not
 * cryptographically random.
 *
 * @returns The generated id.
 */
private generateClientId(): string {
  const createdAt = Date.now();
  // Skip the leading "0." of the base-36 rendering, keep at most 9 characters.
  const randomSuffix = Math.random().toString(36).slice(2, 11);
  return `ws_${createdAt}_${randomSuffix}`;
}
/**
 * (Re)arms the idle timeout for a connection.
 *
 * Any previously armed timer is cancelled first. When the timer fires, the
 * timeout is logged and the socket is closed.
 *
 * @param client Socket whose idle timeout should be (re)started.
 */
private setConnectionTimeout(client: ExtendedWebSocket) {
  const previousTimer = client.connectionTimeout;
  if (previousTimer) {
    clearTimeout(previousTimer);
  }

  const timeoutMs =
    LocationBroadcastGateway.CONNECTION_TIMEOUT_MINUTES * LocationBroadcastGateway.MILLISECONDS_PER_MINUTE;

  const onIdleTimeout = () => {
    this.logger.warn('客户端连接超时,自动断开', {
      socketId: client.id,
      timeout: `${LocationBroadcastGateway.CONNECTION_TIMEOUT_MINUTES}分钟`,
    });
    client.close();
  };

  client.connectionTimeout = setTimeout(onIdleTimeout, timeoutMs);
}
/**
 * Starts the periodic ping/pong liveness sweep.
 *
 * Every `HEARTBEAT_INTERVAL` milliseconds each registered socket is checked:
 * one that has not answered the previous ping (its `isAlive` flag is still
 * false) is dropped; otherwise the flag is cleared and a new ping is sent.
 * The `pong` listener registered on connect sets the flag again.
 */
private setupHeartbeat() {
  const heartbeatTimer = setInterval(() => {
    this.clients.forEach((client) => {
      if (!client.isAlive) {
        this.logger.warn('客户端心跳超时,断开连接', {
          socketId: client.id,
        });
        // The peer stopped answering pings, so it cannot be expected to
        // complete a close handshake either; terminate() destroys the socket
        // immediately instead of waiting for one.
        client.terminate();
        return;
      }

      client.isAlive = false;
      if (client.readyState === WebSocket.OPEN) {
        client.ping();
      }
    });
  }, LocationBroadcastGateway.HEARTBEAT_INTERVAL);

  // The handle is not retained anywhere, so the timer can never be cleared;
  // unref() at least stops it from keeping the process (or a test runner)
  // alive on its own.
  heartbeatTimer.unref();
}
}