Files
whale-town-end/src/business/location_broadcast/location_broadcast.gateway.ts
moyin c31cbe559d feat:实现位置广播系统
- 添加位置广播核心控制器和服务
- 实现健康检查和位置同步功能
- 添加WebSocket实时位置更新支持
- 完善位置广播的测试覆盖
2026-01-08 23:05:52 +08:00

790 lines
24 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* 位置广播WebSocket网关
*
* 功能描述:
* - 处理WebSocket连接和断开事件
* - 管理用户会话的加入和离开
* - 实时广播用户位置更新
* - 提供心跳检测和连接状态管理
*
* 职责分离:
* - WebSocket连接管理处理连接建立、断开和错误
* - 消息路由:根据消息类型分发到对应的处理器
* - 认证集成使用JWT认证守卫保护WebSocket事件
* - 实时广播:向会话中的其他用户广播位置更新
*
* 技术实现:
* - Socket.IO提供WebSocket通信能力
* - JWT认证保护需要认证的WebSocket事件
* - 核心服务集成:调用位置广播核心服务处理业务逻辑
* - 异常处理统一的WebSocket异常处理和错误响应
*
* 最近修改:
* - 2026-01-08: 代码重构 - 提取魔法数字为常量,优化代码质量 (修改者: moyin)
*
* @author moyin
* @version 1.1.0
* @since 2026-01-08
* @lastModified 2026-01-08
*/
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
ConnectedSocket,
MessageBody,
OnGatewayConnection,
OnGatewayDisconnect,
OnGatewayInit,
WsException,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger, UseFilters, UseGuards, UsePipes, ValidationPipe, ArgumentsHost, Inject } from '@nestjs/common';
import { BaseWsExceptionFilter } from '@nestjs/websockets';
// 导入中间件
import { RateLimitMiddleware } from './rate_limit.middleware';
import { PerformanceMonitorMiddleware } from './performance_monitor.middleware';
// 导入DTO和守卫
import { WebSocketAuthGuard, AuthenticatedSocket } from './websocket_auth.guard';
import {
JoinSessionMessage,
LeaveSessionMessage,
PositionUpdateMessage,
HeartbeatMessage,
} from './dto/websocket_message.dto';
import {
SessionJoinedResponse,
UserJoinedNotification,
UserLeftNotification,
PositionBroadcast,
HeartbeatResponse,
ErrorResponse,
SuccessResponse,
} from './dto/websocket_response.dto';
// 导入核心服务接口
import { Position } from '../../core/location_broadcast_core/position.interface';
/**
* WebSocket异常过滤器
*
* 职责:
* - 捕获WebSocket通信中的异常
* - 格式化错误响应
* - 记录错误日志
*/
class WebSocketExceptionFilter extends BaseWsExceptionFilter {
private readonly logger = new Logger(WebSocketExceptionFilter.name);
catch(exception: any, host: ArgumentsHost) {
const client = host.switchToWs().getClient<Socket>();
const error: ErrorResponse = {
type: 'error',
code: exception.code || 'INTERNAL_ERROR',
message: exception.message || '服务器内部错误',
details: exception.details,
originalMessage: exception.originalMessage,
timestamp: Date.now(),
};
this.logger.error('WebSocket异常', {
socketId: client.id,
error: exception.message,
code: exception.code,
timestamp: new Date().toISOString(),
});
client.emit('error', error);
}
}
@WebSocketGateway({
cors: {
origin: '*', // 生产环境中应该配置具体的域名
methods: ['GET', 'POST'],
credentials: true,
},
namespace: '/location-broadcast', // 使用专门的命名空间
transports: ['websocket', 'polling'], // 支持WebSocket和轮询
})
@UseFilters(new WebSocketExceptionFilter())
export class LocationBroadcastGateway
implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
@WebSocketServer()
server: Server;
private readonly logger = new Logger(LocationBroadcastGateway.name);
/** 连接超时时间(分钟) */
private static readonly CONNECTION_TIMEOUT_MINUTES = 30;
/** 时间转换常量 */
private static readonly MILLISECONDS_PER_MINUTE = 60 * 1000;
// 中间件实例
private readonly rateLimitMiddleware = new RateLimitMiddleware();
private readonly performanceMonitor = new PerformanceMonitorMiddleware();
constructor(
@Inject('ILocationBroadcastCore')
private readonly locationBroadcastCore: any, // 使用依赖注入获取核心服务
) {}
/**
* WebSocket服务器初始化
*
* 技术实现:
* 1. 配置Socket.IO服务器选项
* 2. 设置中间件和事件监听器
* 3. 初始化连接池和监控
* 4. 记录服务器启动日志
*/
afterInit(server: Server) {
this.logger.log('位置广播WebSocket服务器初始化完成', {
namespace: '/location-broadcast',
timestamp: new Date().toISOString(),
});
// 设置服务器级别的中间件
server.use((socket, next) => {
this.logger.debug('新的WebSocket连接尝试', {
socketId: socket.id,
remoteAddress: socket.handshake.address,
userAgent: socket.handshake.headers['user-agent'],
timestamp: new Date().toISOString(),
});
next();
});
}
/**
* 处理客户端连接
*
* 技术实现:
* 1. 记录连接建立日志
* 2. 初始化客户端状态
* 3. 发送连接确认消息
* 4. 设置连接超时和心跳检测
*
* @param client WebSocket客户端
*/
handleConnection(client: Socket) {
this.logger.log('WebSocket客户端连接', {
socketId: client.id,
remoteAddress: client.handshake.address,
timestamp: new Date().toISOString(),
});
// 记录连接事件到性能监控
this.performanceMonitor.recordConnection(client, true);
// 发送连接确认消息
const welcomeMessage = {
type: 'connection_established',
message: '连接已建立',
socketId: client.id,
timestamp: Date.now(),
};
client.emit('welcome', welcomeMessage);
// 设置连接超时30分钟无活动自动断开
const timeout = setTimeout(() => {
this.logger.warn('客户端连接超时,自动断开', {
socketId: client.id,
timeout: `${LocationBroadcastGateway.CONNECTION_TIMEOUT_MINUTES}分钟`,
});
client.disconnect(true);
}, LocationBroadcastGateway.CONNECTION_TIMEOUT_MINUTES * LocationBroadcastGateway.MILLISECONDS_PER_MINUTE);
// 将超时ID存储到客户端对象中
(client as any).connectionTimeout = timeout;
}
/**
* 处理客户端断开连接
*
* 技术实现:
* 1. 清理客户端相关数据
* 2. 从所有会话中移除用户
* 3. 通知其他用户该用户离开
* 4. 记录断开连接日志
*
* @param client WebSocket客户端
*/
async handleDisconnect(client: Socket) {
const startTime = Date.now();
this.logger.log('WebSocket客户端断开连接', {
socketId: client.id,
timestamp: new Date().toISOString(),
});
// 记录断开连接事件到性能监控
this.performanceMonitor.recordConnection(client, false);
try {
// 清理连接超时
const timeout = (client as any).connectionTimeout;
if (timeout) {
clearTimeout(timeout);
}
// 如果是已认证的客户端,进行清理
const authenticatedClient = client as AuthenticatedSocket;
if (authenticatedClient.userId) {
await this.handleUserDisconnection(authenticatedClient, 'connection_lost');
}
const duration = Date.now() - startTime;
this.logger.log('客户端断开连接处理完成', {
socketId: client.id,
userId: authenticatedClient.userId || 'unknown',
duration,
timestamp: new Date().toISOString(),
});
} catch (error) {
this.logger.error('处理客户端断开连接时发生错误', {
socketId: client.id,
error: error instanceof Error ? error.message : String(error),
timestamp: new Date().toISOString(),
});
}
}
/**
* 处理加入会话消息
*
* 技术实现:
* 1. 验证JWT令牌和用户身份
* 2. 将用户添加到指定会话
* 3. 获取会话中其他用户的位置信息
* 4. 向用户发送会话加入成功响应
* 5. 向会话中其他用户广播新用户加入通知
*
* @param client 已认证的WebSocket客户端
* @param message 加入会话消息
*/
@SubscribeMessage('join_session')
@UseGuards(WebSocketAuthGuard)
@UsePipes(new ValidationPipe({ transform: true }))
async handleJoinSession(
@ConnectedSocket() client: AuthenticatedSocket,
@MessageBody() message: JoinSessionMessage,
) {
const startTime = Date.now();
this.logger.log('处理加入会话请求', {
operation: 'join_session',
socketId: client.id,
userId: client.userId,
sessionId: message.sessionId,
timestamp: new Date().toISOString(),
});
try {
// 1. 将用户添加到会话
await this.locationBroadcastCore.addUserToSession(
message.sessionId,
client.userId,
client.id,
);
// 2. 如果提供了初始位置,设置用户位置
if (message.initialPosition) {
const position: Position = {
userId: client.userId,
x: message.initialPosition.x,
y: message.initialPosition.y,
mapId: message.initialPosition.mapId,
timestamp: Date.now(),
metadata: {},
};
await this.locationBroadcastCore.setUserPosition(client.userId, position);
}
// 3. 获取会话中的用户列表和位置信息
const [sessionUsers, sessionPositions] = await Promise.all([
this.locationBroadcastCore.getSessionUsers(message.sessionId),
this.locationBroadcastCore.getSessionPositions(message.sessionId),
]);
// 4. 向客户端发送加入成功响应
const joinResponse: SessionJoinedResponse = {
type: 'session_joined',
sessionId: message.sessionId,
users: sessionUsers.map(user => ({
userId: user.userId,
socketId: user.socketId,
joinedAt: user.joinedAt,
lastSeen: user.lastSeen,
status: user.status,
position: user.position ? {
x: user.position.x,
y: user.position.y,
mapId: user.position.mapId,
timestamp: user.position.timestamp,
} : undefined,
})),
positions: sessionPositions.map(pos => ({
userId: pos.userId,
x: pos.x,
y: pos.y,
mapId: pos.mapId,
timestamp: pos.timestamp,
metadata: pos.metadata,
})),
timestamp: Date.now(),
};
client.emit('session_joined', joinResponse);
// 5. 向会话中其他用户广播新用户加入通知
const userJoinedNotification: UserJoinedNotification = {
type: 'user_joined',
user: {
userId: client.userId,
socketId: client.id,
joinedAt: Date.now(),
status: 'online',
},
position: message.initialPosition ? {
x: message.initialPosition.x,
y: message.initialPosition.y,
mapId: message.initialPosition.mapId,
timestamp: Date.now(),
} : undefined,
sessionId: message.sessionId,
timestamp: Date.now(),
};
// 广播给会话中的其他用户(排除当前用户)
client.to(message.sessionId).emit('user_joined', userJoinedNotification);
// 将客户端加入Socket.IO房间用于广播
client.join(message.sessionId);
const duration = Date.now() - startTime;
this.logger.log('用户成功加入会话', {
operation: 'join_session',
socketId: client.id,
userId: client.userId,
sessionId: message.sessionId,
userCount: sessionUsers.length,
duration,
timestamp: new Date().toISOString(),
});
} catch (error) {
const duration = Date.now() - startTime;
this.logger.error('加入会话失败', {
operation: 'join_session',
socketId: client.id,
userId: client.userId,
sessionId: message.sessionId,
error: error instanceof Error ? error.message : String(error),
duration,
timestamp: new Date().toISOString(),
});
throw new WsException({
type: 'error',
code: 'JOIN_SESSION_FAILED',
message: '加入会话失败',
details: {
sessionId: message.sessionId,
reason: error instanceof Error ? error.message : String(error),
},
originalMessage: message,
timestamp: Date.now(),
});
}
}
/**
* 处理离开会话消息
*
* 技术实现:
* 1. 验证用户身份和会话权限
* 2. 从会话中移除用户
* 3. 清理用户相关数据
* 4. 向会话中其他用户广播用户离开通知
* 5. 发送离开成功确认
*
* @param client 已认证的WebSocket客户端
* @param message 离开会话消息
*/
@SubscribeMessage('leave_session')
@UseGuards(WebSocketAuthGuard)
@UsePipes(new ValidationPipe({ transform: true }))
async handleLeaveSession(
@ConnectedSocket() client: AuthenticatedSocket,
@MessageBody() message: LeaveSessionMessage,
) {
const startTime = Date.now();
this.logger.log('处理离开会话请求', {
operation: 'leave_session',
socketId: client.id,
userId: client.userId,
sessionId: message.sessionId,
reason: message.reason,
timestamp: new Date().toISOString(),
});
try {
// 1. 从会话中移除用户
await this.locationBroadcastCore.removeUserFromSession(
message.sessionId,
client.userId,
);
// 2. 向会话中其他用户广播用户离开通知
const userLeftNotification: UserLeftNotification = {
type: 'user_left',
userId: client.userId,
reason: message.reason || 'user_left',
sessionId: message.sessionId,
timestamp: Date.now(),
};
client.to(message.sessionId).emit('user_left', userLeftNotification);
// 3. 从Socket.IO房间中移除客户端
client.leave(message.sessionId);
// 4. 发送离开成功确认
const successResponse: SuccessResponse = {
type: 'success',
message: '成功离开会话',
operation: 'leave_session',
data: {
sessionId: message.sessionId,
reason: message.reason || 'user_left',
},
timestamp: Date.now(),
};
client.emit('leave_session_success', successResponse);
const duration = Date.now() - startTime;
this.logger.log('用户成功离开会话', {
operation: 'leave_session',
socketId: client.id,
userId: client.userId,
sessionId: message.sessionId,
reason: message.reason,
duration,
timestamp: new Date().toISOString(),
});
} catch (error) {
const duration = Date.now() - startTime;
this.logger.error('离开会话失败', {
operation: 'leave_session',
socketId: client.id,
userId: client.userId,
sessionId: message.sessionId,
error: error instanceof Error ? error.message : String(error),
duration,
timestamp: new Date().toISOString(),
});
throw new WsException({
type: 'error',
code: 'LEAVE_SESSION_FAILED',
message: '离开会话失败',
details: {
sessionId: message.sessionId,
reason: error instanceof Error ? error.message : String(error),
},
originalMessage: message,
timestamp: Date.now(),
});
}
}
/**
* 处理位置更新消息
*
* 技术实现:
* 1. 验证位置数据的有效性
* 2. 更新用户在Redis中的位置缓存
* 3. 获取用户当前所在的会话
* 4. 向会话中其他用户广播位置更新
* 5. 可选:触发位置数据持久化
*
* @param client 已认证的WebSocket客户端
* @param message 位置更新消息
*/
@SubscribeMessage('position_update')
@UseGuards(WebSocketAuthGuard)
@UsePipes(new ValidationPipe({ transform: true }))
async handlePositionUpdate(
@ConnectedSocket() client: AuthenticatedSocket,
@MessageBody() message: PositionUpdateMessage,
) {
// 开始性能监控
const perfContext = this.performanceMonitor.startMonitoring('position_update', client);
// 检查频率限制
const rateLimitAllowed = this.rateLimitMiddleware.checkRateLimit(client.userId, client.id);
if (!rateLimitAllowed) {
this.rateLimitMiddleware.handleRateLimit(client, client.userId);
this.performanceMonitor.endMonitoring(perfContext, false, 'Rate limit exceeded');
return;
}
const startTime = Date.now();
this.logger.debug('处理位置更新请求', {
operation: 'position_update',
socketId: client.id,
userId: client.userId,
mapId: message.mapId,
x: message.x,
y: message.y,
timestamp: new Date().toISOString(),
});
try {
// 1. 构建位置对象
const position: Position = {
userId: client.userId,
x: message.x,
y: message.y,
mapId: message.mapId,
timestamp: message.timestamp || Date.now(),
metadata: message.metadata || {},
};
// 2. 更新用户位置
await this.locationBroadcastCore.setUserPosition(client.userId, position);
// 3. 获取用户当前会话从Redis中获取
// 注意这里需要从Redis获取用户的会话信息
// 暂时使用客户端房间信息作为会话ID
const rooms = Array.from(client.rooms);
const sessionId = rooms.find(room => room !== client.id); // 排除socket自身的房间
if (sessionId) {
// 4. 向会话中其他用户广播位置更新
const positionBroadcast: PositionBroadcast = {
type: 'position_broadcast',
userId: client.userId,
position: {
x: position.x,
y: position.y,
mapId: position.mapId,
timestamp: position.timestamp,
metadata: position.metadata,
},
sessionId,
timestamp: Date.now(),
};
client.to(sessionId).emit('position_update', positionBroadcast);
}
// 5. 发送位置更新成功确认(可选)
const successResponse: SuccessResponse = {
type: 'success',
message: '位置更新成功',
operation: 'position_update',
data: {
x: position.x,
y: position.y,
mapId: position.mapId,
timestamp: position.timestamp,
},
timestamp: Date.now(),
};
client.emit('position_update_success', successResponse);
const duration = Date.now() - startTime;
this.logger.debug('位置更新处理完成', {
operation: 'position_update',
socketId: client.id,
userId: client.userId,
mapId: message.mapId,
sessionId,
duration,
timestamp: new Date().toISOString(),
});
// 结束性能监控
this.performanceMonitor.endMonitoring(perfContext, true);
} catch (error) {
const duration = Date.now() - startTime;
this.logger.error('位置更新失败', {
operation: 'position_update',
socketId: client.id,
userId: client.userId,
mapId: message.mapId,
error: error instanceof Error ? error.message : String(error),
duration,
timestamp: new Date().toISOString(),
});
// 结束性能监控(失败)
this.performanceMonitor.endMonitoring(perfContext, false, error instanceof Error ? error.message : String(error));
throw new WsException({
type: 'error',
code: 'POSITION_UPDATE_FAILED',
message: '位置更新失败',
details: {
mapId: message.mapId,
reason: error instanceof Error ? error.message : String(error),
},
originalMessage: message,
timestamp: Date.now(),
});
}
}
/**
* 处理心跳消息
*
* 技术实现:
* 1. 接收客户端心跳请求
* 2. 更新连接活跃时间
* 3. 返回服务端时间戳
* 4. 重置连接超时计时器
*
* @param client WebSocket客户端
* @param message 心跳消息
*/
@SubscribeMessage('heartbeat')
@UsePipes(new ValidationPipe({ transform: true }))
async handleHeartbeat(
@ConnectedSocket() client: Socket,
@MessageBody() message: HeartbeatMessage,
) {
this.logger.debug('处理心跳请求', {
operation: 'heartbeat',
socketId: client.id,
clientTimestamp: message.timestamp,
sequence: message.sequence,
});
try {
// 1. 重置连接超时
const timeout = (client as any).connectionTimeout;
if (timeout) {
clearTimeout(timeout);
// 重新设置超时
const newTimeout = setTimeout(() => {
this.logger.warn('客户端连接超时,自动断开', {
socketId: client.id,
timeout: `${LocationBroadcastGateway.CONNECTION_TIMEOUT_MINUTES}分钟`,
});
client.disconnect(true);
}, LocationBroadcastGateway.CONNECTION_TIMEOUT_MINUTES * LocationBroadcastGateway.MILLISECONDS_PER_MINUTE);
(client as any).connectionTimeout = newTimeout;
}
// 2. 构建心跳响应
const heartbeatResponse: HeartbeatResponse = {
type: 'heartbeat_response',
clientTimestamp: message.timestamp,
serverTimestamp: Date.now(),
sequence: message.sequence,
};
// 3. 发送心跳响应
client.emit('heartbeat_response', heartbeatResponse);
} catch (error) {
this.logger.error('心跳处理失败', {
operation: 'heartbeat',
socketId: client.id,
error: error instanceof Error ? error.message : String(error),
});
// 心跳失败不抛出异常,避免断开连接
}
}
/**
* 处理用户断开连接的清理工作
*
* 技术实现:
* 1. 清理用户在所有会话中的数据
* 2. 通知相关会话中的其他用户
* 3. 清理Redis中的用户数据
* 4. 记录断开连接的统计信息
*
* @param client 已认证的WebSocket客户端
* @param reason 断开原因
*/
private async handleUserDisconnection(
client: AuthenticatedSocket,
reason: string,
): Promise<void> {
try {
// 1. 获取用户所在的所有房间(会话)
const rooms = Array.from(client.rooms);
const sessionIds = rooms.filter(room => room !== client.id);
// 2. 从所有会话中移除用户并通知其他用户
for (const sessionId of sessionIds) {
try {
// 从会话中移除用户
await this.locationBroadcastCore.removeUserFromSession(
sessionId,
client.userId,
);
// 通知会话中的其他用户
const userLeftNotification: UserLeftNotification = {
type: 'user_left',
userId: client.userId,
reason,
sessionId,
timestamp: Date.now(),
};
client.to(sessionId).emit('user_left', userLeftNotification);
} catch (error) {
this.logger.error('从会话中移除用户失败', {
socketId: client.id,
userId: client.userId,
sessionId,
error: error instanceof Error ? error.message : String(error),
});
}
}
// 3. 清理用户的所有数据
await this.locationBroadcastCore.cleanupUserData(client.userId);
this.logger.log('用户断开连接清理完成', {
socketId: client.id,
userId: client.userId,
reason,
sessionCount: sessionIds.length,
timestamp: new Date().toISOString(),
});
} catch (error) {
this.logger.error('用户断开连接清理失败', {
socketId: client.id,
userId: client.userId,
reason,
error: error instanceof Error ? error.message : String(error),
});
}
}
}