Files
whale-town-end/test/location_broadcast/database_recovery.integration_spec.ts
moyin 71bc317c57 test:添加位置广播系统端到端测试
- 添加并发用户测试场景
- 实现数据库恢复集成测试
- 重命名登录测试文件以符合命名规范
2026-01-08 23:06:11 +08:00

590 lines
19 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
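/**
 * Database failure-recovery integration tests
 *
 * Purpose:
 * - Exercise the recovery path after a database connection interruption
 * - Verify database transaction rollback handling
 * - Exercise recovery from database deadlocks
 * - Check data-consistency guarantees
 * - Verify database connection-pool management
 *
 * Test scenarios:
 * 1. Recovery from a database connection interruption
 * 2. Database transaction rollback handling
 * 3. Database deadlock recovery
 * 4. Data-consistency guarantees
 * 5. Database connection-pool management
 *
 * Recent changes:
 * - 2026-01-08: File renamed - corrected kebab-case to the snake_case naming convention (modified by: moyin)
 * - 2026-01-08: Fixed import paths and method calls to match the actual service interfaces (modified by: moyin)
 *
 * @author original
 * @version 1.0.2
 * @since 2025-01-01
 * @lastModified 2026-01-08
 */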
import { Test, TestingModule } from '@nestjs/testing';
import { INestApplication } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { TypeOrmModule, getRepositoryToken } from '@nestjs/typeorm';
import { Repository, DataSource } from 'typeorm';
import Redis from 'ioredis';
import { LocationBroadcastModule } from '../../src/business/location_broadcast/location_broadcast.module';
import { UserProfiles } from '../../src/core/db/user_profiles/user_profiles.entity';
import { UserProfilesService } from '../../src/core/db/user_profiles/user_profiles.service';
import { LocationPositionService } from '../../src/business/location_broadcast/services/location_position.service';
import { LocationSessionService } from '../../src/business/location_broadcast/services/location_session.service';
import { CreateUserProfileDto, UpdatePositionDto } from '../../src/core/db/user_profiles/user_profiles.dto';
describe('Database Recovery Integration Tests', () => {
// Fixed identifiers shared by every test in this suite.
const testUserId = BigInt(999999);
const testSessionId = 'test-session-db-recovery';
const testMapId = 'test-map-db-recovery';

// Application and infrastructure handles; all of them are assigned in beforeAll.
let module: TestingModule;
let app: INestApplication;
let dataSource: DataSource;
let redis: Redis;

// Repository and services resolved from the compiled testing module.
let userProfilesRepository: Repository<UserProfiles>;
let userProfilesService: UserProfilesService;
let positionService: LocationPositionService;
let sessionService: LocationSessionService;
beforeAll(async () => {
  // Build the module definition BEFORE creating the Redis client.
  // NOTE(review): ConfigModule.forRoot() is expected to load `.env.test` / `.env`
  // into process.env while the `imports` array is evaluated (the DB_* lookups
  // below are placed after it for the same reason). Creating the Redis client
  // first, as this hook used to, would read REDIS_URL before those files were
  // loaded and fall back to localhost.
  const moduleBuilder = Test.createTestingModule({
    imports: [
      ConfigModule.forRoot({
        isGlobal: true,
        envFilePath: ['.env.test', '.env']
      }),
      TypeOrmModule.forRoot({
        type: 'mysql',
        host: process.env.DB_HOST || 'localhost',
        port: parseInt(process.env.DB_PORT || '3306', 10),
        username: process.env.DB_USERNAME || 'root',
        password: process.env.DB_PASSWORD || '',
        database: process.env.DB_DATABASE || 'test_db',
        entities: [__dirname + '/../../src/**/*.entity{.ts,.js}'],
        // The schema is dropped and rebuilt from the entities on start-up, so
        // DB_DATABASE must point at a disposable database.
        synchronize: true,
        dropSchema: true,
        extra: {
          connectionLimit: 10,
          acquireTimeout: 60000,
          timeout: 60000
        }
      }),
      LocationBroadcastModule
    ]
  });

  // Redis client that replaces the module's own connection provider.
  redis = new Redis(process.env.REDIS_URL || 'redis://localhost:6379');

  module = await moduleBuilder
    .overrideProvider('default_IORedisModuleConnectionToken')
    .useValue(redis)
    .compile();

  app = module.createNestApplication();
  await app.init();

  // Resolve the handles used by the tests.
  dataSource = module.get<DataSource>(DataSource);
  userProfilesRepository = module.get<Repository<UserProfiles>>(
    getRepositoryToken(UserProfiles)
  );
  userProfilesService = module.get<UserProfilesService>(UserProfilesService);
  positionService = module.get<LocationPositionService>(LocationPositionService);
  sessionService = module.get<LocationSessionService>(LocationSessionService);

  // Fail fast if either backing store is unreachable.
  await dataSource.query('SELECT 1');
  await redis.ping();
});
afterAll(async () => {
  // `redis` / `app` stay undefined when beforeAll failed part-way, hence the
  // optional chaining: teardown must not mask the original set-up error.
  try {
    await redis?.flushall();
  } finally {
    // Always release the connection and the Nest application, even when the
    // flush above rejected; otherwise open handles keep the test run alive.
    redis?.disconnect();
    await app?.close();
  }
});
beforeEach(async () => {
  // Start every test from an empty Redis and an empty user_profiles table.
  await redis.flushall();
  await userProfilesRepository.delete({});

  // Seed the single profile that the tests operate on, positioned at the origin.
  const seedProfile: CreateUserProfileDto = {
    user_id: testUserId,
    bio: 'test user for db recovery',
    current_map: 'plaza',
    pos_x: 0,
    pos_y: 0,
    status: 1
  };
  await userProfilesService.create(seedProfile);
});
afterEach(async () => {
  // Best-effort removal of the seeded profile. A failure here must not fail the
  // test, but it is reported so that a broken connection does not go unnoticed.
  try {
    await userProfilesRepository.delete({ user_id: testUserId });
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    console.warn(`afterEach cleanup failed: ${reason}`);
  }
});
describe('数据库连接故障恢复', () => {
it('应该处理数据库连接超时', async () => {
  // 1. Normal operation: move the user and confirm the write is visible.
  const updateDto: UpdatePositionDto = {
    pos_x: 300,
    pos_y: 400,
    current_map: testMapId
  };
  await userProfilesService.updatePosition(testUserId, updateDto);

  const profile = await userProfilesService.findByUserId(testUserId);
  expect(profile).toBeDefined();
  expect(profile?.pos_x).toBe(300);

  // 2. Simulate a connection timeout by making dataSource.query always throw.
  // NOTE(review): this only affects findByUserId if the service ultimately goes
  // through dataSource.query - not visible from this file, confirm.
  const originalQuery = dataSource.query.bind(dataSource);
  dataSource.query = async () => {
    throw new Error('Connection timeout');
  };

  try {
    // 3. The lookup is expected to fail while the "connection" is down.
    await expect(
      userProfilesService.findByUserId(testUserId)
    ).rejects.toThrow();
  } finally {
    // 4. Restore the real implementation even when the assertion above fails,
    //    so the stub cannot leak into the remaining tests.
    dataSource.query = originalQuery;
  }

  // 5. Lookups work again once the connection is back.
  const recoveredProfile = await userProfilesService.findByUserId(testUserId);
  expect(recoveredProfile).toBeDefined();
  expect(recoveredProfile?.user_id).toBe(testUserId);
});
it('应该处理数据库连接池耗尽', async () => {
  // 15 concurrent creates: more than the pool's connectionLimit of 10.
  const concurrentUserIds = Array.from({ length: 15 }, (_, i) => BigInt(i + 1000));

  try {
    // 1. Fire every create at once so the requests compete for pool connections.
    const concurrentOperations = concurrentUserIds.map((userId, i) => {
      const createDto: CreateUserProfileDto = {
        user_id: userId,
        bio: `concurrent user ${i}`,
        current_map: 'plaza',
        pos_x: 0,
        pos_y: 0,
        status: 1
      };
      return userProfilesService.create(createDto);
    });

    // 2. Some operations may be rejected; allSettled collects every outcome.
    const results = await Promise.allSettled(concurrentOperations);

    // 3. Report how many succeeded and how many failed.
    const successful = results.filter(r => r.status === 'fulfilled').length;
    const failed = results.filter(r => r.status === 'rejected').length;
    console.log(`Successful operations: ${successful}, Failed: ${failed}`);

    // 4. Give the pool a moment to release its connections.
    await new Promise(resolve => setTimeout(resolve, 1000));

    // 5. The system must serve lookups again. toBeTruthy is used because
    //    toBeDefined would also accept a `null` "not found" result.
    const testProfile = await userProfilesService.findByUserId(testUserId);
    expect(testProfile).toBeTruthy();
  } finally {
    // Remove the extra users even when an assertion above failed.
    for (const userId of concurrentUserIds) {
      try {
        await userProfilesRepository.delete({ user_id: userId });
      } catch {
        // Best-effort cleanup: beforeEach wipes the table before the next test.
      }
    }
  }
});
});
describe('数据库事务处理', () => {
it('应该处理事务回滚', async () => {
  // Sentinel used to tell the intentional failure apart from real errors
  // (including failed expectations) raised inside the transaction.
  const simulatedFailure = new Error('Simulated transaction error');

  const queryRunner = dataSource.createQueryRunner();
  try {
    // 1. Open the transaction.
    await queryRunner.connect();
    await queryRunner.startTransaction();

    // 2. Update the profile inside the transaction.
    await queryRunner.manager.update(UserProfiles,
      { user_id: testUserId },
      {
        pos_x: 300,
        pos_y: 400,
        last_position_update: new Date()
      }
    );

    // 3. The change must be visible from within the same transaction.
    const transactionProfile = await queryRunner.manager.findOne(UserProfiles, {
      where: { user_id: testUserId }
    });
    expect(transactionProfile?.pos_x).toBe(300);

    // 4. Simulate an error that forces a rollback.
    throw simulatedFailure;
  } catch (error) {
    // 5. Roll back, then swallow ONLY the simulated failure. Anything else -
    //    e.g. a failed expectation above - must still fail the test.
    if (queryRunner.isTransactionActive) {
      await queryRunner.rollbackTransaction();
    }
    if (error !== simulatedFailure) {
      throw error;
    }
  } finally {
    await queryRunner.release();
  }

  // 6. The profile is back at its initial position.
  const rolledBackProfile = await userProfilesService.findByUserId(testUserId);
  expect(rolledBackProfile?.pos_x).toBe(0);
  expect(rolledBackProfile?.pos_y).toBe(0);
});
it('应该处理并发事务冲突', async () => {
  // 1. Two independent transactions on separate connections.
  const transaction1 = dataSource.createQueryRunner();
  const transaction2 = dataSource.createQueryRunner();

  try {
    await transaction1.connect();
    await transaction2.connect();
    await transaction1.startTransaction();
    await transaction2.startTransaction();

    // Each transaction commits as soon as ITS OWN update has finished. Both
    // updates target the same row, so the second one waits on the first one's
    // row lock; awaiting both updates before committing either would stall
    // until the database's lock-wait timeout expires.
    const updateAndCommit = async (
      runner: typeof transaction1,
      posX: number
    ): Promise<void> => {
      await runner.manager.update(UserProfiles,
        { user_id: testUserId },
        {
          pos_x: posX,
          last_position_update: new Date()
        }
      );
      await runner.commitTransaction();
    };

    // 2-4. Run both transactions concurrently and wait for both outcomes.
    const outcomes = await Promise.allSettled([
      updateAndCommit(transaction1, 100),
      updateAndCommit(transaction2, 200)
    ]);

    // A rejected transaction is an acceptable way of resolving the conflict.
    for (const outcome of outcomes) {
      if (outcome.status === 'rejected') {
        console.log('Transaction conflict handled:', (outcome.reason as Error).message);
      }
    }
  } finally {
    // Roll back whatever is still open (rolling back an already committed
    // transaction would throw) and always hand the connections back.
    await Promise.allSettled(
      [transaction1, transaction2].map(async runner => {
        try {
          if (runner.isTransactionActive) {
            await runner.rollbackTransaction();
          }
        } finally {
          await runner.release();
        }
      })
    );
  }

  // 5. The final state is consistent: one of the two writes, or the initial value.
  const finalProfile = await userProfilesService.findByUserId(testUserId);
  expect(finalProfile).toBeTruthy();
  expect([100, 200, 0]).toContain(finalProfile?.pos_x);
});
});
describe('数据一致性保证', () => {
it('应该保证 Redis 和数据库数据一致性', async () => {
  // Moves the shared test user to (x, y) on the test map through the service.
  const moveTo = (x: number, y: number) => {
    const dto: UpdatePositionDto = {
      pos_x: x,
      pos_y: y,
      current_map: testMapId
    };
    return userProfilesService.updatePosition(testUserId, dto);
  };
  // Reads the shared test user's profile through the service.
  const readProfile = () => userProfilesService.findByUserId(testUserId);

  // First move: the profile read back must carry the new coordinates.
  await moveTo(300, 400);
  const afterFirstMove = await readProfile();
  expect(afterFirstMove?.pos_x).toBe(300);
  expect(afterFirstMove?.pos_y).toBe(400);

  // Wipe Redis to simulate cache loss; the stored position must survive.
  // Note: this test only reads back through the service, it never inspects
  // Redis contents directly.
  await redis.flushall();
  const afterFlush = await readProfile();
  expect(afterFlush?.pos_x).toBe(300);

  // A further move after the wipe must be stored as usual.
  await moveTo(500, 600);
  const afterSecondMove = await readProfile();
  expect(afterSecondMove?.pos_x).toBe(500);
  expect(afterSecondMove?.pos_y).toBe(600);
});
it('应该处理数据库写入失败的情况', async () => {
  const updateDto: UpdatePositionDto = {
    pos_x: 300,
    pos_y: 400,
    current_map: testMapId
  };

  // 1. Simulate a failing write by stubbing the service's update method.
  // NOTE(review): this assumes updatePosition delegates to `update` - not
  // visible from this file, confirm against the service.
  const originalUpdate = userProfilesService.update.bind(userProfilesService);
  userProfilesService.update = async () => {
    throw new Error('Database write failed');
  };

  try {
    // 2. The position update is expected to fail while writes are broken.
    await expect(
      userProfilesService.updatePosition(testUserId, updateDto)
    ).rejects.toThrow();
  } finally {
    // 3. Restore the real method even when the expectation above fails, so the
    //    stub cannot leak into the remaining tests.
    userProfilesService.update = originalUpdate;
  }

  // 4. The same update succeeds once writes work again.
  await userProfilesService.updatePosition(testUserId, updateDto);

  // 5. The stored position reflects the retried update.
  const finalDbProfile = await userProfilesService.findByUserId(testUserId);
  expect(finalDbProfile?.pos_x).toBe(300);
  expect(finalDbProfile?.pos_y).toBe(400);
});
});
describe('数据库性能降级', () => {
it('应该处理数据库查询缓慢的情况', async () => {
  // 1. Baseline: time one normal lookup.
  const startTime = Date.now();
  await userProfilesService.findByUserId(testUserId);
  const normalTime = Date.now() - startTime;

  // 2. Simulate a slow database by delaying repository.findOne by 500 ms.
  // NOTE(review): only effective if findByUserId uses repository.findOne -
  // not visible from this file, confirm.
  const originalFindOne = userProfilesRepository.findOne.bind(userProfilesRepository);
  userProfilesRepository.findOne = async (options: any) => {
    await new Promise(resolve => setTimeout(resolve, 500));
    return originalFindOne(options);
  };

  let slowProfile: Awaited<ReturnType<typeof userProfilesService.findByUserId>>;
  let slowTime: number;
  try {
    // 3. Time the lookup while the delay is in place.
    const slowStartTime = Date.now();
    slowProfile = await userProfilesService.findByUserId(testUserId);
    slowTime = Date.now() - slowStartTime;
  } finally {
    // 4. Remove the delay even when the lookup throws, so the stub cannot
    //    slow down (or break) the remaining tests.
    userProfilesRepository.findOne = originalFindOne;
  }

  // 5. The slow lookup still returns the right profile.
  expect(slowProfile).toBeDefined();
  expect(slowProfile?.user_id).toBe(testUserId);

  // Record the timings and check that the delayed lookup really was slower.
  console.log(`Normal query time: ${normalTime}ms`);
  console.log(`Slow query time: ${slowTime}ms`);
  expect(slowTime).toBeGreaterThan(normalTime);
});
it('应该处理数据库死锁', async () => {
  // A second user is needed so the two transactions can lock rows in opposite order.
  const testUserId2 = BigInt(2000);
  const createDto2: CreateUserProfileDto = {
    user_id: testUserId2,
    bio: 'deadlock test user',
    current_map: 'plaza',
    pos_x: 0,
    pos_y: 0,
    status: 1
  };
  await userProfilesService.create(createDto2);

  try {
    // 1. Two transactions on separate connections.
    const transaction1 = dataSource.createQueryRunner();
    const transaction2 = dataSource.createQueryRunner();

    try {
      await transaction1.connect();
      await transaction2.connect();
      await transaction1.startTransaction();
      await transaction2.startTransaction();

      // 2. Each transaction first touches "its own" user...
      await transaction1.manager.update(UserProfiles,
        { user_id: testUserId },
        { pos_x: 100 }
      );
      await transaction2.manager.update(UserProfiles,
        { user_id: testUserId2 },
        { pos_x: 200 }
      );

      // 3. ...and then the other transaction's user, which may deadlock.
      const deadlockPromise1 = transaction1.manager.update(UserProfiles,
        { user_id: testUserId2 },
        { pos_y: 100 }
      );
      const deadlockPromise2 = transaction2.manager.update(UserProfiles,
        { user_id: testUserId },
        { pos_y: 200 }
      );

      // 4. Either both updates complete and are committed, or the database
      //    reports an error (e.g. a detected deadlock), which is logged.
      try {
        await Promise.all([deadlockPromise1, deadlockPromise2]);
        await transaction1.commitTransaction();
        await transaction2.commitTransaction();
      } catch (error) {
        console.log('Deadlock detected and handled:', (error as Error).message);
      }
    } finally {
      // Roll back only transactions that are still open (rolling back a
      // committed one would throw) and always release both connections, even
      // when an earlier step failed.
      await Promise.allSettled(
        [transaction1, transaction2].map(async runner => {
          try {
            if (runner.isTransactionActive) {
              await runner.rollbackTransaction();
            }
          } finally {
            await runner.release();
          }
        })
      );
    }

    // 5. Both profiles are still readable afterwards. toBeTruthy is used
    //    because toBeDefined would also accept a `null` "not found" result.
    const profile1 = await userProfilesService.findByUserId(testUserId);
    const profile2 = await userProfilesService.findByUserId(testUserId2);
    expect(profile1).toBeTruthy();
    expect(profile2).toBeTruthy();
  } finally {
    // Remove the extra user.
    await userProfilesRepository.delete({ user_id: testUserId2 });
  }
});
});
describe('数据恢复和备份', () => {
it('应该支持数据恢复机制', async () => {
  // The position that counts as the "good" state throughout this test.
  const knownGoodPosition: UpdatePositionDto = {
    pos_x: 300,
    pos_y: 400,
    current_map: testMapId
  };
  // Reads the shared test user's profile through the service.
  const readProfile = () => userProfilesService.findByUserId(testUserId);

  // Establish the good state and confirm it was stored.
  await userProfilesService.updatePosition(testUserId, knownGoodPosition);
  const beforeCorruption = await readProfile();
  expect(beforeCorruption?.pos_x).toBe(300);

  // Simulate corruption by resetting the coordinates directly through the
  // repository, bypassing the service.
  await userProfilesRepository.update(
    { user_id: testUserId },
    { pos_x: 0, pos_y: 0 }
  );
  const whileCorrupted = await readProfile();
  expect(whileCorrupted?.pos_x).toBe(0);

  // Re-applying the good position through the service restores the data.
  await userProfilesService.updatePosition(testUserId, knownGoodPosition);
  const afterRecovery = await readProfile();
  expect(afterRecovery?.pos_x).toBe(300);
  expect(afterRecovery?.pos_y).toBe(400);
});
it('应该处理批量数据恢复', async () => {
  const bigIntIds = [BigInt(3001), BigInt(3002), BigInt(3003)];
  // The user at position `index` in the list is expected at (index*100, index*100).
  const coordinateFor = (index: number) => index * 100;

  try {
    // Create one profile per id, each at its own expected coordinate.
    for (let index = 0; index < bigIntIds.length; index++) {
      const coordinate = coordinateFor(index);
      const createDto: CreateUserProfileDto = {
        user_id: bigIntIds[index],
        bio: `batch user ${index}`,
        current_map: 'plaza',
        pos_x: coordinate,
        pos_y: coordinate,
        status: 1
      };
      await userProfilesService.create(createDto);
    }

    // Every profile must be readable with its x coordinate.
    for (let index = 0; index < bigIntIds.length; index++) {
      const created = await userProfilesService.findByUserId(bigIntIds[index]);
      expect(created?.pos_x).toBe(coordinateFor(index));
    }

    // Simulate batch corruption: reset every position through the repository.
    for (const corruptedId of bigIntIds) {
      await userProfilesRepository.update(
        { user_id: corruptedId },
        { pos_x: 0, pos_y: 0 }
      );
    }

    // Restore each user by re-applying its expected position via the service.
    for (let index = 0; index < bigIntIds.length; index++) {
      const coordinate = coordinateFor(index);
      const updateDto: UpdatePositionDto = {
        pos_x: coordinate,
        pos_y: coordinate,
        current_map: testMapId
      };
      await userProfilesService.updatePosition(bigIntIds[index], updateDto);
    }

    // Both coordinates must be back at their expected values.
    for (let index = 0; index < bigIntIds.length; index++) {
      const restored = await userProfilesService.findByUserId(bigIntIds[index]);
      expect(restored?.pos_x).toBe(coordinateFor(index));
      expect(restored?.pos_y).toBe(coordinateFor(index));
    }
  } finally {
    // Remove the batch users; a failed delete is deliberately ignored.
    for (const leftoverId of bigIntIds) {
      try {
        await userProfilesRepository.delete({ user_id: leftoverId });
      } catch (error) {
        // Cleanup errors are ignored.
      }
    }
  }
});
});
});