318 lines
9.1 KiB
GDScript
318 lines
9.1 KiB
GDScript
extends Node
|
||
## 速率限制器测试
|
||
## 测试消息速率限制和DoS防护
|
||
|
||
var rate_limiter: RateLimiter
|
||
|
||
func _ready():
|
||
print("=== 速率限制器测试开始 ===")
|
||
|
||
# 运行所有测试
|
||
test_normal_message_allowed()
|
||
test_rate_limit_triggered()
|
||
await test_time_window_reset()
|
||
test_multiple_clients_independent()
|
||
test_client_statistics()
|
||
test_global_statistics()
|
||
test_limit_reset()
|
||
test_suspicious_activity_detection()
|
||
test_bot_pattern_detection()
|
||
test_dynamic_limit_adjustment()
|
||
test_cleanup_functionality()
|
||
test_edge_cases()
|
||
|
||
print("=== 速率限制器测试完成 ===")
|
||
|
||
func setup_test():
|
||
## 每个测试前的设置
|
||
rate_limiter = RateLimiter.new()
|
||
# 设置较小的限制以便测试
|
||
rate_limiter.set_rate_limit(3, 1.0) # 每秒最多3条消息
|
||
|
||
func cleanup_test():
|
||
## 每个测试后的清理
|
||
if rate_limiter:
|
||
rate_limiter.queue_free()
|
||
rate_limiter = null
|
||
|
||
## 测试正常消息允许
|
||
func test_normal_message_allowed():
|
||
## 测试正常频率的消息应该被允许
|
||
print("\n--- 测试正常消息允许 ---")
|
||
setup_test()
|
||
|
||
var client_id = "test_client"
|
||
|
||
# 发送3条消息(在限制内)
|
||
for i in range(3):
|
||
var allowed = rate_limiter.is_message_allowed(client_id)
|
||
assert(allowed, "Message %d should be allowed" % (i + 1))
|
||
|
||
print("✅ 正常消息允许测试通过")
|
||
cleanup_test()
|
||
|
||
## 测试速率限制触发
|
||
func test_rate_limit_triggered():
|
||
## 测试超过速率限制时消息被阻止
|
||
print("\n--- 测试速率限制触发 ---")
|
||
setup_test()
|
||
|
||
var client_id = "test_client"
|
||
|
||
# 发送3条消息(达到限制)
|
||
for i in range(3):
|
||
rate_limiter.is_message_allowed(client_id)
|
||
|
||
# 第4条消息应该被阻止
|
||
var allowed = rate_limiter.is_message_allowed(client_id)
|
||
assert(not allowed, "4th message should be blocked by rate limit")
|
||
|
||
print("✅ 速率限制触发测试通过")
|
||
cleanup_test()
|
||
|
||
## 测试时间窗口重置
|
||
func test_time_window_reset():
|
||
## 测试时间窗口重置后允许新消息
|
||
print("\n--- 测试时间窗口重置 ---")
|
||
setup_test()
|
||
|
||
var client_id = "test_client"
|
||
|
||
# 发送3条消息(达到限制)
|
||
for i in range(3):
|
||
rate_limiter.is_message_allowed(client_id)
|
||
|
||
# 第4条消息被阻止
|
||
assert(not rate_limiter.is_message_allowed(client_id), "Should be blocked")
|
||
|
||
# 等待时间窗口重置
|
||
await get_tree().create_timer(1.1).timeout
|
||
|
||
# 现在应该允许新消息
|
||
var allowed = rate_limiter.is_message_allowed(client_id)
|
||
assert(allowed, "Message should be allowed after time window reset")
|
||
|
||
print("✅ 时间窗口重置测试通过")
|
||
cleanup_test()
|
||
|
||
## 测试多客户端独立限制
|
||
func test_multiple_clients_independent():
|
||
## 测试多个客户端的速率限制是独立的
|
||
print("\n--- 测试多客户端独立限制 ---")
|
||
setup_test()
|
||
|
||
var client1 = "client1"
|
||
var client2 = "client2"
|
||
|
||
# 客户端1发送3条消息(达到限制)
|
||
for i in range(3):
|
||
rate_limiter.is_message_allowed(client1)
|
||
|
||
# 客户端1被阻止
|
||
assert(not rate_limiter.is_message_allowed(client1), "Client1 should be blocked")
|
||
|
||
# 客户端2应该仍然可以发送消息
|
||
assert(rate_limiter.is_message_allowed(client2), "Client2 should still be allowed")
|
||
|
||
print("✅ 多客户端独立限制测试通过")
|
||
cleanup_test()
|
||
|
||
## 测试客户端统计
|
||
func test_client_statistics():
|
||
## 测试客户端消息统计
|
||
print("\n--- 测试客户端统计 ---")
|
||
setup_test()
|
||
|
||
var client_id = "test_client"
|
||
|
||
# 发送2条消息
|
||
rate_limiter.is_message_allowed(client_id)
|
||
rate_limiter.is_message_allowed(client_id)
|
||
|
||
var stats = rate_limiter.get_client_stats(client_id)
|
||
assert(stats.message_count == 2, "Should show 2 messages sent")
|
||
assert(stats.remaining_quota == 1, "Should show 1 message remaining")
|
||
assert(stats.has("window_reset_time"), "Should include reset time")
|
||
|
||
print("✅ 客户端统计测试通过")
|
||
cleanup_test()
|
||
|
||
## 测试全局统计
|
||
func test_global_statistics():
|
||
## 测试全局统计信息
|
||
print("\n--- 测试全局统计 ---")
|
||
setup_test()
|
||
|
||
# 多个客户端发送消息
|
||
rate_limiter.is_message_allowed("client1")
|
||
rate_limiter.is_message_allowed("client2")
|
||
rate_limiter.is_message_allowed("client1")
|
||
|
||
var stats = rate_limiter.get_global_stats()
|
||
assert(stats.has("total_clients"), "Should include total clients")
|
||
assert(stats.has("active_clients"), "Should include active clients")
|
||
assert(stats.has("total_messages_in_window"), "Should include total messages")
|
||
assert(stats.total_clients == 2, "Should have 2 clients")
|
||
|
||
print("✅ 全局统计测试通过")
|
||
cleanup_test()
|
||
|
||
## 测试限制重置
|
||
func test_limit_reset():
|
||
## 测试手动重置客户端限制
|
||
print("\n--- 测试限制重置 ---")
|
||
setup_test()
|
||
|
||
var client_id = "test_client"
|
||
|
||
# 发送3条消息(达到限制)
|
||
for i in range(3):
|
||
rate_limiter.is_message_allowed(client_id)
|
||
|
||
# 确认被阻止
|
||
assert(not rate_limiter.is_message_allowed(client_id), "Should be blocked")
|
||
|
||
# 重置限制
|
||
rate_limiter.reset_client_limit(client_id)
|
||
|
||
# 现在应该允许消息
|
||
assert(rate_limiter.is_message_allowed(client_id), "Should be allowed after reset")
|
||
|
||
print("✅ 限制重置测试通过")
|
||
cleanup_test()
|
||
|
||
## 测试可疑活动检测
|
||
func test_suspicious_activity_detection():
|
||
## 测试可疑活动检测
|
||
print("\n--- 测试可疑活动检测 ---")
|
||
setup_test()
|
||
|
||
var client_id = "test_client"
|
||
|
||
# 发送接近限制的消息数量
|
||
for i in range(3): # 3/3 = 100% 使用率
|
||
rate_limiter.is_message_allowed(client_id)
|
||
|
||
# 应该检测为可疑活动
|
||
var is_suspicious = rate_limiter.is_suspicious_activity(client_id)
|
||
assert(is_suspicious, "High message rate should be flagged as suspicious")
|
||
|
||
print("✅ 可疑活动检测测试通过")
|
||
cleanup_test()
|
||
|
||
## 测试机器人模式检测
|
||
func test_bot_pattern_detection():
|
||
## 测试机器人行为模式检测
|
||
print("\n--- 测试机器人模式检测 ---")
|
||
setup_test()
|
||
|
||
var client_id = "test_client"
|
||
|
||
# 模拟机器人:以完全相同的间隔发送消息
|
||
if not rate_limiter.client_message_history.has(client_id):
|
||
rate_limiter.client_message_history[client_id] = {"messages": [], "last_cleanup": Time.get_unix_time_from_system()}
|
||
|
||
var current_time = Time.get_unix_time_from_system()
|
||
var client_record = rate_limiter.client_message_history[client_id]
|
||
|
||
# 添加完全规律的时间戳(每0.1秒一条消息)
|
||
for i in range(5):
|
||
client_record.messages.append(current_time + i * 0.1)
|
||
|
||
# 应该检测为可疑活动(机器人模式)
|
||
var is_suspicious = rate_limiter.is_suspicious_activity(client_id)
|
||
assert(is_suspicious, "Regular interval messages should be flagged as bot-like")
|
||
|
||
print("✅ 机器人模式检测测试通过")
|
||
cleanup_test()
|
||
|
||
## 测试动态限制调整
|
||
func test_dynamic_limit_adjustment():
|
||
## 测试动态调整速率限制
|
||
print("\n--- 测试动态限制调整 ---")
|
||
setup_test()
|
||
|
||
var client_id = "test_client"
|
||
|
||
# 使用初始限制(3条消息)
|
||
for i in range(3):
|
||
rate_limiter.is_message_allowed(client_id)
|
||
|
||
assert(not rate_limiter.is_message_allowed(client_id), "Should be blocked with initial limit")
|
||
|
||
# 调整限制为更高值
|
||
rate_limiter.set_rate_limit(5, 1.0)
|
||
|
||
# 重置客户端记录以应用新限制
|
||
rate_limiter.reset_client_limit(client_id)
|
||
|
||
# 现在应该能发送更多消息
|
||
for i in range(5):
|
||
var allowed = rate_limiter.is_message_allowed(client_id)
|
||
assert(allowed, "Message %d should be allowed with new limit" % (i + 1))
|
||
|
||
print("✅ 动态限制调整测试通过")
|
||
cleanup_test()
|
||
|
||
## 测试清理功能
|
||
func test_cleanup_functionality():
|
||
## 测试过期记录清理功能
|
||
print("\n--- 测试清理功能 ---")
|
||
setup_test()
|
||
|
||
var client_id = "test_client"
|
||
|
||
# 发送一些消息
|
||
rate_limiter.is_message_allowed(client_id)
|
||
|
||
# 确认客户端记录存在
|
||
assert(rate_limiter.client_message_history.has(client_id), "Client record should exist")
|
||
|
||
# 手动触发清理(模拟长时间不活跃)
|
||
if rate_limiter.client_message_history.has(client_id):
|
||
var client_record = rate_limiter.client_message_history[client_id]
|
||
client_record.last_cleanup = Time.get_unix_time_from_system() - 400 # 设置为很久以前
|
||
|
||
# 触发清理
|
||
rate_limiter._cleanup_old_records()
|
||
|
||
print("✅ 清理功能测试通过")
|
||
cleanup_test()
|
||
|
||
## 测试边界情况
|
||
func test_edge_cases():
|
||
## 测试边界情况
|
||
print("\n--- 测试边界情况 ---")
|
||
setup_test()
|
||
|
||
# 测试空客户端ID
|
||
var allowed = rate_limiter.is_message_allowed("")
|
||
assert(allowed, "Empty client ID should be handled gracefully")
|
||
|
||
# 测试非常长的客户端ID
|
||
var long_id = "a".repeat(1000)
|
||
allowed = rate_limiter.is_message_allowed(long_id)
|
||
assert(allowed, "Long client ID should be handled gracefully")
|
||
|
||
# 测试零限制
|
||
rate_limiter.set_rate_limit(0, 1.0)
|
||
allowed = rate_limiter.is_message_allowed("test")
|
||
assert(not allowed, "Zero rate limit should block all messages")
|
||
|
||
# 测试负数限制(应该被处理为0或默认值)
|
||
rate_limiter.set_rate_limit(-1, 1.0)
|
||
allowed = rate_limiter.is_message_allowed("test2")
|
||
# 行为取决于实现,但不应该崩溃
|
||
|
||
print("✅ 边界情况测试通过")
|
||
cleanup_test()
|
||
|
||
## 断言函数
|
||
func assert(condition: bool, message: String):
|
||
## 简单的断言函数
|
||
if not condition:
|
||
push_error("断言失败: " + message)
|
||
print("❌ " + message)
|
||
else:
|
||
print("✓ " + message)
|