143 lines
4.4 KiB
GDScript
143 lines
4.4 KiB
GDScript
extends SceneTree
|
||
|
||
# WebSocket close-code and reconnect behaviour tests (no GUT required).
#
# How to run:
# "D:\\technology\\biancheng\\Godot\\Godot_v4.5.1-stable_win64.exe" --headless --path . --script tests/unit/test_websocket_close_code.gd
|
||
|
||
var _failures: Array[String] = []
|
||
|
||
## Entry point of the script: schedules `_run` as a deferred call instead of
## invoking it directly from the constructor.
func _init() -> void:
	_run.call_deferred()
|
||
|
||
## Runs every test case against one shared ChatWebSocketManager instance,
## then reports the outcome and quits with exit code 0 (no recorded failures)
## or 1 (at least one failure).
func _run() -> void:
	var manager := ChatWebSocketManager.new()
	root.add_child(manager)
	await process_frame

	# The cases share `manager` and run strictly in this order.
	var test_cases: Array[Callable] = [
		_test_close_code_mapping,
		_test_idle_closed_state_does_not_trigger_reconnect,
		_test_reconnect_on_unclean_close,
		_test_no_reconnect_on_clean_close,
		_test_reconnect_attempt_not_reset_on_reconnect_connect,
		_test_reconnection_success_emits_and_resets_attempt,
	]
	for test_case in test_cases:
		test_case.call(manager)

	manager.queue_free()
	await process_frame

	if not _failures.is_empty():
		# Report each recorded message, then a summary line with the count.
		for failure in _failures:
			push_error(failure)
		print("FAIL: test_websocket_close_code (%d)" % _failures.size())
		quit(1)
		return

	print("PASS: test_websocket_close_code")
	quit(0)
|
||
|
||
## Checks how `manager._is_clean_close_code` classifies three close codes:
## -1 must be reported as not clean, while 1000 and 0 must be reported as clean.
func _test_close_code_mapping(manager: ChatWebSocketManager) -> void:
	var minus_one_is_clean = manager._is_clean_close_code(-1)
	_assert(not minus_one_is_clean, "close_code=-1 应判定为非干净关闭(异常断开)")

	var normal_is_clean = manager._is_clean_close_code(1000)
	_assert(normal_is_clean, "close_code=1000 应判定为干净关闭")

	var zero_is_clean = manager._is_clean_close_code(0)
	_assert(zero_is_clean, "close_code=0 不应判定为异常断开")
|
||
|
||
## With auto-reconnect enabled and the manager put into DISCONNECTED, calls
## `manager._check_websocket_state()` and asserts that the connection state is
## still DISCONNECTED and that `_reconnect_attempt` is still 0.
func _test_idle_closed_state_does_not_trigger_reconnect(manager: ChatWebSocketManager) -> void:
	var idle_state := ChatWebSocketManager.ConnectionState.DISCONNECTED

	# Arrange: idle manager with a zeroed attempt counter.
	manager.enable_auto_reconnect(true, 5, 3.0)
	manager._set_connection_state(idle_state)
	manager._reconnect_attempt = 0
	manager._closed_state_handled = false

	# Act.
	manager._check_websocket_state()

	# Assert.
	var state_after = manager.get_connection_state()
	_assert(state_after == idle_state, "空闲 DISCONNECTED + STATE_CLOSED 不应触发自动重连")

	var attempts_after = manager._reconnect_attempt
	_assert(attempts_after == 0, "空闲关闭态不应消耗重连次数")
