590 lines
19 KiB
TypeScript
590 lines
19 KiB
TypeScript
/**
|
||
* 数据库故障恢复集成测试
|
||
*
|
||
* 功能描述:
|
||
* - 测试数据库连接中断恢复机制
|
||
* - 验证数据库事务回滚处理
|
||
* - 测试数据库死锁恢复能力
|
||
* - 确保数据一致性保证
|
||
* - 验证数据库连接池管理
|
||
*
|
||
* 测试场景:
|
||
* 1. 数据库连接中断恢复
|
||
* 2. 数据库事务回滚处理
|
||
* 3. 数据库死锁恢复
|
||
* 4. 数据一致性保证
|
||
* 5. 数据库连接池管理
|
||
*
|
||
* 最近修改:
|
||
* - 2026-01-08: 文件重命名 - 修正kebab-case为snake_case命名规范 (修改者: moyin)
|
||
* - 2026-01-08: 修复导入路径和方法调用,适配实际的服务接口 (修改者: moyin)
|
||
*
|
||
* @author original
|
||
* @version 1.0.2
|
||
* @since 2025-01-01
|
||
* @lastModified 2026-01-08
|
||
*/
|
||
|
||
import { Test, TestingModule } from '@nestjs/testing';
|
||
import { INestApplication } from '@nestjs/common';
|
||
import { ConfigModule } from '@nestjs/config';
|
||
import { TypeOrmModule, getRepositoryToken } from '@nestjs/typeorm';
|
||
import { Repository, DataSource } from 'typeorm';
|
||
import Redis from 'ioredis';
|
||
import { LocationBroadcastModule } from '../../src/business/location_broadcast/location_broadcast.module';
|
||
import { UserProfiles } from '../../src/core/db/user_profiles/user_profiles.entity';
|
||
import { UserProfilesService } from '../../src/core/db/user_profiles/user_profiles.service';
|
||
import { LocationPositionService } from '../../src/business/location_broadcast/services/location_position.service';
|
||
import { LocationSessionService } from '../../src/business/location_broadcast/services/location_session.service';
|
||
import { CreateUserProfileDto, UpdatePositionDto } from '../../src/core/db/user_profiles/user_profiles.dto';
|
||
|
||
describe('Database Recovery Integration Tests', () => {
|
||
let app: INestApplication;
|
||
let module: TestingModule;
|
||
let dataSource: DataSource;
|
||
let redis: Redis;
|
||
let userProfilesRepository: Repository<UserProfiles>;
|
||
let userProfilesService: UserProfilesService;
|
||
let positionService: LocationPositionService;
|
||
let sessionService: LocationSessionService;
|
||
|
||
// 测试数据
|
||
const testUserId = BigInt(999999);
|
||
const testSessionId = 'test-session-db-recovery';
|
||
const testMapId = 'test-map-db-recovery';
|
||
|
||
beforeAll(async () => {
|
||
// 创建 Redis 实例
|
||
redis = new Redis(process.env.REDIS_URL || 'redis://localhost:6379');
|
||
|
||
module = await Test.createTestingModule({
|
||
imports: [
|
||
ConfigModule.forRoot({
|
||
isGlobal: true,
|
||
envFilePath: ['.env.test', '.env']
|
||
}),
|
||
TypeOrmModule.forRoot({
|
||
type: 'mysql',
|
||
host: process.env.DB_HOST || 'localhost',
|
||
port: parseInt(process.env.DB_PORT || '3306', 10),
|
||
username: process.env.DB_USERNAME || 'root',
|
||
password: process.env.DB_PASSWORD || '',
|
||
database: process.env.DB_DATABASE || 'test_db',
|
||
entities: [__dirname + '/../../src/**/*.entity{.ts,.js}'],
|
||
synchronize: true,
|
||
dropSchema: true,
|
||
extra: {
|
||
connectionLimit: 10,
|
||
acquireTimeout: 60000,
|
||
timeout: 60000
|
||
}
|
||
}),
|
||
LocationBroadcastModule
|
||
]
|
||
})
|
||
.overrideProvider('default_IORedisModuleConnectionToken')
|
||
.useValue(redis)
|
||
.compile();
|
||
|
||
app = module.createNestApplication();
|
||
await app.init();
|
||
|
||
// 获取服务实例
|
||
dataSource = module.get<DataSource>(DataSource);
|
||
userProfilesRepository = module.get<Repository<UserProfiles>>(
|
||
getRepositoryToken(UserProfiles)
|
||
);
|
||
userProfilesService = module.get<UserProfilesService>(UserProfilesService);
|
||
positionService = module.get<LocationPositionService>(LocationPositionService);
|
||
sessionService = module.get<LocationSessionService>(LocationSessionService);
|
||
|
||
// 确保连接正常
|
||
await dataSource.query('SELECT 1');
|
||
await redis.ping();
|
||
});
|
||
|
||
afterAll(async () => {
|
||
await redis.flushall();
|
||
await redis.disconnect();
|
||
await app.close();
|
||
});
|
||
|
||
beforeEach(async () => {
|
||
// 清理数据
|
||
await redis.flushall();
|
||
await userProfilesRepository.delete({});
|
||
|
||
// 创建测试用户
|
||
const createDto: CreateUserProfileDto = {
|
||
user_id: testUserId,
|
||
bio: 'test user for db recovery',
|
||
current_map: 'plaza',
|
||
pos_x: 0,
|
||
pos_y: 0,
|
||
status: 1
|
||
};
|
||
await userProfilesService.create(createDto);
|
||
});
|
||
|
||
afterEach(async () => {
|
||
// 清理测试数据
|
||
try {
|
||
await userProfilesRepository.delete({ user_id: testUserId });
|
||
} catch (error) {
|
||
// 忽略清理错误
|
||
}
|
||
});
|
||
|
||
describe('数据库连接故障恢复', () => {
|
||
it('应该处理数据库连接超时', async () => {
|
||
// 1. 正常操作
|
||
const updateDto: UpdatePositionDto = {
|
||
pos_x: 300,
|
||
pos_y: 400,
|
||
current_map: testMapId
|
||
};
|
||
await userProfilesService.updatePosition(testUserId, updateDto);
|
||
|
||
// 验证数据存在
|
||
const profile = await userProfilesService.findByUserId(testUserId);
|
||
expect(profile).toBeDefined();
|
||
expect(profile?.pos_x).toBe(300);
|
||
|
||
// 2. 模拟数据库连接超时
|
||
const originalQuery = dataSource.query.bind(dataSource);
|
||
dataSource.query = async () => {
|
||
throw new Error('Connection timeout');
|
||
};
|
||
|
||
// 3. 尝试数据库操作 - 应该失败
|
||
await expect(
|
||
userProfilesService.findByUserId(testUserId)
|
||
).rejects.toThrow();
|
||
|
||
// 4. 恢复连接
|
||
dataSource.query = originalQuery;
|
||
|
||
// 5. 验证连接恢复后操作正常
|
||
const recoveredProfile = await userProfilesService.findByUserId(testUserId);
|
||
expect(recoveredProfile).toBeDefined();
|
||
expect(recoveredProfile?.user_id).toBe(testUserId);
|
||
});
|
||
|
||
it('应该处理数据库连接池耗尽', async () => {
|
||
// 1. 创建多个并发连接来耗尽连接池
|
||
const concurrentOperations: Promise<any>[] = [];
|
||
|
||
for (let i = 0; i < 15; i++) { // 超过连接池限制 (10)
|
||
const createDto: CreateUserProfileDto = {
|
||
user_id: BigInt(i + 1000),
|
||
bio: `concurrent user ${i}`,
|
||
current_map: 'plaza',
|
||
pos_x: 0,
|
||
pos_y: 0,
|
||
status: 1
|
||
};
|
||
concurrentOperations.push(
|
||
userProfilesService.create(createDto).catch(error => {
|
||
// 捕获可能的错误,让 Promise.allSettled 处理
|
||
throw error;
|
||
})
|
||
);
|
||
}
|
||
|
||
// 2. 执行并发操作 - 部分可能因连接池耗尽而失败
|
||
const results = await Promise.allSettled(concurrentOperations);
|
||
|
||
// 3. 统计成功和失败的操作
|
||
const successful = results.filter(r => r.status === 'fulfilled').length;
|
||
const failed = results.filter(r => r.status === 'rejected').length;
|
||
|
||
console.log(`Successful operations: ${successful}, Failed: ${failed}`);
|
||
|
||
// 4. 等待连接释放
|
||
await new Promise(resolve => setTimeout(resolve, 1000));
|
||
|
||
// 5. 验证系统恢复正常
|
||
const testProfile = await userProfilesService.findByUserId(testUserId);
|
||
expect(testProfile).toBeDefined();
|
||
|
||
// 清理并发创建的用户
|
||
for (let i = 0; i < 15; i++) {
|
||
try {
|
||
await userProfilesRepository.delete({ user_id: BigInt(i + 1000) });
|
||
} catch (error) {
|
||
// 忽略清理错误
|
||
}
|
||
}
|
||
});
|
||
});
|
||
|
||
describe('数据库事务处理', () => {
|
||
it('应该处理事务回滚', async () => {
|
||
// 1. 开始事务
|
||
const queryRunner = dataSource.createQueryRunner();
|
||
await queryRunner.connect();
|
||
await queryRunner.startTransaction();
|
||
|
||
try {
|
||
// 2. 在事务中更新用户档案
|
||
await queryRunner.manager.update(UserProfiles,
|
||
{ user_id: testUserId },
|
||
{
|
||
pos_x: 300,
|
||
pos_y: 400,
|
||
last_position_update: new Date()
|
||
}
|
||
);
|
||
|
||
// 3. 验证事务中的数据
|
||
const transactionProfile = await queryRunner.manager.findOne(UserProfiles, {
|
||
where: { user_id: testUserId }
|
||
});
|
||
|
||
if (transactionProfile) {
|
||
expect(transactionProfile.pos_x).toBe(300);
|
||
}
|
||
|
||
// 4. 模拟错误导致回滚
|
||
throw new Error('Simulated transaction error');
|
||
|
||
} catch (error) {
|
||
// 5. 回滚事务
|
||
await queryRunner.rollbackTransaction();
|
||
} finally {
|
||
await queryRunner.release();
|
||
}
|
||
|
||
// 6. 验证数据已回滚
|
||
const rolledBackProfile = await userProfilesService.findByUserId(testUserId);
|
||
expect(rolledBackProfile?.pos_x).toBe(0); // 应该回到初始值
|
||
expect(rolledBackProfile?.pos_y).toBe(0);
|
||
});
|
||
|
||
it('应该处理并发事务冲突', async () => {
|
||
// 1. 创建两个并发事务
|
||
const transaction1 = dataSource.createQueryRunner();
|
||
const transaction2 = dataSource.createQueryRunner();
|
||
|
||
await transaction1.connect();
|
||
await transaction2.connect();
|
||
await transaction1.startTransaction();
|
||
await transaction2.startTransaction();
|
||
|
||
try {
|
||
// 2. 两个事务同时更新同一用户
|
||
const updatePromise1 = transaction1.manager.update(UserProfiles,
|
||
{ user_id: testUserId },
|
||
{
|
||
pos_x: 100,
|
||
last_position_update: new Date()
|
||
}
|
||
);
|
||
|
||
const updatePromise2 = transaction2.manager.update(UserProfiles,
|
||
{ user_id: testUserId },
|
||
{
|
||
pos_x: 200,
|
||
last_position_update: new Date()
|
||
}
|
||
);
|
||
|
||
// 3. 等待两个更新完成
|
||
await Promise.all([updatePromise1, updatePromise2]);
|
||
|
||
// 4. 提交事务
|
||
await transaction1.commitTransaction();
|
||
await transaction2.commitTransaction();
|
||
|
||
} catch (error) {
|
||
// 处理死锁或冲突
|
||
await transaction1.rollbackTransaction();
|
||
await transaction2.rollbackTransaction();
|
||
console.log('Transaction conflict handled:', (error as Error).message);
|
||
} finally {
|
||
await transaction1.release();
|
||
await transaction2.release();
|
||
}
|
||
|
||
// 5. 验证最终状态一致
|
||
const finalProfile = await userProfilesService.findByUserId(testUserId);
|
||
expect(finalProfile).toBeDefined();
|
||
// 最终值应该是其中一个事务的结果
|
||
expect([100, 200, 0]).toContain(finalProfile?.pos_x);
|
||
});
|
||
});
|
||
|
||
describe('数据一致性保证', () => {
|
||
it('应该保证 Redis 和数据库数据一致性', async () => {
|
||
// 1. 通过服务更新位置 (应该同时更新 Redis 和数据库)
|
||
const updateDto: UpdatePositionDto = {
|
||
pos_x: 300,
|
||
pos_y: 400,
|
||
current_map: testMapId
|
||
};
|
||
await userProfilesService.updatePosition(testUserId, updateDto);
|
||
|
||
// 2. 验证数据库中的数据
|
||
const dbProfile = await userProfilesService.findByUserId(testUserId);
|
||
expect(dbProfile?.pos_x).toBe(300);
|
||
expect(dbProfile?.pos_y).toBe(400);
|
||
|
||
// 3. 模拟 Redis 数据丢失
|
||
await redis.flushall();
|
||
|
||
// 4. 验证数据库数据仍存在
|
||
const persistentDbProfile = await userProfilesService.findByUserId(testUserId);
|
||
expect(persistentDbProfile?.pos_x).toBe(300);
|
||
|
||
// 5. 重新更新位置应该恢复一致性
|
||
const newUpdateDto: UpdatePositionDto = {
|
||
pos_x: 500,
|
||
pos_y: 600,
|
||
current_map: testMapId
|
||
};
|
||
await userProfilesService.updatePosition(testUserId, newUpdateDto);
|
||
|
||
// 6. 验证一致性恢复
|
||
const restoredDbProfile = await userProfilesService.findByUserId(testUserId);
|
||
expect(restoredDbProfile?.pos_x).toBe(500);
|
||
expect(restoredDbProfile?.pos_y).toBe(600);
|
||
});
|
||
|
||
it('应该处理数据库写入失败的情况', async () => {
|
||
// 1. 模拟数据库写入失败
|
||
const originalUpdate = userProfilesService.update.bind(userProfilesService);
|
||
userProfilesService.update = async () => {
|
||
throw new Error('Database write failed');
|
||
};
|
||
|
||
// 2. 尝试更新位置 - 应该失败
|
||
const updateDto: UpdatePositionDto = {
|
||
pos_x: 300,
|
||
pos_y: 400,
|
||
current_map: testMapId
|
||
};
|
||
await expect(
|
||
userProfilesService.updatePosition(testUserId, updateDto)
|
||
).rejects.toThrow();
|
||
|
||
// 3. 恢复数据库操作
|
||
userProfilesService.update = originalUpdate;
|
||
|
||
// 4. 重新更新位置应该成功
|
||
await userProfilesService.updatePosition(testUserId, updateDto);
|
||
|
||
// 5. 验证最终一致性
|
||
const finalDbProfile = await userProfilesService.findByUserId(testUserId);
|
||
expect(finalDbProfile?.pos_x).toBe(300);
|
||
expect(finalDbProfile?.pos_y).toBe(400);
|
||
});
|
||
});
|
||
|
||
describe('数据库性能降级', () => {
|
||
it('应该处理数据库查询缓慢的情况', async () => {
|
||
// 1. 正常操作基准测试
|
||
const startTime = Date.now();
|
||
await userProfilesService.findByUserId(testUserId);
|
||
const normalTime = Date.now() - startTime;
|
||
|
||
// 2. 模拟数据库查询缓慢
|
||
const originalFindOne = userProfilesRepository.findOne.bind(userProfilesRepository);
|
||
userProfilesRepository.findOne = async (options: any) => {
|
||
await new Promise(resolve => setTimeout(resolve, 500)); // 500ms 延迟
|
||
return originalFindOne(options);
|
||
};
|
||
|
||
// 3. 测试缓慢查询
|
||
const slowStartTime = Date.now();
|
||
const slowProfile = await userProfilesService.findByUserId(testUserId);
|
||
const slowTime = Date.now() - slowStartTime;
|
||
|
||
// 4. 恢复正常操作
|
||
userProfilesRepository.findOne = originalFindOne;
|
||
|
||
// 5. 验证数据正确性
|
||
expect(slowProfile).toBeDefined();
|
||
expect(slowProfile?.user_id).toBe(testUserId);
|
||
|
||
// 记录性能数据
|
||
console.log(`Normal query time: ${normalTime}ms`);
|
||
console.log(`Slow query time: ${slowTime}ms`);
|
||
expect(slowTime).toBeGreaterThan(normalTime);
|
||
});
|
||
|
||
it('应该处理数据库死锁', async () => {
|
||
// 创建额外的测试用户
|
||
const testUserId2 = BigInt(2000);
|
||
const createDto2: CreateUserProfileDto = {
|
||
user_id: testUserId2,
|
||
bio: 'deadlock test user',
|
||
current_map: 'plaza',
|
||
pos_x: 0,
|
||
pos_y: 0,
|
||
status: 1
|
||
};
|
||
await userProfilesService.create(createDto2);
|
||
|
||
try {
|
||
// 1. 创建两个事务
|
||
const transaction1 = dataSource.createQueryRunner();
|
||
const transaction2 = dataSource.createQueryRunner();
|
||
|
||
await transaction1.connect();
|
||
await transaction2.connect();
|
||
await transaction1.startTransaction();
|
||
await transaction2.startTransaction();
|
||
|
||
// 2. 模拟可能导致死锁的操作序列
|
||
// 事务1: 锁定用户1,然后尝试锁定用户2
|
||
await transaction1.manager.update(UserProfiles,
|
||
{ user_id: testUserId },
|
||
{ pos_x: 100 }
|
||
);
|
||
|
||
// 事务2: 锁定用户2,然后尝试锁定用户1
|
||
await transaction2.manager.update(UserProfiles,
|
||
{ user_id: testUserId2 },
|
||
{ pos_x: 200 }
|
||
);
|
||
|
||
// 3. 交叉更新可能导致死锁
|
||
const deadlockPromise1 = transaction1.manager.update(UserProfiles,
|
||
{ user_id: testUserId2 },
|
||
{ pos_y: 100 }
|
||
);
|
||
|
||
const deadlockPromise2 = transaction2.manager.update(UserProfiles,
|
||
{ user_id: testUserId },
|
||
{ pos_y: 200 }
|
||
);
|
||
|
||
// 4. 等待操作完成或死锁检测
|
||
try {
|
||
await Promise.all([deadlockPromise1, deadlockPromise2]);
|
||
await transaction1.commitTransaction();
|
||
await transaction2.commitTransaction();
|
||
} catch (error) {
|
||
// 处理死锁
|
||
await transaction1.rollbackTransaction();
|
||
await transaction2.rollbackTransaction();
|
||
console.log('Deadlock detected and handled:', (error as Error).message);
|
||
}
|
||
|
||
await transaction1.release();
|
||
await transaction2.release();
|
||
|
||
// 5. 验证系统恢复正常
|
||
const profile1 = await userProfilesService.findByUserId(testUserId);
|
||
const profile2 = await userProfilesService.findByUserId(testUserId2);
|
||
|
||
expect(profile1).toBeDefined();
|
||
expect(profile2).toBeDefined();
|
||
|
||
} finally {
|
||
// 清理测试用户
|
||
await userProfilesRepository.delete({ user_id: testUserId2 });
|
||
}
|
||
});
|
||
});
|
||
|
||
describe('数据恢复和备份', () => {
|
||
it('应该支持数据恢复机制', async () => {
|
||
// 1. 创建初始数据
|
||
const updateDto: UpdatePositionDto = {
|
||
pos_x: 300,
|
||
pos_y: 400,
|
||
current_map: testMapId
|
||
};
|
||
await userProfilesService.updatePosition(testUserId, updateDto);
|
||
|
||
// 2. 验证数据存在
|
||
const originalProfile = await userProfilesService.findByUserId(testUserId);
|
||
expect(originalProfile?.pos_x).toBe(300);
|
||
|
||
// 3. 模拟数据损坏 - 直接修改数据库
|
||
await userProfilesRepository.update(
|
||
{ user_id: testUserId },
|
||
{
|
||
pos_x: 0,
|
||
pos_y: 0
|
||
}
|
||
);
|
||
|
||
// 4. 验证数据损坏
|
||
const corruptedProfile = await userProfilesService.findByUserId(testUserId);
|
||
expect(corruptedProfile?.pos_x).toBe(0);
|
||
|
||
// 5. 通过重新更新位置来恢复数据
|
||
await userProfilesService.updatePosition(testUserId, updateDto);
|
||
|
||
// 6. 验证数据恢复
|
||
const recoveredProfile = await userProfilesService.findByUserId(testUserId);
|
||
expect(recoveredProfile?.pos_x).toBe(300);
|
||
expect(recoveredProfile?.pos_y).toBe(400);
|
||
});
|
||
|
||
it('应该处理批量数据恢复', async () => {
|
||
const bigIntIds = [BigInt(3001), BigInt(3002), BigInt(3003)];
|
||
|
||
try {
|
||
// 1. 创建多个用户和位置数据
|
||
for (let i = 0; i < bigIntIds.length; i++) {
|
||
const createDto: CreateUserProfileDto = {
|
||
user_id: bigIntIds[i],
|
||
bio: `batch user ${i}`,
|
||
current_map: 'plaza',
|
||
pos_x: i * 100,
|
||
pos_y: i * 100,
|
||
status: 1
|
||
};
|
||
await userProfilesService.create(createDto);
|
||
}
|
||
|
||
// 2. 验证所有数据存在
|
||
for (let i = 0; i < bigIntIds.length; i++) {
|
||
const profile = await userProfilesService.findByUserId(bigIntIds[i]);
|
||
expect(profile?.pos_x).toBe(i * 100);
|
||
}
|
||
|
||
// 3. 模拟批量数据损坏
|
||
for (const bigIntId of bigIntIds) {
|
||
await userProfilesRepository.update(
|
||
{ user_id: bigIntId },
|
||
{
|
||
pos_x: 0,
|
||
pos_y: 0
|
||
}
|
||
);
|
||
}
|
||
|
||
// 4. 批量恢复数据
|
||
for (let i = 0; i < bigIntIds.length; i++) {
|
||
const updateDto: UpdatePositionDto = {
|
||
pos_x: i * 100,
|
||
pos_y: i * 100,
|
||
current_map: testMapId
|
||
};
|
||
await userProfilesService.updatePosition(bigIntIds[i], updateDto);
|
||
}
|
||
|
||
// 5. 验证批量恢复成功
|
||
for (let i = 0; i < bigIntIds.length; i++) {
|
||
const profile = await userProfilesService.findByUserId(bigIntIds[i]);
|
||
expect(profile?.pos_x).toBe(i * 100);
|
||
expect(profile?.pos_y).toBe(i * 100);
|
||
}
|
||
|
||
} finally {
|
||
// 清理测试数据
|
||
for (const bigIntId of bigIntIds) {
|
||
try {
|
||
await userProfilesRepository.delete({ user_id: bigIntId });
|
||
} catch (error) {
|
||
// 忽略清理错误
|
||
}
|
||
}
|
||
}
|
||
});
|
||
});
|
||
}); |