/**
 * Database failure-recovery integration tests.
 *
 * Description:
 * - Exercises recovery after a database connection interruption
 * - Verifies transaction rollback handling
 * - Exercises recovery from database deadlocks
 * - Checks data-consistency guarantees
 * - Verifies connection-pool management
 *
 * Scenarios:
 * 1. Connection interruption recovery
 * 2. Transaction rollback handling
 * 3. Deadlock recovery
 * 4. Data-consistency guarantees
 * 5. Connection-pool management
 *
 * Recent changes:
 * - 2026-01-08: File renamed - corrected kebab-case to the snake_case naming convention (by: moyin)
 * - 2026-01-08: Fixed import paths and method calls to match the actual service interfaces (by: moyin)
 *
 * @author original
 * @version 1.0.2
 * @since 2025-01-01
 * @lastModified 2026-01-08
 */
import { Test, TestingModule } from '@nestjs/testing';
import { INestApplication } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { TypeOrmModule, getRepositoryToken } from '@nestjs/typeorm';
import { Repository, DataSource } from 'typeorm';
import Redis from 'ioredis';
import { LocationBroadcastModule } from '../../src/business/location_broadcast/location_broadcast.module';
import { UserProfiles } from '../../src/core/db/user_profiles/user_profiles.entity';
import { UserProfilesService } from '../../src/core/db/user_profiles/user_profiles.service';
import { LocationPositionService } from '../../src/business/location_broadcast/services/location_position.service';
import { LocationSessionService } from '../../src/business/location_broadcast/services/location_session.service';
import { CreateUserProfileDto, UpdatePositionDto } from '../../src/core/db/user_profiles/user_profiles.dto';

