/** * Redis 故障恢复集成测试 * * 功能描述: * - 测试Redis连接中断恢复机制 * - 验证Redis数据丢失恢复能力 * - 测试Redis集群故障转移 * - 确保缓存重建机制正常 * - 验证数据一致性保证 * * 测试场景: * 1. Redis 连接中断恢复 * 2. Redis 数据丢失恢复 * 3. Redis 集群故障转移 * 4. 缓存重建机制 * 5. 数据一致性保证 * * 最近修改: * - 2026-01-08: 文件重命名 - 修正kebab-case为snake_case命名规范 (修改者: moyin) * * @author original * @version 1.0.1 * @since 2025-01-01 * @lastModified 2026-01-08 */ import { Test, TestingModule } from '@nestjs/testing'; import { INestApplication } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; import { TypeOrmModule } from '@nestjs/typeorm'; import { RedisModule } from '@nestjs-modules/ioredis'; import Redis from 'ioredis'; import { LocationBroadcastModule } from '../../location_broadcast.module'; import { LocationBroadcastCoreService } from '../../../core/location_broadcast_core/location_broadcast_core.service'; import { LocationPositionService } from '../../services/location_position.service'; import { LocationSessionService } from '../../services/location_session.service'; import { UserProfilesService } from '../../../core/db/user_profiles/user_profiles.service'; import { Position } from '../../../core/location_broadcast_core/position.interface'; import { SessionUser } from '../../../core/location_broadcast_core/session.interface'; describe('Redis Failover Integration Tests', () => { let app: INestApplication; let module: TestingModule; let redis: Redis; let coreService: LocationBroadcastCoreService; let positionService: LocationPositionService; let sessionService: LocationSessionService; let userProfilesService: UserProfilesService; // 测试数据 const testUserId = 'test-user-redis-failover'; const testSessionId = 'test-session-redis-failover'; const testMapId = 'test-map-redis-failover'; const testPosition: Position = { x: 100, y: 200, z: 0, timestamp: Date.now() }; beforeAll(async () => { module = await Test.createTestingModule({ imports: [ ConfigModule.forRoot({ isGlobal: true, envFilePath: ['.env.test', '.env'] }), TypeOrmModule.forRoot({ type: 'mysql', host: process.env.DB_HOST || 'localhost', port: parseInt(process.env.DB_PORT) || 3306, username: process.env.DB_USERNAME || 'root', password: process.env.DB_PASSWORD || '', database: process.env.DB_DATABASE || 'test_db', entities: [__dirname + '/../../../**/*.entity{.ts,.js}'], synchronize: true, dropSchema: true }), RedisModule.forRoot({ type: 'single', url: process.env.REDIS_URL || 'redis://localhost:6379', options: { retryDelayOnFailover: 100, maxRetriesPerRequest: 3, lazyConnect: true } }), LocationBroadcastModule ] }).compile(); app = module.createNestApplication(); await app.init(); // 获取服务实例 coreService = module.get(LocationBroadcastCoreService); positionService = module.get(LocationPositionService); sessionService = module.get(LocationSessionService); userProfilesService = module.get(UserProfilesService); redis = module.get('default_IORedisModuleConnectionToken'); // 确保 Redis 连接 await redis.ping(); }); afterAll(async () => { // 清理测试数据 await redis.flushall(); await app.close(); }); beforeEach(async () => { // 清理 Redis 数据 await redis.flushall(); // 创建测试用户 await userProfilesService.createUserProfile({ user_id: testUserId, username: 'test-user-redis', email: 'test-redis@example.com', created_at: new Date(), updated_at: new Date() }); }); afterEach(async () => { // 清理测试数据 try { await sessionService.leaveSession(testUserId, testSessionId); } catch (error) { // 忽略清理错误 } try { await userProfilesService.deleteUserProfile(testUserId); } catch (error) { // 忽略清理错误 } }); describe('Redis 连接故障恢复', () => { it('应该在 Redis 连接中断后自动重连', async () => { // 1. 正常操作 - 加入会话 await sessionService.joinSession(testUserId, testSessionId, testMapId); // 验证会话存在 const users = await sessionService.getSessionUsers(testSessionId); expect(users).toHaveLength(1); expect(users[0].userId).toBe(testUserId); // 2. 模拟 Redis 连接中断 await redis.disconnect(); // 3. 尝试操作 - 应该失败 await expect( sessionService.getSessionUsers(testSessionId) ).rejects.toThrow(); // 4. 重新连接 Redis await redis.connect(); await redis.ping(); // 5. 验证服务恢复 - 数据可能丢失,但操作应该正常 const newUsers = await sessionService.getSessionUsers(testSessionId); // Redis 重启后数据丢失是正常的 expect(Array.isArray(newUsers)).toBe(true); // 6. 重新加入会话应该正常工作 await sessionService.joinSession(testUserId, testSessionId, testMapId); const recoveredUsers = await sessionService.getSessionUsers(testSessionId); expect(recoveredUsers).toHaveLength(1); expect(recoveredUsers[0].userId).toBe(testUserId); }); it('应该在 Redis 重启后重建缓存数据', async () => { // 1. 建立初始状态 await sessionService.joinSession(testUserId, testSessionId, testMapId); await positionService.updatePosition(testUserId, testSessionId, testPosition); // 验证数据存在 const initialPosition = await positionService.getUserPosition(testUserId, testSessionId); expect(initialPosition).toBeDefined(); expect(initialPosition.x).toBe(testPosition.x); // 2. 模拟 Redis 重启 (清空所有数据) await redis.flushall(); // 3. 验证缓存数据丢失 const lostPosition = await positionService.getUserPosition(testUserId, testSessionId); expect(lostPosition).toBeNull(); // 4. 重新加入会话和更新位置 await sessionService.joinSession(testUserId, testSessionId, testMapId); await positionService.updatePosition(testUserId, testSessionId, testPosition); // 5. 验证数据重建成功 const rebuiltPosition = await positionService.getUserPosition(testUserId, testSessionId); expect(rebuiltPosition).toBeDefined(); expect(rebuiltPosition.x).toBe(testPosition.x); expect(rebuiltPosition.y).toBe(testPosition.y); }); }); describe('Redis 数据一致性恢复', () => { it('应该处理部分数据丢失的情况', async () => { // 1. 建立完整的会话状态 await sessionService.joinSession(testUserId, testSessionId, testMapId); await positionService.updatePosition(testUserId, testSessionId, testPosition); // 验证完整状态 const users = await sessionService.getSessionUsers(testSessionId); const position = await positionService.getUserPosition(testUserId, testSessionId); expect(users).toHaveLength(1); expect(position).toBeDefined(); // 2. 模拟部分数据丢失 - 只删除位置数据 const positionKey = `position:${testSessionId}:${testUserId}`; await redis.del(positionKey); // 3. 验证部分数据丢失 const remainingUsers = await sessionService.getSessionUsers(testSessionId); const lostPosition = await positionService.getUserPosition(testUserId, testSessionId); expect(remainingUsers).toHaveLength(1); // 会话数据仍存在 expect(lostPosition).toBeNull(); // 位置数据丢失 // 4. 重新更新位置应该正常工作 const newPosition: Position = { x: 150, y: 250, z: 0, timestamp: Date.now() }; await positionService.updatePosition(testUserId, testSessionId, newPosition); // 5. 验证数据恢复 const recoveredPosition = await positionService.getUserPosition(testUserId, testSessionId); expect(recoveredPosition).toBeDefined(); expect(recoveredPosition.x).toBe(newPosition.x); expect(recoveredPosition.y).toBe(newPosition.y); }); it('应该处理会话数据不一致的情况', async () => { // 1. 建立会话 await sessionService.joinSession(testUserId, testSessionId, testMapId); // 2. 模拟数据不一致 - 手动添加不存在的用户到会话 const fakeUserId = 'fake-user-id'; const sessionKey = `session:${testSessionId}:users`; await redis.sadd(sessionKey, fakeUserId); // 3. 获取会话用户 - 应该包含假用户 const usersWithFake = await sessionService.getSessionUsers(testSessionId); expect(usersWithFake.length).toBeGreaterThan(1); // 4. 尝试获取假用户的位置 - 应该返回 null const fakePosition = await positionService.getUserPosition(fakeUserId, testSessionId); expect(fakePosition).toBeNull(); // 5. 清理不一致数据 - 重新加入会话应该修复状态 await sessionService.leaveSession(testUserId, testSessionId); await sessionService.joinSession(testUserId, testSessionId, testMapId); // 6. 验证数据一致性恢复 const cleanUsers = await sessionService.getSessionUsers(testSessionId); expect(cleanUsers).toHaveLength(1); expect(cleanUsers[0].userId).toBe(testUserId); }); }); describe('Redis 性能降级处理', () => { it('应该在 Redis 响应缓慢时使用降级策略', async () => { // 1. 正常操作基准测试 const startTime = Date.now(); await sessionService.joinSession(testUserId, testSessionId, testMapId); const normalTime = Date.now() - startTime; // 2. 模拟 Redis 响应缓慢 - 添加延迟 const originalGet = redis.get.bind(redis); redis.get = async (key: string) => { await new Promise(resolve => setTimeout(resolve, 200)); // 200ms 延迟 return originalGet(key); }; // 3. 测试降级响应时间 const slowStartTime = Date.now(); try { await positionService.getUserPosition(testUserId, testSessionId); } catch (error) { // 可能因为超时而失败,这是预期的 } const slowTime = Date.now() - slowStartTime; // 4. 恢复正常 Redis 操作 redis.get = originalGet; // 5. 验证系统仍然可用 await positionService.updatePosition(testUserId, testSessionId, testPosition); const position = await positionService.getUserPosition(testUserId, testSessionId); expect(position).toBeDefined(); // 记录性能数据 console.log(`Normal operation time: ${normalTime}ms`); console.log(`Slow operation time: ${slowTime}ms`); }); it('应该在 Redis 内存不足时处理错误', async () => { // 1. 建立基础会话 await sessionService.joinSession(testUserId, testSessionId, testMapId); // 2. 模拟内存不足错误 const originalSet = redis.set.bind(redis); redis.set = async () => { throw new Error('OOM command not allowed when used memory > maxmemory'); }; // 3. 尝试更新位置 - 应该处理错误 await expect( positionService.updatePosition(testUserId, testSessionId, testPosition) ).rejects.toThrow(); // 4. 恢复正常操作 redis.set = originalSet; // 5. 验证系统恢复正常 await positionService.updatePosition(testUserId, testSessionId, testPosition); const position = await positionService.getUserPosition(testUserId, testSessionId); expect(position).toBeDefined(); expect(position.x).toBe(testPosition.x); }); }); describe('Redis 集群故障转移', () => { it('应该处理 Redis 节点故障', async () => { // 注意: 这个测试需要 Redis 集群环境,在单节点环境中跳过 if (process.env.REDIS_CLUSTER !== 'true') { console.log('Skipping cluster failover test - single node Redis'); return; } // 1. 建立会话和位置数据 await sessionService.joinSession(testUserId, testSessionId, testMapId); await positionService.updatePosition(testUserId, testSessionId, testPosition); // 2. 验证数据存在 const users = await sessionService.getSessionUsers(testSessionId); const position = await positionService.getUserPosition(testUserId, testSessionId); expect(users).toHaveLength(1); expect(position).toBeDefined(); // 3. 模拟节点故障 - 在实际集群环境中需要外部工具 // 这里只能模拟连接错误 const originalPing = redis.ping.bind(redis); redis.ping = async () => { throw new Error('Connection lost'); }; // 4. 验证故障检测 await expect(redis.ping()).rejects.toThrow('Connection lost'); // 5. 恢复连接 redis.ping = originalPing; await redis.ping(); // 6. 验证数据仍然可访问 const recoveredUsers = await sessionService.getSessionUsers(testSessionId); const recoveredPosition = await positionService.getUserPosition(testUserId, testSessionId); expect(recoveredUsers).toHaveLength(1); expect(recoveredPosition).toBeDefined(); }); }); describe('缓存预热和重建', () => { it('应该支持缓存预热机制', async () => { // 1. 清空 Redis 缓存 await redis.flushall(); // 2. 在数据库中创建用户档案数据 await userProfilesService.updateUserProfile(testUserId, { last_position_update: new Date(), last_position_x: testPosition.x, last_position_y: testPosition.y, last_position_z: testPosition.z }); // 3. 验证 Redis 中没有缓存数据 const cachedPosition = await positionService.getUserPosition(testUserId, testSessionId); expect(cachedPosition).toBeNull(); // 4. 触发缓存预热 - 通过正常操作 await sessionService.joinSession(testUserId, testSessionId, testMapId); await positionService.updatePosition(testUserId, testSessionId, testPosition); // 5. 验证缓存已建立 const warmedPosition = await positionService.getUserPosition(testUserId, testSessionId); expect(warmedPosition).toBeDefined(); expect(warmedPosition.x).toBe(testPosition.x); expect(warmedPosition.y).toBe(testPosition.y); }); it('应该支持批量缓存重建', async () => { const userIds = ['user1', 'user2', 'user3']; // 1. 为多个用户建立会话 for (const userId of userIds) { await userProfilesService.createUserProfile({ user_id: userId, username: `user-${userId}`, email: `${userId}@example.com`, created_at: new Date(), updated_at: new Date() }); await sessionService.joinSession(userId, testSessionId, testMapId); await positionService.updatePosition(userId, testSessionId, { x: Math.random() * 1000, y: Math.random() * 1000, z: 0, timestamp: Date.now() }); } // 2. 验证所有用户都在会话中 const allUsers = await sessionService.getSessionUsers(testSessionId); expect(allUsers).toHaveLength(userIds.length); // 3. 清空 Redis 缓存 await redis.flushall(); // 4. 验证缓存数据丢失 const emptyUsers = await sessionService.getSessionUsers(testSessionId); expect(emptyUsers).toHaveLength(0); // 5. 重建缓存 - 重新加入会话 for (const userId of userIds) { await sessionService.joinSession(userId, testSessionId, testMapId); } // 6. 验证缓存重建成功 const rebuiltUsers = await sessionService.getSessionUsers(testSessionId); expect(rebuiltUsers).toHaveLength(userIds.length); // 清理测试数据 for (const userId of userIds) { await sessionService.leaveSession(userId, testSessionId); await userProfilesService.deleteUserProfile(userId); } }); }); });