forked from datawhale/whale-town-end
feat:实现位置广播系统
- 添加位置广播核心控制器和服务 - 实现健康检查和位置同步功能 - 添加WebSocket实时位置更新支持 - 完善位置广播的测试覆盖
This commit is contained in:
790
src/business/location_broadcast/location_broadcast.gateway.ts
Normal file
790
src/business/location_broadcast/location_broadcast.gateway.ts
Normal file
@@ -0,0 +1,790 @@
|
||||
/**
|
||||
* 位置广播WebSocket网关
|
||||
*
|
||||
* 功能描述:
|
||||
* - 处理WebSocket连接和断开事件
|
||||
* - 管理用户会话的加入和离开
|
||||
* - 实时广播用户位置更新
|
||||
* - 提供心跳检测和连接状态管理
|
||||
*
|
||||
* 职责分离:
|
||||
* - WebSocket连接管理:处理连接建立、断开和错误
|
||||
* - 消息路由:根据消息类型分发到对应的处理器
|
||||
* - 认证集成:使用JWT认证守卫保护WebSocket事件
|
||||
* - 实时广播:向会话中的其他用户广播位置更新
|
||||
*
|
||||
* 技术实现:
|
||||
* - Socket.IO:提供WebSocket通信能力
|
||||
* - JWT认证:保护需要认证的WebSocket事件
|
||||
* - 核心服务集成:调用位置广播核心服务处理业务逻辑
|
||||
* - 异常处理:统一的WebSocket异常处理和错误响应
|
||||
*
|
||||
* 最近修改:
|
||||
* - 2026-01-08: 代码重构 - 提取魔法数字为常量,优化代码质量 (修改者: moyin)
|
||||
*
|
||||
* @author moyin
|
||||
* @version 1.1.0
|
||||
* @since 2026-01-08
|
||||
* @lastModified 2026-01-08
|
||||
*/
|
||||
|
||||
import {
|
||||
WebSocketGateway,
|
||||
WebSocketServer,
|
||||
SubscribeMessage,
|
||||
ConnectedSocket,
|
||||
MessageBody,
|
||||
OnGatewayConnection,
|
||||
OnGatewayDisconnect,
|
||||
OnGatewayInit,
|
||||
WsException,
|
||||
} from '@nestjs/websockets';
|
||||
import { Server, Socket } from 'socket.io';
|
||||
import { Logger, UseFilters, UseGuards, UsePipes, ValidationPipe, ArgumentsHost, Inject } from '@nestjs/common';
|
||||
import { BaseWsExceptionFilter } from '@nestjs/websockets';
|
||||
|
||||
// 导入中间件
|
||||
import { RateLimitMiddleware } from './rate_limit.middleware';
|
||||
import { PerformanceMonitorMiddleware } from './performance_monitor.middleware';
|
||||
|
||||
// 导入DTO和守卫
|
||||
import { WebSocketAuthGuard, AuthenticatedSocket } from './websocket_auth.guard';
|
||||
import {
|
||||
JoinSessionMessage,
|
||||
LeaveSessionMessage,
|
||||
PositionUpdateMessage,
|
||||
HeartbeatMessage,
|
||||
} from './dto/websocket_message.dto';
|
||||
import {
|
||||
SessionJoinedResponse,
|
||||
UserJoinedNotification,
|
||||
UserLeftNotification,
|
||||
PositionBroadcast,
|
||||
HeartbeatResponse,
|
||||
ErrorResponse,
|
||||
SuccessResponse,
|
||||
} from './dto/websocket_response.dto';
|
||||
|
||||
// 导入核心服务接口
|
||||
import { Position } from '../../core/location_broadcast_core/position.interface';
|
||||
|
||||
/**
|
||||
* WebSocket异常过滤器
|
||||
*
|
||||
* 职责:
|
||||
* - 捕获WebSocket通信中的异常
|
||||
* - 格式化错误响应
|
||||
* - 记录错误日志
|
||||
*/
|
||||
class WebSocketExceptionFilter extends BaseWsExceptionFilter {
|
||||
private readonly logger = new Logger(WebSocketExceptionFilter.name);
|
||||
|
||||
catch(exception: any, host: ArgumentsHost) {
|
||||
const client = host.switchToWs().getClient<Socket>();
|
||||
|
||||
const error: ErrorResponse = {
|
||||
type: 'error',
|
||||
code: exception.code || 'INTERNAL_ERROR',
|
||||
message: exception.message || '服务器内部错误',
|
||||
details: exception.details,
|
||||
originalMessage: exception.originalMessage,
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
|
||||
this.logger.error('WebSocket异常', {
|
||||
socketId: client.id,
|
||||
error: exception.message,
|
||||
code: exception.code,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
client.emit('error', error);
|
||||
}
|
||||
}
|
||||
|
||||
@WebSocketGateway({
|
||||
cors: {
|
||||
origin: '*', // 生产环境中应该配置具体的域名
|
||||
methods: ['GET', 'POST'],
|
||||
credentials: true,
|
||||
},
|
||||
namespace: '/location-broadcast', // 使用专门的命名空间
|
||||
transports: ['websocket', 'polling'], // 支持WebSocket和轮询
|
||||
})
|
||||
@UseFilters(new WebSocketExceptionFilter())
|
||||
export class LocationBroadcastGateway
|
||||
implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
|
||||
{
|
||||
@WebSocketServer()
|
||||
server: Server;
|
||||
|
||||
private readonly logger = new Logger(LocationBroadcastGateway.name);
|
||||
|
||||
/** 连接超时时间(分钟) */
|
||||
private static readonly CONNECTION_TIMEOUT_MINUTES = 30;
|
||||
/** 时间转换常量 */
|
||||
private static readonly MILLISECONDS_PER_MINUTE = 60 * 1000;
|
||||
|
||||
// 中间件实例
|
||||
private readonly rateLimitMiddleware = new RateLimitMiddleware();
|
||||
private readonly performanceMonitor = new PerformanceMonitorMiddleware();
|
||||
|
||||
constructor(
|
||||
@Inject('ILocationBroadcastCore')
|
||||
private readonly locationBroadcastCore: any, // 使用依赖注入获取核心服务
|
||||
) {}
|
||||
|
||||
/**
|
||||
* WebSocket服务器初始化
|
||||
*
|
||||
* 技术实现:
|
||||
* 1. 配置Socket.IO服务器选项
|
||||
* 2. 设置中间件和事件监听器
|
||||
* 3. 初始化连接池和监控
|
||||
* 4. 记录服务器启动日志
|
||||
*/
|
||||
afterInit(server: Server) {
|
||||
this.logger.log('位置广播WebSocket服务器初始化完成', {
|
||||
namespace: '/location-broadcast',
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
// 设置服务器级别的中间件
|
||||
server.use((socket, next) => {
|
||||
this.logger.debug('新的WebSocket连接尝试', {
|
||||
socketId: socket.id,
|
||||
remoteAddress: socket.handshake.address,
|
||||
userAgent: socket.handshake.headers['user-agent'],
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
next();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理客户端连接
|
||||
*
|
||||
* 技术实现:
|
||||
* 1. 记录连接建立日志
|
||||
* 2. 初始化客户端状态
|
||||
* 3. 发送连接确认消息
|
||||
* 4. 设置连接超时和心跳检测
|
||||
*
|
||||
* @param client WebSocket客户端
|
||||
*/
|
||||
handleConnection(client: Socket) {
|
||||
this.logger.log('WebSocket客户端连接', {
|
||||
socketId: client.id,
|
||||
remoteAddress: client.handshake.address,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
// 记录连接事件到性能监控
|
||||
this.performanceMonitor.recordConnection(client, true);
|
||||
|
||||
// 发送连接确认消息
|
||||
const welcomeMessage = {
|
||||
type: 'connection_established',
|
||||
message: '连接已建立',
|
||||
socketId: client.id,
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
|
||||
client.emit('welcome', welcomeMessage);
|
||||
|
||||
// 设置连接超时(30分钟无活动自动断开)
|
||||
const timeout = setTimeout(() => {
|
||||
this.logger.warn('客户端连接超时,自动断开', {
|
||||
socketId: client.id,
|
||||
timeout: `${LocationBroadcastGateway.CONNECTION_TIMEOUT_MINUTES}分钟`,
|
||||
});
|
||||
client.disconnect(true);
|
||||
}, LocationBroadcastGateway.CONNECTION_TIMEOUT_MINUTES * LocationBroadcastGateway.MILLISECONDS_PER_MINUTE);
|
||||
|
||||
// 将超时ID存储到客户端对象中
|
||||
(client as any).connectionTimeout = timeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理客户端断开连接
|
||||
*
|
||||
* 技术实现:
|
||||
* 1. 清理客户端相关数据
|
||||
* 2. 从所有会话中移除用户
|
||||
* 3. 通知其他用户该用户离开
|
||||
* 4. 记录断开连接日志
|
||||
*
|
||||
* @param client WebSocket客户端
|
||||
*/
|
||||
async handleDisconnect(client: Socket) {
|
||||
const startTime = Date.now();
|
||||
|
||||
this.logger.log('WebSocket客户端断开连接', {
|
||||
socketId: client.id,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
// 记录断开连接事件到性能监控
|
||||
this.performanceMonitor.recordConnection(client, false);
|
||||
|
||||
try {
|
||||
// 清理连接超时
|
||||
const timeout = (client as any).connectionTimeout;
|
||||
if (timeout) {
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
|
||||
// 如果是已认证的客户端,进行清理
|
||||
const authenticatedClient = client as AuthenticatedSocket;
|
||||
if (authenticatedClient.userId) {
|
||||
await this.handleUserDisconnection(authenticatedClient, 'connection_lost');
|
||||
}
|
||||
|
||||
const duration = Date.now() - startTime;
|
||||
this.logger.log('客户端断开连接处理完成', {
|
||||
socketId: client.id,
|
||||
userId: authenticatedClient.userId || 'unknown',
|
||||
duration,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
this.logger.error('处理客户端断开连接时发生错误', {
|
||||
socketId: client.id,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理加入会话消息
|
||||
*
|
||||
* 技术实现:
|
||||
* 1. 验证JWT令牌和用户身份
|
||||
* 2. 将用户添加到指定会话
|
||||
* 3. 获取会话中其他用户的位置信息
|
||||
* 4. 向用户发送会话加入成功响应
|
||||
* 5. 向会话中其他用户广播新用户加入通知
|
||||
*
|
||||
* @param client 已认证的WebSocket客户端
|
||||
* @param message 加入会话消息
|
||||
*/
|
||||
@SubscribeMessage('join_session')
|
||||
@UseGuards(WebSocketAuthGuard)
|
||||
@UsePipes(new ValidationPipe({ transform: true }))
|
||||
async handleJoinSession(
|
||||
@ConnectedSocket() client: AuthenticatedSocket,
|
||||
@MessageBody() message: JoinSessionMessage,
|
||||
) {
|
||||
const startTime = Date.now();
|
||||
|
||||
this.logger.log('处理加入会话请求', {
|
||||
operation: 'join_session',
|
||||
socketId: client.id,
|
||||
userId: client.userId,
|
||||
sessionId: message.sessionId,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
try {
|
||||
// 1. 将用户添加到会话
|
||||
await this.locationBroadcastCore.addUserToSession(
|
||||
message.sessionId,
|
||||
client.userId,
|
||||
client.id,
|
||||
);
|
||||
|
||||
// 2. 如果提供了初始位置,设置用户位置
|
||||
if (message.initialPosition) {
|
||||
const position: Position = {
|
||||
userId: client.userId,
|
||||
x: message.initialPosition.x,
|
||||
y: message.initialPosition.y,
|
||||
mapId: message.initialPosition.mapId,
|
||||
timestamp: Date.now(),
|
||||
metadata: {},
|
||||
};
|
||||
|
||||
await this.locationBroadcastCore.setUserPosition(client.userId, position);
|
||||
}
|
||||
|
||||
// 3. 获取会话中的用户列表和位置信息
|
||||
const [sessionUsers, sessionPositions] = await Promise.all([
|
||||
this.locationBroadcastCore.getSessionUsers(message.sessionId),
|
||||
this.locationBroadcastCore.getSessionPositions(message.sessionId),
|
||||
]);
|
||||
|
||||
// 4. 向客户端发送加入成功响应
|
||||
const joinResponse: SessionJoinedResponse = {
|
||||
type: 'session_joined',
|
||||
sessionId: message.sessionId,
|
||||
users: sessionUsers.map(user => ({
|
||||
userId: user.userId,
|
||||
socketId: user.socketId,
|
||||
joinedAt: user.joinedAt,
|
||||
lastSeen: user.lastSeen,
|
||||
status: user.status,
|
||||
position: user.position ? {
|
||||
x: user.position.x,
|
||||
y: user.position.y,
|
||||
mapId: user.position.mapId,
|
||||
timestamp: user.position.timestamp,
|
||||
} : undefined,
|
||||
})),
|
||||
positions: sessionPositions.map(pos => ({
|
||||
userId: pos.userId,
|
||||
x: pos.x,
|
||||
y: pos.y,
|
||||
mapId: pos.mapId,
|
||||
timestamp: pos.timestamp,
|
||||
metadata: pos.metadata,
|
||||
})),
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
|
||||
client.emit('session_joined', joinResponse);
|
||||
|
||||
// 5. 向会话中其他用户广播新用户加入通知
|
||||
const userJoinedNotification: UserJoinedNotification = {
|
||||
type: 'user_joined',
|
||||
user: {
|
||||
userId: client.userId,
|
||||
socketId: client.id,
|
||||
joinedAt: Date.now(),
|
||||
status: 'online',
|
||||
},
|
||||
position: message.initialPosition ? {
|
||||
x: message.initialPosition.x,
|
||||
y: message.initialPosition.y,
|
||||
mapId: message.initialPosition.mapId,
|
||||
timestamp: Date.now(),
|
||||
} : undefined,
|
||||
sessionId: message.sessionId,
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
|
||||
// 广播给会话中的其他用户(排除当前用户)
|
||||
client.to(message.sessionId).emit('user_joined', userJoinedNotification);
|
||||
|
||||
// 将客户端加入Socket.IO房间(用于广播)
|
||||
client.join(message.sessionId);
|
||||
|
||||
const duration = Date.now() - startTime;
|
||||
this.logger.log('用户成功加入会话', {
|
||||
operation: 'join_session',
|
||||
socketId: client.id,
|
||||
userId: client.userId,
|
||||
sessionId: message.sessionId,
|
||||
userCount: sessionUsers.length,
|
||||
duration,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
const duration = Date.now() - startTime;
|
||||
this.logger.error('加入会话失败', {
|
||||
operation: 'join_session',
|
||||
socketId: client.id,
|
||||
userId: client.userId,
|
||||
sessionId: message.sessionId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
duration,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
throw new WsException({
|
||||
type: 'error',
|
||||
code: 'JOIN_SESSION_FAILED',
|
||||
message: '加入会话失败',
|
||||
details: {
|
||||
sessionId: message.sessionId,
|
||||
reason: error instanceof Error ? error.message : String(error),
|
||||
},
|
||||
originalMessage: message,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理离开会话消息
|
||||
*
|
||||
* 技术实现:
|
||||
* 1. 验证用户身份和会话权限
|
||||
* 2. 从会话中移除用户
|
||||
* 3. 清理用户相关数据
|
||||
* 4. 向会话中其他用户广播用户离开通知
|
||||
* 5. 发送离开成功确认
|
||||
*
|
||||
* @param client 已认证的WebSocket客户端
|
||||
* @param message 离开会话消息
|
||||
*/
|
||||
@SubscribeMessage('leave_session')
|
||||
@UseGuards(WebSocketAuthGuard)
|
||||
@UsePipes(new ValidationPipe({ transform: true }))
|
||||
async handleLeaveSession(
|
||||
@ConnectedSocket() client: AuthenticatedSocket,
|
||||
@MessageBody() message: LeaveSessionMessage,
|
||||
) {
|
||||
const startTime = Date.now();
|
||||
|
||||
this.logger.log('处理离开会话请求', {
|
||||
operation: 'leave_session',
|
||||
socketId: client.id,
|
||||
userId: client.userId,
|
||||
sessionId: message.sessionId,
|
||||
reason: message.reason,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
try {
|
||||
// 1. 从会话中移除用户
|
||||
await this.locationBroadcastCore.removeUserFromSession(
|
||||
message.sessionId,
|
||||
client.userId,
|
||||
);
|
||||
|
||||
// 2. 向会话中其他用户广播用户离开通知
|
||||
const userLeftNotification: UserLeftNotification = {
|
||||
type: 'user_left',
|
||||
userId: client.userId,
|
||||
reason: message.reason || 'user_left',
|
||||
sessionId: message.sessionId,
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
|
||||
client.to(message.sessionId).emit('user_left', userLeftNotification);
|
||||
|
||||
// 3. 从Socket.IO房间中移除客户端
|
||||
client.leave(message.sessionId);
|
||||
|
||||
// 4. 发送离开成功确认
|
||||
const successResponse: SuccessResponse = {
|
||||
type: 'success',
|
||||
message: '成功离开会话',
|
||||
operation: 'leave_session',
|
||||
data: {
|
||||
sessionId: message.sessionId,
|
||||
reason: message.reason || 'user_left',
|
||||
},
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
|
||||
client.emit('leave_session_success', successResponse);
|
||||
|
||||
const duration = Date.now() - startTime;
|
||||
this.logger.log('用户成功离开会话', {
|
||||
operation: 'leave_session',
|
||||
socketId: client.id,
|
||||
userId: client.userId,
|
||||
sessionId: message.sessionId,
|
||||
reason: message.reason,
|
||||
duration,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
const duration = Date.now() - startTime;
|
||||
this.logger.error('离开会话失败', {
|
||||
operation: 'leave_session',
|
||||
socketId: client.id,
|
||||
userId: client.userId,
|
||||
sessionId: message.sessionId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
duration,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
throw new WsException({
|
||||
type: 'error',
|
||||
code: 'LEAVE_SESSION_FAILED',
|
||||
message: '离开会话失败',
|
||||
details: {
|
||||
sessionId: message.sessionId,
|
||||
reason: error instanceof Error ? error.message : String(error),
|
||||
},
|
||||
originalMessage: message,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理位置更新消息
|
||||
*
|
||||
* 技术实现:
|
||||
* 1. 验证位置数据的有效性
|
||||
* 2. 更新用户在Redis中的位置缓存
|
||||
* 3. 获取用户当前所在的会话
|
||||
* 4. 向会话中其他用户广播位置更新
|
||||
* 5. 可选:触发位置数据持久化
|
||||
*
|
||||
* @param client 已认证的WebSocket客户端
|
||||
* @param message 位置更新消息
|
||||
*/
|
||||
@SubscribeMessage('position_update')
|
||||
@UseGuards(WebSocketAuthGuard)
|
||||
@UsePipes(new ValidationPipe({ transform: true }))
|
||||
async handlePositionUpdate(
|
||||
@ConnectedSocket() client: AuthenticatedSocket,
|
||||
@MessageBody() message: PositionUpdateMessage,
|
||||
) {
|
||||
// 开始性能监控
|
||||
const perfContext = this.performanceMonitor.startMonitoring('position_update', client);
|
||||
|
||||
// 检查频率限制
|
||||
const rateLimitAllowed = this.rateLimitMiddleware.checkRateLimit(client.userId, client.id);
|
||||
if (!rateLimitAllowed) {
|
||||
this.rateLimitMiddleware.handleRateLimit(client, client.userId);
|
||||
this.performanceMonitor.endMonitoring(perfContext, false, 'Rate limit exceeded');
|
||||
return;
|
||||
}
|
||||
|
||||
const startTime = Date.now();
|
||||
|
||||
this.logger.debug('处理位置更新请求', {
|
||||
operation: 'position_update',
|
||||
socketId: client.id,
|
||||
userId: client.userId,
|
||||
mapId: message.mapId,
|
||||
x: message.x,
|
||||
y: message.y,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
try {
|
||||
// 1. 构建位置对象
|
||||
const position: Position = {
|
||||
userId: client.userId,
|
||||
x: message.x,
|
||||
y: message.y,
|
||||
mapId: message.mapId,
|
||||
timestamp: message.timestamp || Date.now(),
|
||||
metadata: message.metadata || {},
|
||||
};
|
||||
|
||||
// 2. 更新用户位置
|
||||
await this.locationBroadcastCore.setUserPosition(client.userId, position);
|
||||
|
||||
// 3. 获取用户当前会话(从Redis中获取)
|
||||
// 注意:这里需要从Redis获取用户的会话信息
|
||||
// 暂时使用客户端房间信息作为会话ID
|
||||
const rooms = Array.from(client.rooms);
|
||||
const sessionId = rooms.find(room => room !== client.id); // 排除socket自身的房间
|
||||
|
||||
if (sessionId) {
|
||||
// 4. 向会话中其他用户广播位置更新
|
||||
const positionBroadcast: PositionBroadcast = {
|
||||
type: 'position_broadcast',
|
||||
userId: client.userId,
|
||||
position: {
|
||||
x: position.x,
|
||||
y: position.y,
|
||||
mapId: position.mapId,
|
||||
timestamp: position.timestamp,
|
||||
metadata: position.metadata,
|
||||
},
|
||||
sessionId,
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
|
||||
client.to(sessionId).emit('position_update', positionBroadcast);
|
||||
}
|
||||
|
||||
// 5. 发送位置更新成功确认(可选)
|
||||
const successResponse: SuccessResponse = {
|
||||
type: 'success',
|
||||
message: '位置更新成功',
|
||||
operation: 'position_update',
|
||||
data: {
|
||||
x: position.x,
|
||||
y: position.y,
|
||||
mapId: position.mapId,
|
||||
timestamp: position.timestamp,
|
||||
},
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
|
||||
client.emit('position_update_success', successResponse);
|
||||
|
||||
const duration = Date.now() - startTime;
|
||||
this.logger.debug('位置更新处理完成', {
|
||||
operation: 'position_update',
|
||||
socketId: client.id,
|
||||
userId: client.userId,
|
||||
mapId: message.mapId,
|
||||
sessionId,
|
||||
duration,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
// 结束性能监控
|
||||
this.performanceMonitor.endMonitoring(perfContext, true);
|
||||
|
||||
} catch (error) {
|
||||
const duration = Date.now() - startTime;
|
||||
this.logger.error('位置更新失败', {
|
||||
operation: 'position_update',
|
||||
socketId: client.id,
|
||||
userId: client.userId,
|
||||
mapId: message.mapId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
duration,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
// 结束性能监控(失败)
|
||||
this.performanceMonitor.endMonitoring(perfContext, false, error instanceof Error ? error.message : String(error));
|
||||
|
||||
throw new WsException({
|
||||
type: 'error',
|
||||
code: 'POSITION_UPDATE_FAILED',
|
||||
message: '位置更新失败',
|
||||
details: {
|
||||
mapId: message.mapId,
|
||||
reason: error instanceof Error ? error.message : String(error),
|
||||
},
|
||||
originalMessage: message,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理心跳消息
|
||||
*
|
||||
* 技术实现:
|
||||
* 1. 接收客户端心跳请求
|
||||
* 2. 更新连接活跃时间
|
||||
* 3. 返回服务端时间戳
|
||||
* 4. 重置连接超时计时器
|
||||
*
|
||||
* @param client WebSocket客户端
|
||||
* @param message 心跳消息
|
||||
*/
|
||||
@SubscribeMessage('heartbeat')
|
||||
@UsePipes(new ValidationPipe({ transform: true }))
|
||||
async handleHeartbeat(
|
||||
@ConnectedSocket() client: Socket,
|
||||
@MessageBody() message: HeartbeatMessage,
|
||||
) {
|
||||
this.logger.debug('处理心跳请求', {
|
||||
operation: 'heartbeat',
|
||||
socketId: client.id,
|
||||
clientTimestamp: message.timestamp,
|
||||
sequence: message.sequence,
|
||||
});
|
||||
|
||||
try {
|
||||
// 1. 重置连接超时
|
||||
const timeout = (client as any).connectionTimeout;
|
||||
if (timeout) {
|
||||
clearTimeout(timeout);
|
||||
|
||||
// 重新设置超时
|
||||
const newTimeout = setTimeout(() => {
|
||||
this.logger.warn('客户端连接超时,自动断开', {
|
||||
socketId: client.id,
|
||||
timeout: `${LocationBroadcastGateway.CONNECTION_TIMEOUT_MINUTES}分钟`,
|
||||
});
|
||||
client.disconnect(true);
|
||||
}, LocationBroadcastGateway.CONNECTION_TIMEOUT_MINUTES * LocationBroadcastGateway.MILLISECONDS_PER_MINUTE);
|
||||
|
||||
(client as any).connectionTimeout = newTimeout;
|
||||
}
|
||||
|
||||
// 2. 构建心跳响应
|
||||
const heartbeatResponse: HeartbeatResponse = {
|
||||
type: 'heartbeat_response',
|
||||
clientTimestamp: message.timestamp,
|
||||
serverTimestamp: Date.now(),
|
||||
sequence: message.sequence,
|
||||
};
|
||||
|
||||
// 3. 发送心跳响应
|
||||
client.emit('heartbeat_response', heartbeatResponse);
|
||||
|
||||
} catch (error) {
|
||||
this.logger.error('心跳处理失败', {
|
||||
operation: 'heartbeat',
|
||||
socketId: client.id,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
|
||||
// 心跳失败不抛出异常,避免断开连接
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理用户断开连接的清理工作
|
||||
*
|
||||
* 技术实现:
|
||||
* 1. 清理用户在所有会话中的数据
|
||||
* 2. 通知相关会话中的其他用户
|
||||
* 3. 清理Redis中的用户数据
|
||||
* 4. 记录断开连接的统计信息
|
||||
*
|
||||
* @param client 已认证的WebSocket客户端
|
||||
* @param reason 断开原因
|
||||
*/
|
||||
private async handleUserDisconnection(
|
||||
client: AuthenticatedSocket,
|
||||
reason: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
// 1. 获取用户所在的所有房间(会话)
|
||||
const rooms = Array.from(client.rooms);
|
||||
const sessionIds = rooms.filter(room => room !== client.id);
|
||||
|
||||
// 2. 从所有会话中移除用户并通知其他用户
|
||||
for (const sessionId of sessionIds) {
|
||||
try {
|
||||
// 从会话中移除用户
|
||||
await this.locationBroadcastCore.removeUserFromSession(
|
||||
sessionId,
|
||||
client.userId,
|
||||
);
|
||||
|
||||
// 通知会话中的其他用户
|
||||
const userLeftNotification: UserLeftNotification = {
|
||||
type: 'user_left',
|
||||
userId: client.userId,
|
||||
reason,
|
||||
sessionId,
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
|
||||
client.to(sessionId).emit('user_left', userLeftNotification);
|
||||
|
||||
} catch (error) {
|
||||
this.logger.error('从会话中移除用户失败', {
|
||||
socketId: client.id,
|
||||
userId: client.userId,
|
||||
sessionId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// 3. 清理用户的所有数据
|
||||
await this.locationBroadcastCore.cleanupUserData(client.userId);
|
||||
|
||||
this.logger.log('用户断开连接清理完成', {
|
||||
socketId: client.id,
|
||||
userId: client.userId,
|
||||
reason,
|
||||
sessionCount: sessionIds.length,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
this.logger.error('用户断开连接清理失败', {
|
||||
socketId: client.id,
|
||||
userId: client.userId,
|
||||
reason,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user