test:添加网络与认证回归测试脚本

This commit is contained in:
2026-03-10 16:17:55 +08:00
parent d96abbb8b9
commit 64850c2cae
2 changed files with 327 additions and 0 deletions

View File

@@ -0,0 +1,185 @@
extends SceneTree
# Auth/BaseLevel 回归测试(无需 GUT
#
# 运行方式:
# "D:\\technology\\biancheng\\Godot\\Godot_v4.5.1-stable_win64.exe" --headless --path . --script tests/unit/test_auth_baselevel_regressions.gd
var _failures: Array[String] = []
class TestBaseLevel:
extends BaseLevel
func _ready() -> void:
# 测试场景中跳过原始 _ready避免网络/场景副作用
pass
func _init() -> void:
call_deferred("_run")
func _run() -> void:
await _test_forgot_password_response_handler()
await _test_auth_request_tracking_cleanup()
await _test_verification_code_timestamp_and_remaining_time()
await _test_send_code_cooldown_starts_on_success_only()
await _test_join_session_wait_has_timeout_guards()
if _failures.is_empty():
print("PASS: test_auth_baselevel_regressions")
quit(0)
return
for failure in _failures:
push_error(failure)
print("FAIL: test_auth_baselevel_regressions (%d)" % _failures.size())
quit(1)
func _test_forgot_password_response_handler() -> void:
var manager := AuthManager.new()
var toast_message: String = ""
manager.show_toast_message.connect(func(message: String, _is_success: bool): toast_message = message)
manager._on_forgot_password_response(true, {}, {})
_assert(
"重置验证码" in toast_message,
"忘记密码成功响应应使用 forgot_password 处理器文案"
)
toast_message = ""
manager._on_forgot_password_response(false, {"error_code": "USER_NOT_FOUND", "message": "用户不存在"}, {})
_assert(
("用户不存在" in toast_message) and ("请检查邮箱或手机号" in toast_message),
"忘记密码失败响应应使用 forgot_password 错误分支"
)
manager.cleanup()
func _test_auth_request_tracking_cleanup() -> void:
var manager := AuthManager.new()
manager.active_request_ids = ["req_a", "req_b"]
manager._on_network_request_completed("req_a", true, {})
_assert(
manager.active_request_ids.size() == 1 and manager.active_request_ids[0] == "req_b",
"请求完成后应从 active_request_ids 中移除对应 request_id"
)
manager._on_network_request_failed("req_b", "NETWORK_ERROR", "x")
_assert(
manager.active_request_ids.is_empty(),
"请求失败后也应从 active_request_ids 中移除对应 request_id"
)
manager.cleanup()
func _test_verification_code_timestamp_and_remaining_time() -> void:
var manager := AuthManager.new()
var email := "cooldown_regression@example.com"
manager._record_verification_code_sent(email)
var sent_timestamp: int = int(manager.verification_codes_sent[email].get("time", 0))
_assert(
sent_timestamp > 86400,
"验证码发送时间应记录 Unix 时间戳,避免跨天计算异常"
)
manager.verification_codes_sent[email].time = int(Time.get_unix_time_from_system()) - 1000
_assert(
manager.get_remaining_cooldown_time(email) == 0,
"冷却已过期时,剩余时间应为 0"
)
manager.cleanup()
func _test_send_code_cooldown_starts_on_success_only() -> void:
var packed_scene: PackedScene = load("res://scenes/ui/AuthScene.tscn")
_assert(packed_scene != null, "应能加载 AuthScene.tscn")
if packed_scene == null:
return
var auth_scene: Control = packed_scene.instantiate()
root.add_child(auth_scene)
await process_frame
auth_scene.register_email.text = "invalid_email"
auth_scene._on_send_code_pressed()
await process_frame
_assert(
auth_scene.cooldown_timer == null,
"邮箱不合法时不应启动验证码冷却计时器"
)
_assert(
not auth_scene.send_code_btn.disabled,
"邮箱不合法时发送按钮不应被锁定"
)
# 模拟发送成功回调触发冷却(并补齐 manager 内部冷却记录)
auth_scene.auth_manager._record_verification_code_sent("regression@example.com")
auth_scene._on_controller_verification_code_sent("ok")
_assert(
auth_scene.cooldown_timer != null,
"验证码发送成功后应启动冷却计时器"
)
_assert(
auth_scene.send_code_btn.disabled,
"验证码发送成功后应禁用发送按钮"
)
auth_scene.queue_free()
await process_frame
func _test_join_session_wait_has_timeout_guards() -> void:
var level := TestBaseLevel.new()
root.add_child(level)
await process_frame
var original_token: String = LocationManager._auth_token
var original_poll_interval: float = level._join_token_poll_interval
var original_token_timeout: float = level._join_token_wait_timeout
var original_player_timeout_frames: int = level._join_player_wait_timeout_frames
level._join_token_poll_interval = 0.01
level._join_token_wait_timeout = 0.03
level._join_player_wait_timeout_frames = 2
# 场景 1: Token 一直未就绪,函数应在超时后退出,而不是无限递归等待
LocationManager._auth_token = ""
level._join_session_with_player("test_session")
await create_timer(0.15).timeout
_assert(
not level._is_joining_session,
"Token 长时间未就绪时应超时退出 join 流程"
)
# 场景 2: Token 已就绪但玩家一直未生成,也应在上限后退出
LocationManager._auth_token = "dummy_token"
level._player_spawned = false
level._local_player = null
level._join_session_with_player("test_session")
await process_frame
await process_frame
await process_frame
await process_frame
_assert(
not level._is_joining_session,
"玩家长时间未就绪时应在帧上限后退出 join 流程"
)
# 恢复现场
LocationManager._auth_token = original_token
level._join_token_poll_interval = original_poll_interval
level._join_token_wait_timeout = original_token_timeout
level._join_player_wait_timeout_frames = original_player_timeout_frames
level.queue_free()
await process_frame
func _assert(condition: bool, message: String) -> void:
if not condition:
_failures.append(message)