describe('Database Recovery Integration Tests', () => {
  /** Query-runner type, derived from DataSource so no extra import is needed. */
  type TransactionRunner = ReturnType<DataSource['createQueryRunner']>;

  // Stays undefined when beforeAll fails before the application is created.
  let app: INestApplication | undefined;
  let module: TestingModule;
  let dataSource: DataSource;
  let redis: Redis;
  let userProfilesRepository: Repository<UserProfiles>;
  let userProfilesService: UserProfilesService;
  // Resolved in beforeAll but not referenced by any test in this file.
  let positionService: LocationPositionService;
  let sessionService: LocationSessionService;

  // Test data
  const testUserId = BigInt(999999);
  const testSessionId = 'test-session-db-recovery';
  const testMapId = 'test-map-db-recovery';

  /** Extracts a printable message from an unknown thrown value. */
  const describeError = (error: unknown): string =>
    error instanceof Error ? error.message : String(error);

  /**
   * Rolls back only when the runner still has an open transaction, so a
   * runner that already committed does not make the error path throw again.
   */
  const rollbackIfActive = async (runner: TransactionRunner): Promise<void> => {
    if (runner.isTransactionActive) {
      await runner.rollbackTransaction();
    }
  };

  beforeAll(async () => {
    // Create the Redis client that replaces the module's own connection.
    redis = new Redis(process.env.REDIS_URL || 'redis://localhost:6379');

    module = await Test.createTestingModule({
      imports: [
        ConfigModule.forRoot({
          isGlobal: true,
          envFilePath: ['.env.test', '.env']
        }),
        // NOTE(review): dropSchema + synchronize wipe and recreate the configured
        // database - make sure DB_DATABASE points at a disposable test database.
        TypeOrmModule.forRoot({
          type: 'mysql',
          host: process.env.DB_HOST || 'localhost',
          port: parseInt(process.env.DB_PORT || '3306', 10),
          username: process.env.DB_USERNAME || 'root',
          password: process.env.DB_PASSWORD || '',
          database: process.env.DB_DATABASE || 'test_db',
          entities: [__dirname + '/../../src/**/*.entity{.ts,.js}'],
          synchronize: true,
          dropSchema: true,
          extra: {
            connectionLimit: 10,
            acquireTimeout: 60000,
            timeout: 60000
          }
        }),
        LocationBroadcastModule
      ]
    })
      .overrideProvider('default_IORedisModuleConnectionToken')
      .useValue(redis)
      .compile();

    app = module.createNestApplication();
    await app.init();

    // Resolve the service instances used by the tests.
    dataSource = module.get(DataSource);
    userProfilesRepository = module.get<Repository<UserProfiles>>(
      getRepositoryToken(UserProfiles)
    );
    userProfilesService = module.get(UserProfilesService);
    positionService = module.get(LocationPositionService);
    sessionService = module.get(LocationSessionService);

    // Make sure both connections are alive before any test runs.
    await dataSource.query('SELECT 1');
    await redis.ping();
  });

  afterAll(async () => {
    await redis.flushall();
    redis.disconnect();
    // app is undefined when beforeAll failed early; do not mask that failure.
    await app?.close();
  });

  beforeEach(async () => {
    // Start every test from empty Redis and an empty profile table.
    await redis.flushall();
    await userProfilesRepository.delete({});

    // Create the baseline test user.
    const createDto: CreateUserProfileDto = {
      user_id: testUserId,
      bio: 'test user for db recovery',
      current_map: 'plaza',
      pos_x: 0,
      pos_y: 0,
      status: 1
    };
    await userProfilesService.create(createDto);
  });

  afterEach(async () => {
    // Best-effort cleanup of the baseline user.
    try {
      await userProfilesRepository.delete({ user_id: testUserId });
    } catch {
      // Cleanup errors are deliberately ignored; beforeEach wipes the table anyway.
    }
  });

  describe('数据库连接故障恢复', () => {
    it('应该处理数据库连接超时', async () => {
      // 1. Normal operation
      const updateDto: UpdatePositionDto = {
        pos_x: 300,
        pos_y: 400,
        current_map: testMapId
      };
      await userProfilesService.updatePosition(testUserId, updateDto);

      // Verify the data exists
      const profile = await userProfilesService.findByUserId(testUserId);
      expect(profile).toBeDefined();
      expect(profile?.pos_x).toBe(300);

      // 2. Simulate a connection timeout by stubbing DataSource.query.
      // NOTE(review): step 3 only fails if the service's read path goes through
      // dataSource.query - confirm against UserProfilesService.
      const originalQuery = dataSource.query.bind(dataSource);
      try {
        dataSource.query = async () => {
          throw new Error('Connection timeout');
        };

        // 3. A database operation should now fail
        await expect(
          userProfilesService.findByUserId(testUserId)
        ).rejects.toThrow();
      } finally {
        // 4. Restore the connection even when the assertion above fails,
        // so the stub cannot leak into later tests.
        dataSource.query = originalQuery;
      }

      // 5. Operations work again after recovery
      const recoveredProfile = await userProfilesService.findByUserId(testUserId);
      expect(recoveredProfile).toBeDefined();
      expect(recoveredProfile?.user_id).toBe(testUserId);
    });

    it('应该处理数据库连接池耗尽', async () => {
      // More concurrent writes than the pool's connectionLimit (10).
      const concurrentCount = 15;
      const concurrentUserIds = Array.from(
        { length: concurrentCount },
        (_, i) => BigInt(i + 1000)
      );

      try {
        // 1. Start all creates at once to put pressure on the pool
        const concurrentOperations: Promise<unknown>[] = concurrentUserIds.map(
          (userId, i) => {
            const createDto: CreateUserProfileDto = {
              user_id: userId,
              bio: `concurrent user ${i}`,
              current_map: 'plaza',
              pos_x: 0,
              pos_y: 0,
              status: 1
            };
            return userProfilesService.create(createDto);
          }
        );

        // 2. Some operations may fail when the pool is exhausted;
        // allSettled collects every outcome without throwing.
        const results = await Promise.allSettled(concurrentOperations);

        // 3. Count successes and failures
        const successful = results.filter(r => r.status === 'fulfilled').length;
        const failed = results.filter(r => r.status === 'rejected').length;
        console.log(`Successful operations: ${successful}, Failed: ${failed}`);

        // 4. Give the pool time to release its connections
        await new Promise(resolve => setTimeout(resolve, 1000));

        // 5. The system should be usable again
        const testProfile = await userProfilesService.findByUserId(testUserId);
        expect(testProfile).toBeDefined();
      } finally {
        // Remove the concurrently created users even when an assertion failed.
        for (const userId of concurrentUserIds) {
          try {
            await userProfilesRepository.delete({ user_id: userId });
          } catch {
            // Cleanup errors are deliberately ignored.
          }
        }
      }
    });
  });

  describe('数据库事务处理', () => {
    it('应该处理事务回滚', async () => {
      // 1. Begin a transaction
      const queryRunner = dataSource.createQueryRunner();
      await queryRunner.connect();
      await queryRunner.startTransaction();

      const simulatedError = new Error('Simulated transaction error');

      try {
        // 2. Update the profile inside the transaction
        await queryRunner.manager.update(
          UserProfiles,
          { user_id: testUserId },
          {
            pos_x: 300,
            pos_y: 400,
            last_position_update: new Date()
          }
        );

        // 3. The change must be visible inside the transaction
        const transactionProfile = await queryRunner.manager.findOne(UserProfiles, {
          where: { user_id: testUserId }
        });
        if (transactionProfile) {
          expect(transactionProfile.pos_x).toBe(300);
        }

        // 4. Simulate an error that forces a rollback
        throw simulatedError;
      } catch (error) {
        // 5. Roll the transaction back
        await queryRunner.rollbackTransaction();
        // Only the simulated error is expected here; anything else (for
        // example a failed assertion above) must still fail the test.
        if (error !== simulatedError) {
          throw error;
        }
      } finally {
        await queryRunner.release();
      }

      // 6. The data must be back at its initial values
      const rolledBackProfile = await userProfilesService.findByUserId(testUserId);
      expect(rolledBackProfile?.pos_x).toBe(0);
      expect(rolledBackProfile?.pos_y).toBe(0);
    });

    it('应该处理并发事务冲突', async () => {
      // 1. Open two concurrent transactions
      const transaction1 = dataSource.createQueryRunner();
      const transaction2 = dataSource.createQueryRunner();

      try {
        await transaction1.connect();
        await transaction2.connect();
        await transaction1.startTransaction();
        await transaction2.startTransaction();

        // 2. Transaction 1 updates the row and keeps its row lock.
        await transaction1.manager.update(
          UserProfiles,
          { user_id: testUserId },
          { pos_x: 100, last_position_update: new Date() }
        );

        // 3. Transaction 2 contends for the same row. Its UPDATE cannot finish
        // until transaction 1 ends, so it must NOT be awaited before the commit
        // below - doing so would stall until the lock-wait timeout.
        const blockedUpdate = transaction2.manager.update(
          UserProfiles,
          { user_id: testUserId },
          { pos_x: 200, last_position_update: new Date() }
        );
        // Prevent an unhandled rejection if we leave before awaiting it.
        blockedUpdate.catch(() => undefined);

        // 4. Commit in lock order: transaction 1 first, which unblocks transaction 2.
        await transaction1.commitTransaction();
        await blockedUpdate;
        await transaction2.commitTransaction();
      } catch (error) {
        // Handle a deadlock or conflict; skip runners that already committed.
        await rollbackIfActive(transaction1);
        await rollbackIfActive(transaction2);
        console.log('Transaction conflict handled:', describeError(error));
      } finally {
        await transaction1.release();
        await transaction2.release();
      }

      // 5. The final state must be consistent: one of the two written values,
      // or the initial value when both transactions were rolled back.
      const finalProfile = await userProfilesService.findByUserId(testUserId);
      expect(finalProfile).toBeDefined();
      expect([100, 200, 0]).toContain(finalProfile?.pos_x);
    });
  });

  describe('数据一致性保证', () => {
    it('应该保证 Redis 和数据库数据一致性', async () => {
      // 1. Update the position through the service.
      // NOTE(review): presumably this writes to both Redis and the database;
      // only the database side is asserted here - confirm against the service.
      const updateDto: UpdatePositionDto = {
        pos_x: 300,
        pos_y: 400,
        current_map: testMapId
      };
      await userProfilesService.updatePosition(testUserId, updateDto);

      // 2. Verify the database contents
      const dbProfile = await userProfilesService.findByUserId(testUserId);
      expect(dbProfile?.pos_x).toBe(300);
      expect(dbProfile?.pos_y).toBe(400);

      // 3. Simulate losing all Redis data
      await redis.flushall();

      // 4. The database data must still be there
      const persistentDbProfile = await userProfilesService.findByUserId(testUserId);
      expect(persistentDbProfile?.pos_x).toBe(300);

      // 5. A new position update should restore consistency
      const newUpdateDto: UpdatePositionDto = {
        pos_x: 500,
        pos_y: 600,
        current_map: testMapId
      };
      await userProfilesService.updatePosition(testUserId, newUpdateDto);

      // 6. Verify the new values were stored
      const restoredDbProfile = await userProfilesService.findByUserId(testUserId);
      expect(restoredDbProfile?.pos_x).toBe(500);
      expect(restoredDbProfile?.pos_y).toBe(600);
    });

    it('应该处理数据库写入失败的情况', async () => {
      const updateDto: UpdatePositionDto = {
        pos_x: 300,
        pos_y: 400,
        current_map: testMapId
      };

      // 1. Simulate a failing database write by stubbing the service's update().
      // NOTE(review): step 2 only fails if updatePosition delegates to
      // this.update - confirm against UserProfilesService.
      const originalUpdate = userProfilesService.update.bind(userProfilesService);
      try {
        userProfilesService.update = async () => {
          throw new Error('Database write failed');
        };

        // 2. The position update should fail
        await expect(
          userProfilesService.updatePosition(testUserId, updateDto)
        ).rejects.toThrow();
      } finally {
        // 3. Restore the real implementation even when the assertion fails.
        userProfilesService.update = originalUpdate;
      }

      // 4. The same update should now succeed
      await userProfilesService.updatePosition(testUserId, updateDto);

      // 5. Verify the final state
      const finalDbProfile = await userProfilesService.findByUserId(testUserId);
      expect(finalDbProfile?.pos_x).toBe(300);
      expect(finalDbProfile?.pos_y).toBe(400);
    });
  });

  describe('数据库性能降级', () => {
    it('应该处理数据库查询缓慢的情况', async () => {
      // 1. Baseline timing of a normal lookup
      const startTime = Date.now();
      await userProfilesService.findByUserId(testUserId);
      const normalTime = Date.now() - startTime;

      // 2. Simulate a slow query by delaying Repository.findOne.
      // NOTE(review): assumes findByUserId reads through repository.findOne - confirm.
      const originalFindOne = userProfilesRepository.findOne.bind(userProfilesRepository);
      let slowProfile: Awaited<ReturnType<typeof userProfilesService.findByUserId>>;
      let slowTime: number;
      try {
        userProfilesRepository.findOne = async options => {
          await new Promise(resolve => setTimeout(resolve, 500)); // 500 ms delay
          return originalFindOne(options);
        };

        // 3. Time the slow lookup
        const slowStartTime = Date.now();
        slowProfile = await userProfilesService.findByUserId(testUserId);
        slowTime = Date.now() - slowStartTime;
      } finally {
        // 4. Restore normal behavior even when the lookup throws.
        userProfilesRepository.findOne = originalFindOne;
      }

      // 5. The data must still be correct
      expect(slowProfile).toBeDefined();
      expect(slowProfile?.user_id).toBe(testUserId);

      // Record the timings
      console.log(`Normal query time: ${normalTime}ms`);
      console.log(`Slow query time: ${slowTime}ms`);
      expect(slowTime).toBeGreaterThan(normalTime);
    });

    it('应该处理数据库死锁', async () => {
      // Create a second test user
      const testUserId2 = BigInt(2000);
      const createDto2: CreateUserProfileDto = {
        user_id: testUserId2,
        bio: 'deadlock test user',
        current_map: 'plaza',
        pos_x: 0,
        pos_y: 0,
        status: 1
      };
      await userProfilesService.create(createDto2);

      try {
        // 1. Open two transactions
        const transaction1 = dataSource.createQueryRunner();
        const transaction2 = dataSource.createQueryRunner();

        try {
          await transaction1.connect();
          await transaction2.connect();
          await transaction1.startTransaction();
          await transaction2.startTransaction();

          // 2. Build a lock cycle.
          // Transaction 1 locks user 1 ...
          await transaction1.manager.update(
            UserProfiles,
            { user_id: testUserId },
            { pos_x: 100 }
          );

          // ... and transaction 2 locks user 2.
          await transaction2.manager.update(
            UserProfiles,
            { user_id: testUserId2 },
            { pos_x: 200 }
          );

          // 3. Each transaction now asks for the row the other one holds.
          const crossUpdates = await Promise.allSettled([
            transaction1.manager.update(
              UserProfiles,
              { user_id: testUserId2 },
              { pos_y: 100 }
            ),
            transaction2.manager.update(
              UserProfiles,
              { user_id: testUserId },
              { pos_y: 200 }
            )
          ]);

          // 4. allSettled waits for both statements, so nothing is still in
          // flight on either connection when we commit or roll back.
          try {
            const failure = crossUpdates.find(
              (r): r is PromiseRejectedResult => r.status === 'rejected'
            );
            if (failure) {
              throw failure.reason;
            }
            await transaction1.commitTransaction();
            await transaction2.commitTransaction();
          } catch (error) {
            // Handle the deadlock; skip runners that already committed.
            await rollbackIfActive(transaction1);
            await rollbackIfActive(transaction2);
            console.log('Deadlock detected and handled:', describeError(error));
          }
        } finally {
          // Always hand the connections back to the pool.
          await transaction1.release();
          await transaction2.release();
        }

        // 5. The system must be usable again
        const profile1 = await userProfilesService.findByUserId(testUserId);
        const profile2 = await userProfilesService.findByUserId(testUserId2);
        expect(profile1).toBeDefined();
        expect(profile2).toBeDefined();
      } finally {
        // Remove the second test user
        await userProfilesRepository.delete({ user_id: testUserId2 });
      }
    });
  });

  describe('数据恢复和备份', () => {
    it('应该支持数据恢复机制', async () => {
      // 1. Create the initial data
      const updateDto: UpdatePositionDto = {
        pos_x: 300,
        pos_y: 400,
        current_map: testMapId
      };
      await userProfilesService.updatePosition(testUserId, updateDto);

      // 2. Verify the data exists
      const originalProfile = await userProfilesService.findByUserId(testUserId);
      expect(originalProfile?.pos_x).toBe(300);

      // 3. Simulate corruption by writing to the table directly
      await userProfilesRepository.update(
        { user_id: testUserId },
        { pos_x: 0, pos_y: 0 }
      );

      // 4. Verify the corruption is visible
      const corruptedProfile = await userProfilesService.findByUserId(testUserId);
      expect(corruptedProfile?.pos_x).toBe(0);

      // 5. Recover by re-applying the position update
      await userProfilesService.updatePosition(testUserId, updateDto);

      // 6. Verify the data is restored
      const recoveredProfile = await userProfilesService.findByUserId(testUserId);
      expect(recoveredProfile?.pos_x).toBe(300);
      expect(recoveredProfile?.pos_y).toBe(400);
    });

    it('应该处理批量数据恢复', async () => {
      const bigIntIds = [BigInt(3001), BigInt(3002), BigInt(3003)];

      try {
        // 1. Create several users with distinct positions
        for (const [i, bigIntId] of bigIntIds.entries()) {
          const createDto: CreateUserProfileDto = {
            user_id: bigIntId,
            bio: `batch user ${i}`,
            current_map: 'plaza',
            pos_x: i * 100,
            pos_y: i * 100,
            status: 1
          };
          await userProfilesService.create(createDto);
        }

        // 2. Verify all of them exist
        for (const [i, bigIntId] of bigIntIds.entries()) {
          const profile = await userProfilesService.findByUserId(bigIntId);
          expect(profile?.pos_x).toBe(i * 100);
        }

        // 3. Simulate corruption of the whole batch
        for (const bigIntId of bigIntIds) {
          await userProfilesRepository.update(
            { user_id: bigIntId },
            { pos_x: 0, pos_y: 0 }
          );
        }

        // 4. Restore the batch
        for (const [i, bigIntId] of bigIntIds.entries()) {
          const updateDto: UpdatePositionDto = {
            pos_x: i * 100,
            pos_y: i * 100,
            current_map: testMapId
          };
          await userProfilesService.updatePosition(bigIntId, updateDto);
        }

        // 5. Verify the batch was restored
        for (const [i, bigIntId] of bigIntIds.entries()) {
          const profile = await userProfilesService.findByUserId(bigIntId);
          expect(profile?.pos_x).toBe(i * 100);
          expect(profile?.pos_y).toBe(i * 100);
        }
      } finally {
        // Remove the batch users
        for (const bigIntId of bigIntIds) {
          try {
            await userProfilesRepository.delete({ user_id: bigIntId });
          } catch {
            // Cleanup errors are deliberately ignored.
          }
        }
      }
    });
  });
});