Files
whale-town-end/test/location_broadcast/redis_failover.integration_spec.ts
moyin 71bc317c57 test:添加位置广播系统端到端测试
- 添加并发用户测试场景
- 实现数据库恢复集成测试
- 重命名登录测试文件以符合命名规范
2026-01-08 23:06:11 +08:00

449 lines
16 KiB
TypeScript

/**
* Redis 故障恢复集成测试
*
* 功能描述:
* - 测试Redis连接中断恢复机制
* - 验证Redis数据丢失恢复能力
* - 测试Redis集群故障转移
* - 确保缓存重建机制正常
* - 验证数据一致性保证
*
* 测试场景:
* 1. Redis 连接中断恢复
* 2. Redis 数据丢失恢复
* 3. Redis 集群故障转移
* 4. 缓存重建机制
* 5. 数据一致性保证
*
* 最近修改:
* - 2026-01-08: 文件重命名 - 修正kebab-case为snake_case命名规范 (修改者: moyin)
*
* @author original
* @version 1.0.1
* @since 2025-01-01
* @lastModified 2026-01-08
*/
import { Test, TestingModule } from '@nestjs/testing';
import { INestApplication } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm';
import { RedisModule } from '@nestjs-modules/ioredis';
import Redis from 'ioredis';
import { LocationBroadcastModule } from '../../location_broadcast.module';
import { LocationBroadcastCoreService } from '../../../core/location_broadcast_core/location_broadcast_core.service';
import { LocationPositionService } from '../../services/location_position.service';
import { LocationSessionService } from '../../services/location_session.service';
import { UserProfilesService } from '../../../core/db/user_profiles/user_profiles.service';
import { Position } from '../../../core/location_broadcast_core/position.interface';
import { SessionUser } from '../../../core/location_broadcast_core/session.interface';
describe('Redis Failover Integration Tests', () => {
// --- Handles resolved once in beforeAll and shared by every test below ---
let module: TestingModule;
let app: INestApplication;
let redis: Redis;

// Services pulled out of the compiled testing module.
let coreService: LocationBroadcastCoreService;
let sessionService: LocationSessionService;
let positionService: LocationPositionService;
let userProfilesService: UserProfilesService;

// --- Fixed identifiers used as test data throughout the suite ---
const testUserId = 'test-user-redis-failover';
const testSessionId = 'test-session-redis-failover';
const testMapId = 'test-map-redis-failover';

// Reference position written and read back by the tests; the timestamp is
// captured once, when the describe callback is evaluated.
const testPosition: Position = { x: 100, y: 200, z: 0, timestamp: Date.now() };
/**
 * Boots one Nest testing application for the whole suite, wiring config,
 * MySQL (TypeORM), Redis and the location-broadcast module, then resolves
 * the services and the Redis client the tests work with.
 *
 * The TypeORM connection is configured with `synchronize` and `dropSchema`,
 * so DB_* must point at a disposable database.
 */
beforeAll(async () => {
  // process.env values are `string | undefined`; give parseInt a string and an
  // explicit radix, and fall back to the MySQL default when unset/unparsable.
  const dbPort = Number.parseInt(process.env.DB_PORT ?? '', 10) || 3306;

  module = await Test.createTestingModule({
    imports: [
      ConfigModule.forRoot({
        isGlobal: true,
        envFilePath: ['.env.test', '.env']
      }),
      TypeOrmModule.forRoot({
        type: 'mysql',
        host: process.env.DB_HOST || 'localhost',
        port: dbPort,
        username: process.env.DB_USERNAME || 'root',
        password: process.env.DB_PASSWORD || '',
        database: process.env.DB_DATABASE || 'test_db',
        entities: [__dirname + '/../../../**/*.entity{.ts,.js}'],
        synchronize: true,
        dropSchema: true
      }),
      RedisModule.forRoot({
        type: 'single',
        url: process.env.REDIS_URL || 'redis://localhost:6379',
        options: {
          // NOTE(review): ioredis documents retryDelayOnFailover as a Cluster
          // option; confirm it has any effect on a `single` connection.
          retryDelayOnFailover: 100,
          maxRetriesPerRequest: 3,
          lazyConnect: true
        }
      }),
      LocationBroadcastModule
    ]
  }).compile();

  app = module.createNestApplication();
  await app.init();

  // Resolve the service instances under test.
  coreService = module.get<LocationBroadcastCoreService>(LocationBroadcastCoreService);
  positionService = module.get<LocationPositionService>(LocationPositionService);
  sessionService = module.get<LocationSessionService>(LocationSessionService);
  userProfilesService = module.get<UserProfilesService>(UserProfilesService);
  redis = module.get<Redis>('default_IORedisModuleConnectionToken');

  // The client is created with lazyConnect, so issue a command up front to
  // make sure Redis is actually reachable before any test runs.
  await redis.ping();
});
/**
 * Suite teardown: wipes Redis and shuts the Nest application down.
 *
 * Both handles are guarded because this hook still runs when beforeAll fails
 * part-way; an unguarded call on an unassigned handle would throw a TypeError
 * and bury the original setup error.
 */
afterAll(async () => {
  if (redis) {
    // Remove everything the tests wrote (FLUSHALL clears every Redis database).
    await redis.flushall();
  }
  if (app) {
    await app.close();
  }
});
/**
 * Per-test setup: start from an empty Redis and a freshly created profile
 * for the shared test user.
 */
beforeEach(async () => {
  // Drop whatever the previous test left in Redis.
  await redis.flushall();

  // Profile row for the shared test user.
  const profile = {
    user_id: testUserId,
    username: 'test-user-redis',
    email: 'test-redis@example.com',
    created_at: new Date(),
    updated_at: new Date()
  };
  await userProfilesService.createUserProfile(profile);
});
/**
 * Per-test teardown: best-effort removal of the test user's session
 * membership and profile. Each step is attempted independently and any
 * failure is deliberately ignored, since a test may already have removed
 * the data itself.
 */
afterEach(async () => {
  const cleanupSteps: Array<() => Promise<unknown>> = [
    async () => sessionService.leaveSession(testUserId, testSessionId),
    async () => userProfilesService.deleteUserProfile(testUserId)
  ];

  for (const runStep of cleanupSteps) {
    try {
      await runStep();
    } catch {
      // Cleanup errors are intentionally ignored.
    }
  }
});
describe('Redis 连接故障恢复', () => {
/**
 * Verifies that session operations fail while the Redis client is
 * disconnected and work again once it is reconnected.
 */
it('应该在 Redis 连接中断后自动重连', async () => {
  // 1. Normal operation: join the session and confirm the user is listed.
  await sessionService.joinSession(testUserId, testSessionId, testMapId);
  const users = await sessionService.getSessionUsers(testSessionId);
  expect(users).toHaveLength(1);
  expect(users[0].userId).toBe(testUserId);

  // 2. Simulate the outage. disconnect() is synchronous (returns void), so
  //    there is nothing to await.
  redis.disconnect();
  try {
    // 3. While disconnected, the read is expected to reject.
    await expect(
      sessionService.getSessionUsers(testSessionId)
    ).rejects.toThrow();
  } finally {
    // 4. Always reconnect, even if the assertion above failed; the client is
    //    shared, so leaving it closed would break afterEach and later tests.
    await redis.connect();
  }
  await redis.ping();

  // 5. The service must be usable again. Only the shape is checked, because
  //    the test does not require the earlier data to have survived.
  const newUsers = await sessionService.getSessionUsers(testSessionId);
  expect(Array.isArray(newUsers)).toBe(true);

  // 6. Joining again must leave exactly the test user in the session.
  await sessionService.joinSession(testUserId, testSessionId, testMapId);
  const recoveredUsers = await sessionService.getSessionUsers(testSessionId);
  expect(recoveredUsers).toHaveLength(1);
  expect(recoveredUsers[0].userId).toBe(testUserId);
});
/**
 * Verifies that position data lost by a Redis wipe (standing in for a
 * restart) can be rebuilt by re-joining and updating the position again.
 */
it('应该在 Redis 重启后重建缓存数据', async () => {
  // 1. Establish the initial state.
  await sessionService.joinSession(testUserId, testSessionId, testMapId);
  await positionService.updatePosition(testUserId, testSessionId, testPosition);

  // The position must be readable. toMatchObject is used instead of
  // toBeDefined(): a missing position is reported as null in this suite, and
  // toBeDefined() passes on null.
  const initialPosition = await positionService.getUserPosition(testUserId, testSessionId);
  expect(initialPosition).toMatchObject({ x: testPosition.x });

  // 2. Simulate a Redis restart by wiping all data.
  await redis.flushall();

  // 3. The cached position must be gone.
  const lostPosition = await positionService.getUserPosition(testUserId, testSessionId);
  expect(lostPosition).toBeNull();

  // 4. Re-join the session and publish the position again.
  await sessionService.joinSession(testUserId, testSessionId, testMapId);
  await positionService.updatePosition(testUserId, testSessionId, testPosition);

  // 5. The rebuilt data must match what was written.
  const rebuiltPosition = await positionService.getUserPosition(testUserId, testSessionId);
  expect(rebuiltPosition).toMatchObject({ x: testPosition.x, y: testPosition.y });
});
});
describe('Redis 数据一致性恢复', () => {
/**
 * Verifies that losing only the position entry leaves the session membership
 * intact and that a new position update restores the entry.
 */
it('应该处理部分数据丢失的情况', async () => {
  // 1. Build the full session state.
  await sessionService.joinSession(testUserId, testSessionId, testMapId);
  await positionService.updatePosition(testUserId, testSessionId, testPosition);

  // Confirm the full state. not.toBeNull() is used because a missing position
  // is reported as null here, which toBeDefined() would accept.
  const users = await sessionService.getSessionUsers(testSessionId);
  const position = await positionService.getUserPosition(testUserId, testSessionId);
  expect(users).toHaveLength(1);
  expect(position).not.toBeNull();

  // 2. Simulate partial loss: delete only the position entry.
  // NOTE(review): assumes the service stores positions under this key format;
  // confirm against the position service implementation.
  const positionKey = `position:${testSessionId}:${testUserId}`;
  await redis.del(positionKey);

  // 3. Session data must still be there, the position must be gone.
  const remainingUsers = await sessionService.getSessionUsers(testSessionId);
  const lostPosition = await positionService.getUserPosition(testUserId, testSessionId);
  expect(remainingUsers).toHaveLength(1);
  expect(lostPosition).toBeNull();

  // 4. Writing a new position must work normally.
  const newPosition: Position = {
    x: 150,
    y: 250,
    z: 0,
    timestamp: Date.now()
  };
  await positionService.updatePosition(testUserId, testSessionId, newPosition);

  // 5. The new position must be readable.
  const recoveredPosition = await positionService.getUserPosition(testUserId, testSessionId);
  expect(recoveredPosition).toMatchObject({ x: newPosition.x, y: newPosition.y });
});
/**
 * Injects a member with no backing user into the session set and checks that
 * the session reports it, that it has no position, and that a leave/re-join
 * by the real user ends with only the real user listed.
 */
it('应该处理会话数据不一致的情况', async () => {
  // Step 1: the real user joins.
  await sessionService.joinSession(testUserId, testSessionId, testMapId);

  // Step 2: create the inconsistency by adding an unknown id straight into
  // the session's member set.
  // NOTE(review): assumes the service keeps members in this set key; confirm.
  const ghostUserId = 'fake-user-id';
  await redis.sadd(`session:${testSessionId}:users`, ghostUserId);

  // Step 3: the listing is expected to contain more than the real user.
  const polluted = await sessionService.getSessionUsers(testSessionId);
  expect(polluted.length).toBeGreaterThan(1);

  // Step 4: the injected id has no position.
  const ghostPosition = await positionService.getUserPosition(ghostUserId, testSessionId);
  expect(ghostPosition).toBeNull();

  // Step 5: leave and re-join, which is expected to repair the state.
  await sessionService.leaveSession(testUserId, testSessionId);
  await sessionService.joinSession(testUserId, testSessionId, testMapId);

  // Step 6: only the real user remains.
  const repaired = await sessionService.getSessionUsers(testSessionId);
  expect(repaired).toHaveLength(1);
  expect(repaired[0].userId).toBe(testUserId);
});
});
describe('Redis 性能降级处理', () => {
/**
 * Adds an artificial 200 ms delay to redis.get, performs a position read,
 * then checks the system is still usable after the delay is removed. Timings
 * are logged for information only; nothing is asserted on them.
 */
it('应该在 Redis 响应缓慢时使用降级策略', async () => {
  // 1. Baseline timing for a normal operation.
  const startTime = Date.now();
  await sessionService.joinSession(testUserId, testSessionId, testMapId);
  const normalTime = Date.now() - startTime;

  // 2. Simulate a slow Redis by delaying every GET on the shared client.
  // NOTE(review): only takes effect if the position service reads through
  // redis.get on this client instance; confirm.
  const originalGet = redis.get.bind(redis);
  redis.get = async (key: string) => {
    await new Promise(resolve => setTimeout(resolve, 200)); // 200 ms delay
    return originalGet(key);
  };

  // 3. Time a read under the delay. A timeout-style failure is acceptable,
  //    so any error is swallowed here.
  const slowStartTime = Date.now();
  try {
    await positionService.getUserPosition(testUserId, testSessionId);
  } catch {
    // Expected possibility: the read may fail because of the delay.
  }
  const slowTime = Date.now() - slowStartTime;

  // 4. Restore normal Redis behaviour.
  redis.get = originalGet;

  // 5. The system must still be usable. not.toBeNull() is used because a
  //    missing position is reported as null, which toBeDefined() would accept.
  await positionService.updatePosition(testUserId, testSessionId, testPosition);
  const position = await positionService.getUserPosition(testUserId, testSessionId);
  expect(position).not.toBeNull();

  // Record the timings.
  console.log(`Normal operation time: ${normalTime}ms`);
  console.log(`Slow operation time: ${slowTime}ms`);
});
/**
 * Makes redis.set fail with an OOM-style error, expects the position update
 * to reject, then checks updates work again once redis.set is restored.
 */
it('应该在 Redis 内存不足时处理错误', async () => {
  // 1. Base session.
  await sessionService.joinSession(testUserId, testSessionId, testMapId);

  // 2. Simulate Redis rejecting writes because it is out of memory.
  // NOTE(review): only takes effect if the position service writes through
  // redis.set on this client instance; confirm.
  const originalSet = redis.set.bind(redis);
  redis.set = async () => {
    throw new Error('OOM command not allowed when used memory > maxmemory');
  };

  try {
    // 3. The update is expected to surface the failure.
    await expect(
      positionService.updatePosition(testUserId, testSessionId, testPosition)
    ).rejects.toThrow();
  } finally {
    // 4. Always restore the real command; the client is shared, and a failed
    //    assertion must not leave later tests with a broken redis.set.
    redis.set = originalSet;
  }

  // 5. Normal operation must work again.
  await positionService.updatePosition(testUserId, testSessionId, testPosition);
  const position = await positionService.getUserPosition(testUserId, testSessionId);
  expect(position).toMatchObject({ x: testPosition.x });
});
});
describe('Redis 集群故障转移', () => {
// This scenario needs a Redis cluster. Without REDIS_CLUSTER=true it is
// registered through it.skip so the run reports it as skipped rather than as
// a pass that never executed.
const itOnCluster = process.env.REDIS_CLUSTER === 'true' ? it : it.skip;

/**
 * Simulates a lost node by making redis.ping fail, then checks that session
 * and position data are still readable after ping is restored.
 */
itOnCluster('应该处理 Redis 节点故障', async () => {
  // 1. Create session and position data.
  await sessionService.joinSession(testUserId, testSessionId, testMapId);
  await positionService.updatePosition(testUserId, testSessionId, testPosition);

  // 2. Confirm the data exists. not.toBeNull() is used because a missing
  //    position is reported as null, which toBeDefined() would accept.
  const users = await sessionService.getSessionUsers(testSessionId);
  const position = await positionService.getUserPosition(testUserId, testSessionId);
  expect(users).toHaveLength(1);
  expect(position).not.toBeNull();

  // 3. Simulate the node failure. A real failover needs external tooling;
  //    here only a connection error on ping is faked.
  const originalPing = redis.ping.bind(redis);
  redis.ping = async () => {
    throw new Error('Connection lost');
  };

  try {
    // 4. The failure must be observable.
    await expect(redis.ping()).rejects.toThrow('Connection lost');
  } finally {
    // 5. Always restore ping so a failed assertion cannot leak the patch.
    redis.ping = originalPing;
  }
  await redis.ping();

  // 6. The data must still be accessible.
  const recoveredUsers = await sessionService.getSessionUsers(testSessionId);
  const recoveredPosition = await positionService.getUserPosition(testUserId, testSessionId);
  expect(recoveredUsers).toHaveLength(1);
  expect(recoveredPosition).not.toBeNull();
});
});
describe('缓存预热和重建', () => {
/**
 * Starts from an empty Redis with the last position stored on the user
 * profile, checks nothing is cached, then warms the cache through the normal
 * join + update flow and checks the position is readable.
 */
it('应该支持缓存预热机制', async () => {
  // 1. Empty the Redis cache.
  await redis.flushall();

  // 2. Persist position data on the user profile in the database.
  await userProfilesService.updateUserProfile(testUserId, {
    last_position_update: new Date(),
    last_position_x: testPosition.x,
    last_position_y: testPosition.y,
    last_position_z: testPosition.z
  });

  // 3. Nothing must be cached in Redis yet.
  const cachedPosition = await positionService.getUserPosition(testUserId, testSessionId);
  expect(cachedPosition).toBeNull();

  // 4. Warm the cache through normal operations.
  await sessionService.joinSession(testUserId, testSessionId, testMapId);
  await positionService.updatePosition(testUserId, testSessionId, testPosition);

  // 5. The cache must now hold the position. toMatchObject replaces
  //    toBeDefined(), which passes on the null used for a missing position.
  const warmedPosition = await positionService.getUserPosition(testUserId, testSessionId);
  expect(warmedPosition).toMatchObject({ x: testPosition.x, y: testPosition.y });
});
/**
 * Puts three extra users into the session, wipes Redis, and checks that
 * re-joining rebuilds the session membership for all of them.
 *
 * The extra profiles are removed in a `finally` block, best-effort like the
 * suite's afterEach, so a failed assertion does not leave them behind.
 */
it('应该支持批量缓存重建', async () => {
  const userIds = ['user1', 'user2', 'user3'];

  try {
    // 1. Create a profile, session membership and position for each user.
    for (const userId of userIds) {
      await userProfilesService.createUserProfile({
        user_id: userId,
        username: `user-${userId}`,
        email: `${userId}@example.com`,
        created_at: new Date(),
        updated_at: new Date()
      });
      await sessionService.joinSession(userId, testSessionId, testMapId);
      await positionService.updatePosition(userId, testSessionId, {
        x: Math.random() * 1000,
        y: Math.random() * 1000,
        z: 0,
        timestamp: Date.now()
      });
    }

    // 2. Every user must be in the session.
    const allUsers = await sessionService.getSessionUsers(testSessionId);
    expect(allUsers).toHaveLength(userIds.length);

    // 3. Wipe the Redis cache.
    await redis.flushall();

    // 4. The session membership must be gone.
    const emptyUsers = await sessionService.getSessionUsers(testSessionId);
    expect(emptyUsers).toHaveLength(0);

    // 5. Rebuild the cache by re-joining.
    for (const userId of userIds) {
      await sessionService.joinSession(userId, testSessionId, testMapId);
    }

    // 6. The membership must be fully rebuilt.
    const rebuiltUsers = await sessionService.getSessionUsers(testSessionId);
    expect(rebuiltUsers).toHaveLength(userIds.length);
  } finally {
    // Remove the extra users whether or not the assertions passed. Each step
    // is independent so one failure does not block the remaining cleanup.
    for (const userId of userIds) {
      try {
        await sessionService.leaveSession(userId, testSessionId);
      } catch {
        // Ignore cleanup errors.
      }
      try {
        await userProfilesService.deleteUserProfile(userId);
      } catch {
        // Ignore cleanup errors.
      }
    }
  }
});
});
});