extends GutTest ## 速率限制器测试 ## 测试消息速率限制和DoS防护 var rate_limiter: RateLimiter func before_each(): """每个测试前的设置""" rate_limiter = RateLimiter.new() # 设置较小的限制以便测试 rate_limiter.set_rate_limit(3, 1.0) # 每秒最多3条消息 func after_each(): """每个测试后的清理""" if rate_limiter: rate_limiter.queue_free() ## 测试正常消息允许 func test_normal_message_allowed(): """测试正常频率的消息应该被允许""" var client_id = "test_client" # 发送3条消息(在限制内) for i in range(3): var allowed = rate_limiter.is_message_allowed(client_id) assert_true(allowed, "Message %d should be allowed" % (i + 1)) ## 测试速率限制触发 func test_rate_limit_triggered(): """测试超过速率限制时消息被阻止""" var client_id = "test_client" # 发送3条消息(达到限制) for i in range(3): rate_limiter.is_message_allowed(client_id) # 第4条消息应该被阻止 var allowed = rate_limiter.is_message_allowed(client_id) assert_false(allowed, "4th message should be blocked by rate limit") ## 测试时间窗口重置 func test_time_window_reset(): """测试时间窗口重置后允许新消息""" var client_id = "test_client" # 发送3条消息(达到限制) for i in range(3): rate_limiter.is_message_allowed(client_id) # 第4条消息被阻止 assert_false(rate_limiter.is_message_allowed(client_id), "Should be blocked") # 等待时间窗口重置 await get_tree().create_timer(1.1).timeout # 现在应该允许新消息 var allowed = rate_limiter.is_message_allowed(client_id) assert_true(allowed, "Message should be allowed after time window reset") ## 测试多客户端独立限制 func test_multiple_clients_independent(): """测试多个客户端的速率限制是独立的""" var client1 = "client1" var client2 = "client2" # 客户端1发送3条消息(达到限制) for i in range(3): rate_limiter.is_message_allowed(client1) # 客户端1被阻止 assert_false(rate_limiter.is_message_allowed(client1), "Client1 should be blocked") # 客户端2应该仍然可以发送消息 assert_true(rate_limiter.is_message_allowed(client2), "Client2 should still be allowed") ## 测试客户端统计 func test_client_statistics(): """测试客户端消息统计""" var client_id = "test_client" # 发送2条消息 rate_limiter.is_message_allowed(client_id) rate_limiter.is_message_allowed(client_id) var stats = rate_limiter.get_client_stats(client_id) assert_eq(stats.message_count, 2, "Should show 2 messages sent") assert_eq(stats.remaining_quota, 1, "Should show 1 message remaining") assert_true(stats.has("window_reset_time"), "Should include reset time") ## 测试全局统计 func test_global_statistics(): """测试全局统计信息""" # 多个客户端发送消息 rate_limiter.is_message_allowed("client1") rate_limiter.is_message_allowed("client2") rate_limiter.is_message_allowed("client1") var stats = rate_limiter.get_global_stats() assert_true(stats.has("total_clients"), "Should include total clients") assert_true(stats.has("active_clients"), "Should include active clients") assert_true(stats.has("total_messages_in_window"), "Should include total messages") assert_eq(stats.total_clients, 2, "Should have 2 clients") ## 测试限制重置 func test_limit_reset(): """测试手动重置客户端限制""" var client_id = "test_client" # 发送3条消息(达到限制) for i in range(3): rate_limiter.is_message_allowed(client_id) # 确认被阻止 assert_false(rate_limiter.is_message_allowed(client_id), "Should be blocked") # 重置限制 rate_limiter.reset_client_limit(client_id) # 现在应该允许消息 assert_true(rate_limiter.is_message_allowed(client_id), "Should be allowed after reset") ## 测试可疑活动检测 func test_suspicious_activity_detection(): """测试可疑活动检测""" var client_id = "test_client" # 发送接近限制的消息数量 for i in range(3): # 3/3 = 100% 使用率 rate_limiter.is_message_allowed(client_id) # 应该检测为可疑活动 var is_suspicious = rate_limiter.is_suspicious_activity(client_id) assert_true(is_suspicious, "High message rate should be flagged as suspicious") ## 测试机器人模式检测 func test_bot_pattern_detection(): """测试机器人行为模式检测""" var client_id = "test_client" # 模拟机器人:以完全相同的间隔发送消息 # 这需要手动操作消息历史来模拟 if rate_limiter.client_message_history.has(client_id): var client_record = rate_limiter.client_message_history[client_id] else: rate_limiter.client_message_history[client_id] = {"messages": [], "last_cleanup": Time.get_unix_time_from_system()} var client_record = rate_limiter.client_message_history[client_id] var current_time = Time.get_unix_time_from_system() var client_record = rate_limiter.client_message_history[client_id] # 添加完全规律的时间戳(每0.1秒一条消息) for i in range(5): client_record.messages.append(current_time + i * 0.1) # 应该检测为可疑活动(机器人模式) var is_suspicious = rate_limiter.is_suspicious_activity(client_id) assert_true(is_suspicious, "Regular interval messages should be flagged as bot-like") ## 测试动态限制调整 func test_dynamic_limit_adjustment(): """测试动态调整速率限制""" var client_id = "test_client" # 使用初始限制(3条消息) for i in range(3): rate_limiter.is_message_allowed(client_id) assert_false(rate_limiter.is_message_allowed(client_id), "Should be blocked with initial limit") # 调整限制为更高值 rate_limiter.set_rate_limit(5, 1.0) # 重置客户端记录以应用新限制 rate_limiter.reset_client_limit(client_id) # 现在应该能发送更多消息 for i in range(5): var allowed = rate_limiter.is_message_allowed(client_id) assert_true(allowed, "Message %d should be allowed with new limit" % (i + 1)) ## 测试清理功能 func test_cleanup_functionality(): """测试过期记录清理功能""" var client_id = "test_client" # 发送一些消息 rate_limiter.is_message_allowed(client_id) # 确认客户端记录存在 assert_true(rate_limiter.client_message_history.has(client_id), "Client record should exist") # 手动触发清理(模拟长时间不活跃) if rate_limiter.client_message_history.has(client_id): var client_record = rate_limiter.client_message_history[client_id] client_record.last_cleanup = Time.get_unix_time_from_system() - 400 # 设置为很久以前 # 触发清理 rate_limiter._cleanup_old_records() # 不活跃的客户端记录应该被清理 # 注意:这个测试可能需要根据实际的清理逻辑调整 ## 测试边界情况 func test_edge_cases(): """测试边界情况""" # 测试空客户端ID var allowed = rate_limiter.is_message_allowed("") assert_true(allowed, "Empty client ID should be handled gracefully") # 测试非常长的客户端ID var long_id = "a".repeat(1000) allowed = rate_limiter.is_message_allowed(long_id) assert_true(allowed, "Long client ID should be handled gracefully") # 测试零限制 rate_limiter.set_rate_limit(0, 1.0) allowed = rate_limiter.is_message_allowed("test") assert_false(allowed, "Zero rate limit should block all messages") # 测试负数限制(应该被处理为0或默认值) rate_limiter.set_rate_limit(-1, 1.0) allowed = rate_limiter.is_message_allowed("test2") # 行为取决于实现,但不应该崩溃