Files
whale-town/tests/test_rate_limiter.gd
2025-12-06 17:33:14 +08:00

318 lines
9.1 KiB
GDScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
extends Node
## 速率限制器测试
## 测试消息速率限制和DoS防护
var rate_limiter: RateLimiter
func _ready():
print("=== 速率限制器测试开始 ===")
# 运行所有测试
test_normal_message_allowed()
test_rate_limit_triggered()
await test_time_window_reset()
test_multiple_clients_independent()
test_client_statistics()
test_global_statistics()
test_limit_reset()
test_suspicious_activity_detection()
test_bot_pattern_detection()
test_dynamic_limit_adjustment()
test_cleanup_functionality()
test_edge_cases()
print("=== 速率限制器测试完成 ===")
func setup_test():
## 每个测试前的设置
rate_limiter = RateLimiter.new()
# 设置较小的限制以便测试
rate_limiter.set_rate_limit(3, 1.0) # 每秒最多3条消息
func cleanup_test():
## 每个测试后的清理
if rate_limiter:
rate_limiter.queue_free()
rate_limiter = null
## 测试正常消息允许
func test_normal_message_allowed():
## 测试正常频率的消息应该被允许
print("\n--- 测试正常消息允许 ---")
setup_test()
var client_id = "test_client"
# 发送3条消息在限制内
for i in range(3):
var allowed = rate_limiter.is_message_allowed(client_id)
assert(allowed, "Message %d should be allowed" % (i + 1))
print("✅ 正常消息允许测试通过")
cleanup_test()
## 测试速率限制触发
func test_rate_limit_triggered():
## 测试超过速率限制时消息被阻止
print("\n--- 测试速率限制触发 ---")
setup_test()
var client_id = "test_client"
# 发送3条消息达到限制
for i in range(3):
rate_limiter.is_message_allowed(client_id)
# 第4条消息应该被阻止
var allowed = rate_limiter.is_message_allowed(client_id)
assert(not allowed, "4th message should be blocked by rate limit")
print("✅ 速率限制触发测试通过")
cleanup_test()
## 测试时间窗口重置
func test_time_window_reset():
## 测试时间窗口重置后允许新消息
print("\n--- 测试时间窗口重置 ---")
setup_test()
var client_id = "test_client"
# 发送3条消息达到限制
for i in range(3):
rate_limiter.is_message_allowed(client_id)
# 第4条消息被阻止
assert(not rate_limiter.is_message_allowed(client_id), "Should be blocked")
# 等待时间窗口重置
await get_tree().create_timer(1.1).timeout
# 现在应该允许新消息
var allowed = rate_limiter.is_message_allowed(client_id)
assert(allowed, "Message should be allowed after time window reset")
print("✅ 时间窗口重置测试通过")
cleanup_test()
## 测试多客户端独立限制
func test_multiple_clients_independent():
## 测试多个客户端的速率限制是独立的
print("\n--- 测试多客户端独立限制 ---")
setup_test()
var client1 = "client1"
var client2 = "client2"
# 客户端1发送3条消息达到限制
for i in range(3):
rate_limiter.is_message_allowed(client1)
# 客户端1被阻止
assert(not rate_limiter.is_message_allowed(client1), "Client1 should be blocked")
# 客户端2应该仍然可以发送消息
assert(rate_limiter.is_message_allowed(client2), "Client2 should still be allowed")
print("✅ 多客户端独立限制测试通过")
cleanup_test()
## 测试客户端统计
func test_client_statistics():
## 测试客户端消息统计
print("\n--- 测试客户端统计 ---")
setup_test()
var client_id = "test_client"
# 发送2条消息
rate_limiter.is_message_allowed(client_id)
rate_limiter.is_message_allowed(client_id)
var stats = rate_limiter.get_client_stats(client_id)
assert(stats.message_count == 2, "Should show 2 messages sent")
assert(stats.remaining_quota == 1, "Should show 1 message remaining")
assert(stats.has("window_reset_time"), "Should include reset time")
print("✅ 客户端统计测试通过")
cleanup_test()
## 测试全局统计
func test_global_statistics():
## 测试全局统计信息
print("\n--- 测试全局统计 ---")
setup_test()
# 多个客户端发送消息
rate_limiter.is_message_allowed("client1")
rate_limiter.is_message_allowed("client2")
rate_limiter.is_message_allowed("client1")
var stats = rate_limiter.get_global_stats()
assert(stats.has("total_clients"), "Should include total clients")
assert(stats.has("active_clients"), "Should include active clients")
assert(stats.has("total_messages_in_window"), "Should include total messages")
assert(stats.total_clients == 2, "Should have 2 clients")
print("✅ 全局统计测试通过")
cleanup_test()
## 测试限制重置
func test_limit_reset():
## 测试手动重置客户端限制
print("\n--- 测试限制重置 ---")
setup_test()
var client_id = "test_client"
# 发送3条消息达到限制
for i in range(3):
rate_limiter.is_message_allowed(client_id)
# 确认被阻止
assert(not rate_limiter.is_message_allowed(client_id), "Should be blocked")
# 重置限制
rate_limiter.reset_client_limit(client_id)
# 现在应该允许消息
assert(rate_limiter.is_message_allowed(client_id), "Should be allowed after reset")
print("✅ 限制重置测试通过")
cleanup_test()
## 测试可疑活动检测
func test_suspicious_activity_detection():
## 测试可疑活动检测
print("\n--- 测试可疑活动检测 ---")
setup_test()
var client_id = "test_client"
# 发送接近限制的消息数量
for i in range(3): # 3/3 = 100% 使用率
rate_limiter.is_message_allowed(client_id)
# 应该检测为可疑活动
var is_suspicious = rate_limiter.is_suspicious_activity(client_id)
assert(is_suspicious, "High message rate should be flagged as suspicious")
print("✅ 可疑活动检测测试通过")
cleanup_test()
## 测试机器人模式检测
func test_bot_pattern_detection():
## 测试机器人行为模式检测
print("\n--- 测试机器人模式检测 ---")
setup_test()
var client_id = "test_client"
# 模拟机器人:以完全相同的间隔发送消息
if not rate_limiter.client_message_history.has(client_id):
rate_limiter.client_message_history[client_id] = {"messages": [], "last_cleanup": Time.get_unix_time_from_system()}
var current_time = Time.get_unix_time_from_system()
var client_record = rate_limiter.client_message_history[client_id]
# 添加完全规律的时间戳每0.1秒一条消息)
for i in range(5):
client_record.messages.append(current_time + i * 0.1)
# 应该检测为可疑活动(机器人模式)
var is_suspicious = rate_limiter.is_suspicious_activity(client_id)
assert(is_suspicious, "Regular interval messages should be flagged as bot-like")
print("✅ 机器人模式检测测试通过")
cleanup_test()
## 测试动态限制调整
func test_dynamic_limit_adjustment():
## 测试动态调整速率限制
print("\n--- 测试动态限制调整 ---")
setup_test()
var client_id = "test_client"
# 使用初始限制3条消息
for i in range(3):
rate_limiter.is_message_allowed(client_id)
assert(not rate_limiter.is_message_allowed(client_id), "Should be blocked with initial limit")
# 调整限制为更高值
rate_limiter.set_rate_limit(5, 1.0)
# 重置客户端记录以应用新限制
rate_limiter.reset_client_limit(client_id)
# 现在应该能发送更多消息
for i in range(5):
var allowed = rate_limiter.is_message_allowed(client_id)
assert(allowed, "Message %d should be allowed with new limit" % (i + 1))
print("✅ 动态限制调整测试通过")
cleanup_test()
## 测试清理功能
func test_cleanup_functionality():
## 测试过期记录清理功能
print("\n--- 测试清理功能 ---")
setup_test()
var client_id = "test_client"
# 发送一些消息
rate_limiter.is_message_allowed(client_id)
# 确认客户端记录存在
assert(rate_limiter.client_message_history.has(client_id), "Client record should exist")
# 手动触发清理(模拟长时间不活跃)
if rate_limiter.client_message_history.has(client_id):
var client_record = rate_limiter.client_message_history[client_id]
client_record.last_cleanup = Time.get_unix_time_from_system() - 400 # 设置为很久以前
# 触发清理
rate_limiter._cleanup_old_records()
print("✅ 清理功能测试通过")
cleanup_test()
## 测试边界情况
func test_edge_cases():
## 测试边界情况
print("\n--- 测试边界情况 ---")
setup_test()
# 测试空客户端ID
var allowed = rate_limiter.is_message_allowed("")
assert(allowed, "Empty client ID should be handled gracefully")
# 测试非常长的客户端ID
var long_id = "a".repeat(1000)
allowed = rate_limiter.is_message_allowed(long_id)
assert(allowed, "Long client ID should be handled gracefully")
# 测试零限制
rate_limiter.set_rate_limit(0, 1.0)
allowed = rate_limiter.is_message_allowed("test")
assert(not allowed, "Zero rate limit should block all messages")
# 测试负数限制应该被处理为0或默认值
rate_limiter.set_rate_limit(-1, 1.0)
allowed = rate_limiter.is_message_allowed("test2")
# 行为取决于实现,但不应该崩溃
print("✅ 边界情况测试通过")
cleanup_test()
## 断言函数
func assert(condition: bool, message: String):
## 简单的断言函数
if not condition:
push_error("断言失败: " + message)
print("" + message)
else:
print("" + message)