extends Node ## 速率限制器测试 ## 测试消息速率限制和DoS防护 var rate_limiter: RateLimiter func _ready(): print("=== 速率限制器测试开始 ===") # 运行所有测试 test_normal_message_allowed() test_rate_limit_triggered() await test_time_window_reset() test_multiple_clients_independent() test_client_statistics() test_global_statistics() test_limit_reset() test_suspicious_activity_detection() test_bot_pattern_detection() test_dynamic_limit_adjustment() test_cleanup_functionality() test_edge_cases() print("=== 速率限制器测试完成 ===") func setup_test(): ## 每个测试前的设置 rate_limiter = RateLimiter.new() # 设置较小的限制以便测试 rate_limiter.set_rate_limit(3, 1.0) # 每秒最多3条消息 func cleanup_test(): ## 每个测试后的清理 if rate_limiter: rate_limiter.queue_free() rate_limiter = null ## 测试正常消息允许 func test_normal_message_allowed(): ## 测试正常频率的消息应该被允许 print("\n--- 测试正常消息允许 ---") setup_test() var client_id = "test_client" # 发送3条消息(在限制内) for i in range(3): var allowed = rate_limiter.is_message_allowed(client_id) assert(allowed, "Message %d should be allowed" % (i + 1)) print("✅ 正常消息允许测试通过") cleanup_test() ## 测试速率限制触发 func test_rate_limit_triggered(): ## 测试超过速率限制时消息被阻止 print("\n--- 测试速率限制触发 ---") setup_test() var client_id = "test_client" # 发送3条消息(达到限制) for i in range(3): rate_limiter.is_message_allowed(client_id) # 第4条消息应该被阻止 var allowed = rate_limiter.is_message_allowed(client_id) assert(not allowed, "4th message should be blocked by rate limit") print("✅ 速率限制触发测试通过") cleanup_test() ## 测试时间窗口重置 func test_time_window_reset(): ## 测试时间窗口重置后允许新消息 print("\n--- 测试时间窗口重置 ---") setup_test() var client_id = "test_client" # 发送3条消息(达到限制) for i in range(3): rate_limiter.is_message_allowed(client_id) # 第4条消息被阻止 assert(not rate_limiter.is_message_allowed(client_id), "Should be blocked") # 等待时间窗口重置 await get_tree().create_timer(1.1).timeout # 现在应该允许新消息 var allowed = rate_limiter.is_message_allowed(client_id) assert(allowed, "Message should be allowed after time window reset") print("✅ 时间窗口重置测试通过") cleanup_test() ## 测试多客户端独立限制 func test_multiple_clients_independent(): ## 测试多个客户端的速率限制是独立的 print("\n--- 测试多客户端独立限制 ---") setup_test() var client1 = "client1" var client2 = "client2" # 客户端1发送3条消息(达到限制) for i in range(3): rate_limiter.is_message_allowed(client1) # 客户端1被阻止 assert(not rate_limiter.is_message_allowed(client1), "Client1 should be blocked") # 客户端2应该仍然可以发送消息 assert(rate_limiter.is_message_allowed(client2), "Client2 should still be allowed") print("✅ 多客户端独立限制测试通过") cleanup_test() ## 测试客户端统计 func test_client_statistics(): ## 测试客户端消息统计 print("\n--- 测试客户端统计 ---") setup_test() var client_id = "test_client" # 发送2条消息 rate_limiter.is_message_allowed(client_id) rate_limiter.is_message_allowed(client_id) var stats = rate_limiter.get_client_stats(client_id) assert(stats.message_count == 2, "Should show 2 messages sent") assert(stats.remaining_quota == 1, "Should show 1 message remaining") assert(stats.has("window_reset_time"), "Should include reset time") print("✅ 客户端统计测试通过") cleanup_test() ## 测试全局统计 func test_global_statistics(): ## 测试全局统计信息 print("\n--- 测试全局统计 ---") setup_test() # 多个客户端发送消息 rate_limiter.is_message_allowed("client1") rate_limiter.is_message_allowed("client2") rate_limiter.is_message_allowed("client1") var stats = rate_limiter.get_global_stats() assert(stats.has("total_clients"), "Should include total clients") assert(stats.has("active_clients"), "Should include active clients") assert(stats.has("total_messages_in_window"), "Should include total messages") assert(stats.total_clients == 2, "Should have 2 clients") print("✅ 全局统计测试通过") cleanup_test() ## 测试限制重置 func test_limit_reset(): ## 测试手动重置客户端限制 print("\n--- 测试限制重置 ---") setup_test() var client_id = "test_client" # 发送3条消息(达到限制) for i in range(3): rate_limiter.is_message_allowed(client_id) # 确认被阻止 assert(not rate_limiter.is_message_allowed(client_id), "Should be blocked") # 重置限制 rate_limiter.reset_client_limit(client_id) # 现在应该允许消息 assert(rate_limiter.is_message_allowed(client_id), "Should be allowed after reset") print("✅ 限制重置测试通过") cleanup_test() ## 测试可疑活动检测 func test_suspicious_activity_detection(): ## 测试可疑活动检测 print("\n--- 测试可疑活动检测 ---") setup_test() var client_id = "test_client" # 发送接近限制的消息数量 for i in range(3): # 3/3 = 100% 使用率 rate_limiter.is_message_allowed(client_id) # 应该检测为可疑活动 var is_suspicious = rate_limiter.is_suspicious_activity(client_id) assert(is_suspicious, "High message rate should be flagged as suspicious") print("✅ 可疑活动检测测试通过") cleanup_test() ## 测试机器人模式检测 func test_bot_pattern_detection(): ## 测试机器人行为模式检测 print("\n--- 测试机器人模式检测 ---") setup_test() var client_id = "test_client" # 模拟机器人:以完全相同的间隔发送消息 if not rate_limiter.client_message_history.has(client_id): rate_limiter.client_message_history[client_id] = {"messages": [], "last_cleanup": Time.get_unix_time_from_system()} var current_time = Time.get_unix_time_from_system() var client_record = rate_limiter.client_message_history[client_id] # 添加完全规律的时间戳(每0.1秒一条消息) for i in range(5): client_record.messages.append(current_time + i * 0.1) # 应该检测为可疑活动(机器人模式) var is_suspicious = rate_limiter.is_suspicious_activity(client_id) assert(is_suspicious, "Regular interval messages should be flagged as bot-like") print("✅ 机器人模式检测测试通过") cleanup_test() ## 测试动态限制调整 func test_dynamic_limit_adjustment(): ## 测试动态调整速率限制 print("\n--- 测试动态限制调整 ---") setup_test() var client_id = "test_client" # 使用初始限制(3条消息) for i in range(3): rate_limiter.is_message_allowed(client_id) assert(not rate_limiter.is_message_allowed(client_id), "Should be blocked with initial limit") # 调整限制为更高值 rate_limiter.set_rate_limit(5, 1.0) # 重置客户端记录以应用新限制 rate_limiter.reset_client_limit(client_id) # 现在应该能发送更多消息 for i in range(5): var allowed = rate_limiter.is_message_allowed(client_id) assert(allowed, "Message %d should be allowed with new limit" % (i + 1)) print("✅ 动态限制调整测试通过") cleanup_test() ## 测试清理功能 func test_cleanup_functionality(): ## 测试过期记录清理功能 print("\n--- 测试清理功能 ---") setup_test() var client_id = "test_client" # 发送一些消息 rate_limiter.is_message_allowed(client_id) # 确认客户端记录存在 assert(rate_limiter.client_message_history.has(client_id), "Client record should exist") # 手动触发清理(模拟长时间不活跃) if rate_limiter.client_message_history.has(client_id): var client_record = rate_limiter.client_message_history[client_id] client_record.last_cleanup = Time.get_unix_time_from_system() - 400 # 设置为很久以前 # 触发清理 rate_limiter._cleanup_old_records() print("✅ 清理功能测试通过") cleanup_test() ## 测试边界情况 func test_edge_cases(): ## 测试边界情况 print("\n--- 测试边界情况 ---") setup_test() # 测试空客户端ID var allowed = rate_limiter.is_message_allowed("") assert(allowed, "Empty client ID should be handled gracefully") # 测试非常长的客户端ID var long_id = "a".repeat(1000) allowed = rate_limiter.is_message_allowed(long_id) assert(allowed, "Long client ID should be handled gracefully") # 测试零限制 rate_limiter.set_rate_limit(0, 1.0) allowed = rate_limiter.is_message_allowed("test") assert(not allowed, "Zero rate limit should block all messages") # 测试负数限制(应该被处理为0或默认值) rate_limiter.set_rate_limit(-1, 1.0) allowed = rate_limiter.is_message_allowed("test2") # 行为取决于实现,但不应该崩溃 print("✅ 边界情况测试通过") cleanup_test() ## 断言函数 func assert(condition: bool, message: String): ## 简单的断言函数 if not condition: push_error("断言失败: " + message) print("❌ " + message) else: print("✓ " + message)