extends GutTest # ============================================================================ # test_websocket_manager.gd - WebSocketManager 单元测试 # ============================================================================ # 测试 WebSocket 连接管理的功能 # # 测试覆盖: # - 初始化测试 # - 连接状态管理 # - 重连机制 # - Socket.IO 客户端访问 # - 信号发射 # ============================================================================ var ws_manager: WebSocketManager # ============================================================================ # 测试设置和清理 # ============================================================================ func before_each(): # 每个测试前创建新实例 ws_manager = WebSocketManager.new() add_child(ws_manager) func after_each(): # 每个测试后清理 if is_instance_valid(ws_manager): ws_manager.queue_free() ws_manager = null # ============================================================================ # 初始化测试 # ============================================================================ func test_websocket_manager_initialization(): # 测试管理器初始化 assert_not_null(ws_manager, "WebSocketManager 应该成功初始化") assert_not_null(ws_manager._socket_client, "Socket.IO 客户端应该被创建") assert_eq(ws_manager.is_websocket_connected(), false, "初始状态应该是未连接") func test_initial_connection_state(): # 测试初始连接状态 assert_eq(ws_manager.get_connection_state(), WebSocketManager.ConnectionState.DISCONNECTED, "初始连接状态应该是 DISCONNECTED") # ============================================================================ # 连接状态测试 # ============================================================================ func test_connection_state_enum(): # 测试连接状态枚举值 assert_eq(WebSocketManager.ConnectionState.DISCONNECTED, 0, "DISCONNECTED 应该是 0") assert_eq(WebSocketManager.ConnectionState.CONNECTING, 1, "CONNECTING 应该是 1") assert_eq(WebSocketManager.ConnectionState.CONNECTED, 2, "CONNECTED 应该是 2") assert_eq(WebSocketManager.ConnectionState.RECONNECTING, 3, "RECONNECTING 应该是 3") assert_eq(WebSocketManager.ConnectionState.ERROR, 4, "ERROR 应该是 4") func test_get_state_description(): # 测试状态描述方法 ws_manager._connection_state = WebSocketManager.ConnectionState.DISCONNECTED var desc := ws_manager.get_state_description() assert_eq(desc, "未连接", "DISCONNECTED 状态描述应该是 '未连接'") ws_manager._connection_state = WebSocketManager.ConnectionState.CONNECTED desc = ws_manager.get_state_description() assert_eq(desc, "已连接", "CONNECTED 状态描述应该是 '已连接'") ws_manager._connection_state = WebSocketManager.ConnectionState.CONNECTING desc = ws_manager.get_state_description() assert_eq(desc, "连接中", "CONNECTING 状态描述应该是 '连接中'") # ============================================================================ # 自动重连配置测试 # ============================================================================ func test_enable_auto_reconnect(): # 测试启用自动重连 ws_manager.enable_auto_reconnect(true, 5, 3.0) assert_true(ws_manager._auto_reconnect_enabled, "自动重连应该被启用") assert_eq(ws_manager._max_reconnect_attempts, 5, "最大重连次数应该是 5") assert_eq(ws_manager._reconnect_base_delay, 3.0, "基础重连延迟应该是 3.0 秒") func test_disable_auto_reconnect(): # 测试禁用自动重连 ws_manager.enable_auto_reconnect(false) assert_false(ws_manager._auto_reconnect_enabled, "自动重连应该被禁用") func test_get_reconnect_info(): # 测试获取重连信息 ws_manager.enable_auto_reconnect(true, 5, 3.0) ws_manager._reconnect_attempt = 2 var info := ws_manager.get_reconnect_info() assert_true(info.has("enabled"), "重连信息应该包含 'enabled'") assert_true(info.has("attempt"), "重连信息应该包含 'attempt'") assert_true(info.has("max_attempts"), "重连信息应该包含 'max_attempts'") assert_true(info.has("next_delay"), "重连信息应该包含 'next_delay'") assert_eq(info["enabled"], true, "自动重连应该启用") assert_eq(info["attempt"], 2, "当前重连尝试次数应该是 2") assert_eq(info["max_attempts"], 5, "最大重连次数应该是 5") # ============================================================================ # 重连延迟计算测试 # ============================================================================ func test_reconnect_delay_calculation(): # 测试重连延迟计算(指数退避) ws_manager._reconnect_base_delay = 2.0 ws_manager._reconnect_attempt = 1 var delay1 := ws_manager._calculate_reconnect_delay() assert_eq(delay1, 2.0, "第 1 次重连延迟应该是 2.0 秒") ws_manager._reconnect_attempt = 2 var delay2 := ws_manager._calculate_reconnect_delay() assert_eq(delay2, 4.0, "第 2 次重连延迟应该是 4.0 秒") ws_manager._reconnect_attempt = 3 var delay3 := ws_manager._calculate_reconnect_delay() assert_eq(delay3, 8.0, "第 3 次重连延迟应该是 8.0 秒") func test_max_reconnect_delay(): # 测试最大重连延迟限制 ws_manager._reconnect_base_delay = 10.0 ws_manager._reconnect_attempt = 10 var delay := ws_manager._calculate_reconnect_delay() assert_eq(delay, WebSocketManager.MAX_RECONNECT_DELAY, "重连延迟应该被限制在最大值") # ============================================================================ # Socket.IO 客户端访问测试 # ============================================================================ func test_get_socket_client(): # 测试获取 Socket.IO 客户端 var socket_client := ws_manager.get_socket_client() assert_not_null(socket_client, "应该返回 Socket.IO 客户端") assert_true(socket_client is SocketIOClient, "返回的应该是 SocketIOClient 实例") # ============================================================================ # 信号测试 # ============================================================================ func test_connection_state_changed_signal(): # 测试连接状态变化信号 watch_signals(ws_manager) # 手动触发信号 ws_manager.emit_signal("connection_state_changed", WebSocketManager.ConnectionState.CONNECTED) assert_signal_emitted(ws_manager, "connection_state_changed", "应该发射 connection_state_changed 信号") func test_connection_lost_signal(): # 测试连接丢失信号 watch_signals(ws_manager) ws_manager.emit_signal("connection_lost") assert_signal_emitted(ws_manager, "connection_lost", "应该发射 connection_lost 信号") func test_reconnection_succeeded_signal(): # 测试重连成功信号 watch_signals(ws_manager) ws_manager.emit_signal("reconnection_succeeded") assert_signal_emitted(ws_manager, "reconnection_succeeded", "应该发射 reconnection_succeeded 信号") func test_reconnection_failed_signal(): # 测试重连失败信号 watch_signals(ws_manager) ws_manager.emit_signal("reconnection_failed", 3, 5) assert_signal_emitted(ws_manager, "reconnection_failed", "应该发射 reconnection_failed 信号") # ============================================================================ # 常量测试 # ============================================================================ func test_websocket_url_constant(): # 测试 WebSocket URL 常量 assert_eq(WebSocketManager.WEBSOCKET_URL, "wss://whaletownend.xinghangee.icu/game", "WebSocket URL 应该匹配") func test_default_reconnect_constants(): # 测试默认重连常量 assert_eq(WebSocketManager.DEFAULT_MAX_RECONNECT_ATTEMPTS, 5, "默认最大重连次数应该是 5") assert_eq(WebSocketManager.DEFAULT_RECONNECT_BASE_DELAY, 3.0, "默认基础重连延迟应该是 3.0 秒") func test_max_reconnect_delay_constant(): # 测试最大重连延迟常量 assert_eq(WebSocketManager.MAX_RECONNECT_DELAY, 30.0, "最大重连延迟应该是 30.0 秒") # ============================================================================ # 边界条件测试 # ============================================================================ func test_zero_max_reconnect_attempts(): # 测试零次重连尝试 ws_manager.enable_auto_reconnect(true, 0, 1.0) ws_manager._reconnect_attempt = 1 var should_reconnect := ws_manager._reconnect_attempt < ws_manager._max_reconnect_attempts assert_false(should_reconnect, "最大重连次数为 0 时不应该重连") func test_very_large_reconnect_attempts(): # 测试非常大的重连次数 ws_manager.enable_auto_reconnect(true, 1000, 1.0) ws_manager._reconnect_attempt = 999 var should_reconnect := ws_manager._reconnect_attempt < ws_manager._max_reconnect_attempts assert_true(should_reconnect, "应该允许更多重连尝试") func test_zero_base_delay(): # 测试零基础延迟 ws_manager._reconnect_base_delay = 0.0 ws_manager._reconnect_attempt = 1 var delay := ws_manager._calculate_reconnect_delay() # 即使基础延迟为 0,也应该返回一个合理的值 assert_true(delay >= 0.0, "延迟应该 >= 0") # ============================================================================ # 状态转换测试 # ============================================================================ func test_state_disconnected_to_connecting(): # 测试从 DISCONNECTED 到 CONNECTING 的状态转换 ws_manager._set_connection_state(WebSocketManager.ConnectionState.CONNECTING) assert_eq(ws_manager.get_connection_state(), WebSocketManager.ConnectionState.CONNECTING, "状态应该转换为 CONNECTING") func test_state_connecting_to_connected(): # 测试从 CONNECTING 到 CONNECTED 的状态转换 ws_manager._set_connection_state(WebSocketManager.ConnectionState.CONNECTING) ws_manager._set_connection_state(WebSocketManager.ConnectionState.CONNECTED) assert_eq(ws_manager.get_connection_state(), WebSocketManager.ConnectionState.CONNECTED, "状态应该转换为 CONNECTED") func test_state_connected_to_reconnecting(): # 测试从 CONNECTED 到 RECONNECTING 的状态转换 ws_manager._set_connection_state(WebSocketManager.ConnectionState.CONNECTED) ws_manager._set_connection_state(WebSocketManager.ConnectionState.RECONNECTING) assert_eq(ws_manager.get_connection_state(), WebSocketManager.ConnectionState.RECONNECTING, "状态应该转换为 RECONNECTING")