|
||
|
||
## Verifies the reaction to an unclean close while CONNECTED.
##
## Calls `manager._on_websocket_closed(false)` and asserts that the manager
## reports RECONNECTING, that `_reconnect_attempt` is 1, and that the
## `connection_lost` signal fired exactly once during the call.
func _test_reconnect_on_unclean_close(manager: ChatWebSocketManager) -> void:
	# GDScript lambdas capture local variables by value, so incrementing a
	# plain `int` inside the handler would never change the outer counter and
	# the signal assertion below could not pass. An Array is shared by
	# reference, so the handler's increment is visible here.
	var lost_count: Array[int] = [0]
	var on_lost := func() -> void: lost_count[0] += 1
	manager.connection_lost.connect(on_lost)

	manager.enable_auto_reconnect(true, 5, 3.0)
	manager._reconnect_attempt = 0
	manager._set_connection_state(ChatWebSocketManager.ConnectionState.CONNECTED)
	manager._on_websocket_closed(false)

	# The manager is shared with the other test cases; detach the handler so
	# it does not stay connected after this case finishes.
	manager.connection_lost.disconnect(on_lost)

	_assert(
		manager.get_connection_state() == ChatWebSocketManager.ConnectionState.RECONNECTING,
		"异常断开时应进入 RECONNECTING 状态"
	)
	_assert(
		manager._reconnect_attempt == 1,
		"异常断开时应增加一次重连尝试计数"
	)
	_assert(
		lost_count[0] == 1,
		"异常断开时应发射一次 connection_lost 信号"
	)

	# Stop the reconnect timer so it cannot affect later test cases.
	if is_instance_valid(manager._reconnect_timer):
		manager._reconnect_timer.stop()
|
||
|
||
## Calls `manager._on_websocket_closed(true)` while CONNECTED and asserts that
## the manager reports DISCONNECTED and that `_reconnect_attempt` keeps the
## value it had before the call.
func _test_no_reconnect_on_clean_close(manager: ChatWebSocketManager) -> void:
	# Snapshot the counter first so the check is relative to whatever earlier
	# test cases left behind on the shared manager.
	var attempts_before: int = manager._reconnect_attempt

	manager._set_connection_state(ChatWebSocketManager.ConnectionState.CONNECTED)
	manager._on_websocket_closed(true)

	var state_after = manager.get_connection_state()
	_assert(
		state_after == ChatWebSocketManager.ConnectionState.DISCONNECTED,
		"干净关闭时应回到 DISCONNECTED 状态"
	)

	var attempts_after = manager._reconnect_attempt
	_assert(attempts_after == attempts_before, "干净关闭时不应增加重连尝试计数")
|
||
|
||
## Sets `_reconnect_attempt` to 3 in the RECONNECTING state, calls
## `manager.connect_to_game_server(true)`, and asserts the counter is still 3.
func _test_reconnect_attempt_not_reset_on_reconnect_connect(manager: ChatWebSocketManager) -> void:
	var preset_attempts: int = 3

	manager._reconnect_attempt = preset_attempts
	manager._set_connection_state(ChatWebSocketManager.ConnectionState.RECONNECTING)
	manager.connect_to_game_server(true)

	var attempts_after = manager._reconnect_attempt
	_assert(
		attempts_after == preset_attempts,
		"重连触发 connect_to_game_server(true) 时不应重置重连尝试计数"
	)
|
||
|
||
## Verifies the reaction to a successful connect while RECONNECTING.
##
## Calls `manager._on_websocket_connected()` with `_reconnect_attempt` preset
## to 2 and asserts that `reconnection_succeeded` fired exactly once during
## the call, that `_reconnect_attempt` is 0, and that the manager reports
## CONNECTED.
func _test_reconnection_success_emits_and_resets_attempt(manager: ChatWebSocketManager) -> void:
	# GDScript lambdas capture local variables by value, so incrementing a
	# plain `int` inside the handler would never change the outer counter and
	# the signal assertion below could not pass. An Array is shared by
	# reference, so the handler's increment is visible here.
	var success_count: Array[int] = [0]
	var on_success := func() -> void: success_count[0] += 1
	manager.reconnection_succeeded.connect(on_success)

	manager._reconnect_attempt = 2
	manager._set_connection_state(ChatWebSocketManager.ConnectionState.RECONNECTING)
	manager._on_websocket_connected()

	# The manager is shared with the other test cases; detach the handler so
	# it does not stay connected after this case finishes.
	manager.reconnection_succeeded.disconnect(on_success)

	_assert(
		success_count[0] == 1,
		"重连成功时应发射 reconnection_succeeded 信号"
	)
	_assert(
		manager._reconnect_attempt == 0,
		"重连成功时应重置重连尝试计数"
	)
	_assert(
		manager.get_connection_state() == ChatWebSocketManager.ConnectionState.CONNECTED,
		"重连成功后应进入 CONNECTED 状态"
	)
|
||
|
||
## Soft assertion: when `condition` is false, appends `message` to `_failures`.
## Execution always continues, so one run can collect several failures.
func _assert(condition: bool, message: String) -> void:
	if condition:
		return
	_failures.append(message)
|