View File

@@ -0,0 +1,142 @@
extends SceneTree
# WebSocket 关闭码与重连行为测试(无需 GUT
#
# 运行方式:
# "D:\\technology\\biancheng\\Godot\\Godot_v4.5.1-stable_win64.exe" --headless --path . --script tests/unit/test_websocket_close_code.gd
var _failures: Array[String] = []
func _init() -> void:
call_deferred("_run")
func _run() -> void:
var manager := ChatWebSocketManager.new()
root.add_child(manager)
await process_frame
_test_close_code_mapping(manager)
_test_idle_closed_state_does_not_trigger_reconnect(manager)
_test_reconnect_on_unclean_close(manager)
_test_no_reconnect_on_clean_close(manager)
_test_reconnect_attempt_not_reset_on_reconnect_connect(manager)
_test_reconnection_success_emits_and_resets_attempt(manager)
manager.queue_free()
await process_frame
if _failures.is_empty():
print("PASS: test_websocket_close_code")
quit(0)
return
for failure in _failures:
push_error(failure)
print("FAIL: test_websocket_close_code (%d)" % _failures.size())
quit(1)
func _test_close_code_mapping(manager: ChatWebSocketManager) -> void:
_assert(
not manager._is_clean_close_code(-1),
"close_code=-1 应判定为非干净关闭(异常断开)"
)
_assert(
manager._is_clean_close_code(1000),
"close_code=1000 应判定为干净关闭"
)
_assert(
manager._is_clean_close_code(0),
"close_code=0 不应判定为异常断开"
)
func _test_idle_closed_state_does_not_trigger_reconnect(manager: ChatWebSocketManager) -> void:
manager.enable_auto_reconnect(true, 5, 3.0)
manager._set_connection_state(ChatWebSocketManager.ConnectionState.DISCONNECTED)
manager._reconnect_attempt = 0
manager._closed_state_handled = false
manager._check_websocket_state()
_assert(
manager.get_connection_state() == ChatWebSocketManager.ConnectionState.DISCONNECTED,
"空闲 DISCONNECTED + STATE_CLOSED 不应触发自动重连"
)
_assert(
manager._reconnect_attempt == 0,
"空闲关闭态不应消耗重连次数"
)
func _test_reconnect_on_unclean_close(manager: ChatWebSocketManager) -> void:
var lost_count: int = 0
manager.connection_lost.connect(func(): lost_count += 1)
manager.enable_auto_reconnect(true, 5, 3.0)
manager._reconnect_attempt = 0
manager._set_connection_state(ChatWebSocketManager.ConnectionState.CONNECTED)
manager._on_websocket_closed(false)
_assert(
manager.get_connection_state() == ChatWebSocketManager.ConnectionState.RECONNECTING,
"异常断开时应进入 RECONNECTING 状态"
)
_assert(
manager._reconnect_attempt == 1,
"异常断开时应增加一次重连尝试计数"
)
_assert(
lost_count == 1,
"异常断开时应发射一次 connection_lost 信号"
)
# 清理重连定时器,避免影响后续用例
if is_instance_valid(manager._reconnect_timer):
manager._reconnect_timer.stop()
func _test_no_reconnect_on_clean_close(manager: ChatWebSocketManager) -> void:
var reconnect_attempt_before: int = manager._reconnect_attempt
manager._set_connection_state(ChatWebSocketManager.ConnectionState.CONNECTED)
manager._on_websocket_closed(true)
_assert(
manager.get_connection_state() == ChatWebSocketManager.ConnectionState.DISCONNECTED,
"干净关闭时应回到 DISCONNECTED 状态"
)
_assert(
manager._reconnect_attempt == reconnect_attempt_before,
"干净关闭时不应增加重连尝试计数"
)
func _test_reconnect_attempt_not_reset_on_reconnect_connect(manager: ChatWebSocketManager) -> void:
manager._reconnect_attempt = 3
manager._set_connection_state(ChatWebSocketManager.ConnectionState.RECONNECTING)
manager.connect_to_game_server(true)
_assert(
manager._reconnect_attempt == 3,
"重连触发 connect_to_game_server(true) 时不应重置重连尝试计数"
)
func _test_reconnection_success_emits_and_resets_attempt(manager: ChatWebSocketManager) -> void:
var success_count: int = 0
manager.reconnection_succeeded.connect(func(): success_count += 1)
manager._reconnect_attempt = 2
manager._set_connection_state(ChatWebSocketManager.ConnectionState.RECONNECTING)
manager._on_websocket_connected()
_assert(
success_count == 1,
"重连成功时应发射 reconnection_succeeded 信号"
)
_assert(
manager._reconnect_attempt == 0,
"重连成功时应重置重连尝试计数"
)
_assert(
manager.get_connection_state() == ChatWebSocketManager.ConnectionState.CONNECTED,
"重连成功后应进入 CONNECTED 状态"
)
func _assert(condition: bool, message: String) -> void:
if not condition:
_failures.append(message)