Files
whale-town/scripts/SecurityManager.gd
2025-12-05 19:00:14 +08:00

467 lines
13 KiB
GDScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
extends Node
class_name SecurityManager
## 安全管理器
## 提供输入验证、防护措施和安全检查功能
# 安全配置常量从SecurityConfig获取
const DEFAULT_MAX_MESSAGE_LENGTH = 500
const DEFAULT_MAX_USERNAME_LENGTH = 50
const DEFAULT_MAX_CHARACTER_NAME_LENGTH = 20
const DEFAULT_MIN_CHARACTER_NAME_LENGTH = 2
# 恶意模式检测
const MALICIOUS_PATTERNS = [
"<script",
"javascript:",
"vbscript:",
"onload=",
"onerror=",
"onclick=",
"eval(",
"document.cookie",
"window.location",
"alert(",
"confirm(",
"prompt(",
"innerHTML",
"outerHTML"
]
# SQL注入模式虽然我们使用JSON但仍需防护
const SQL_INJECTION_PATTERNS = [
"'",
"\"",
";",
"--",
"/*",
"*/",
"union",
"select",
"insert",
"update",
"delete",
"drop",
"create",
"alter"
]
# 会话管理
var active_sessions: Dictionary = {}
var session_timeout: float = 1800.0 # 30分钟会话超时
var max_failed_attempts: int = 5
var failed_attempts: Dictionary = {}
var lockout_duration: float = 300.0 # 5分钟锁定时间
## 验证用户输入
static func validate_input(input: String, input_type: String) -> Dictionary:
"""
验证用户输入的安全性和有效性
@param input: 输入字符串
@param input_type: 输入类型 ("username", "character_name", "message")
@return: 验证结果 {valid: bool, error: String, sanitized: String}
"""
var result = {
"valid": false,
"error": "",
"sanitized": ""
}
# 基础检查
if input == null:
result.error = "输入不能为空"
return result
# 转换为字符串并去除首尾空格
var clean_input = str(input).strip_edges()
# 获取配置值
var max_username_length = SecurityConfig.get_config("input_validation", "max_username_length", DEFAULT_MAX_USERNAME_LENGTH)
var max_character_name_length = SecurityConfig.get_config("input_validation", "max_character_name_length", DEFAULT_MAX_CHARACTER_NAME_LENGTH)
var min_character_name_length = SecurityConfig.get_config("input_validation", "min_character_name_length", DEFAULT_MIN_CHARACTER_NAME_LENGTH)
var max_message_length = SecurityConfig.get_config("input_validation", "max_message_length", DEFAULT_MAX_MESSAGE_LENGTH)
# 长度检查
match input_type:
"username":
if clean_input.length() == 0:
result.error = "用户名不能为空"
return result
if clean_input.length() > max_username_length:
result.error = "用户名长度不能超过 %d 个字符" % max_username_length
return result
"character_name":
if clean_input.length() < min_character_name_length:
result.error = "角色名称至少需要 %d 个字符" % min_character_name_length
return result
if clean_input.length() > max_character_name_length:
result.error = "角色名称不能超过 %d 个字符" % max_character_name_length
return result
"message":
if clean_input.length() == 0:
result.error = "消息不能为空"
return result
if clean_input.length() > max_message_length:
result.error = "消息长度不能超过 %d 个字符" % max_message_length
return result
# 恶意内容检测
var malicious_check = detect_malicious_content(clean_input)
if not malicious_check.safe:
result.error = "输入包含不安全内容: " + malicious_check.reason
return result
# 清理输入
var sanitized = sanitize_input(clean_input)
result.valid = true
result.sanitized = sanitized
return result
## 检测恶意内容
static func detect_malicious_content(input: String) -> Dictionary:
"""
检测输入中的恶意内容
@param input: 输入字符串
@return: {safe: bool, reason: String}
"""
var input_lower = input.to_lower()
# 检查脚本注入
for pattern in MALICIOUS_PATTERNS:
if input_lower.contains(pattern.to_lower()):
return {
"safe": false,
"reason": "检测到潜在的脚本注入: " + pattern
}
# 检查SQL注入虽然我们不使用SQL但作为额外防护
for pattern in SQL_INJECTION_PATTERNS:
if input_lower.contains(pattern.to_lower()):
return {
"safe": false,
"reason": "检测到潜在的注入攻击: " + pattern
}
# 检查过长的重复字符可能的DoS攻击
if has_excessive_repetition(input):
return {
"safe": false,
"reason": "检测到异常的重复字符模式"
}
return {"safe": true, "reason": ""}
## 检查过度重复字符
static func has_excessive_repetition(input: String) -> bool:
"""
检查字符串是否包含过度重复的字符
@param input: 输入字符串
@return: 是否包含过度重复
"""
if input.length() < 10:
return false
var char_counts = {}
for character in input:
char_counts[character] = char_counts.get(character, 0) + 1
# 如果任何字符重复超过输入长度的70%,认为是异常
var threshold = input.length() * 0.7
for count in char_counts.values():
if count > threshold:
return true
return false
## 清理输入
static func sanitize_input(input: String) -> String:
"""
清理输入字符串,移除潜在的危险字符
@param input: 输入字符串
@return: 清理后的字符串
"""
var sanitized = input
# 移除控制字符(除了换行和制表符)
var clean_chars = []
for i in range(sanitized.length()):
var char_code = sanitized.unicode_at(i)
# 保留可打印字符、空格、换行、制表符以及Unicode字符支持中文等
if (char_code >= 32 and char_code <= 126) or char_code == 10 or char_code == 9 or char_code > 127:
clean_chars.append(sanitized[i])
sanitized = "".join(clean_chars)
# 移除HTML标签
sanitized = remove_html_tags(sanitized)
# 限制连续空格
sanitized = limit_consecutive_spaces(sanitized)
return sanitized.strip_edges()
## 移除HTML标签
static func remove_html_tags(input: String) -> String:
"""
移除HTML标签
@param input: 输入字符串
@return: 移除标签后的字符串
"""
var regex = RegEx.new()
regex.compile("<[^>]*>")
return regex.sub(input, "", true)
## 限制连续空格
static func limit_consecutive_spaces(input: String) -> String:
"""
将多个连续空格替换为单个空格
@param input: 输入字符串
@return: 处理后的字符串
"""
var regex = RegEx.new()
regex.compile("\\s+")
return regex.sub(input, " ", true)
## 验证消息格式
static func validate_message_format(message: Dictionary) -> bool:
"""
验证网络消息格式的安全性
@param message: 消息字典
@return: 是否安全
"""
# 基础格式检查
if not message.has("type") or not message.has("data") or not message.has("timestamp"):
return false
# 检查消息类型是否在允许列表中
var allowed_types = [
"auth_request",
"auth_response",
"character_create",
"character_move",
"character_state",
"dialogue_send",
"world_state",
"ping",
"pong",
"error"
]
if not message.type in allowed_types:
return false
# 检查时间戳是否合理(不能太旧或太新)
var current_time = Time.get_unix_time_from_system() * 1000 # 转换为毫秒
var msg_time = message.get("timestamp", 0)
var time_diff = abs(current_time - msg_time)
# 允许5分钟的时间差毫秒
if time_diff > 300000: # 5分钟 = 300000毫秒
return false
# 检查数据字段
if typeof(message.data) != TYPE_DICTIONARY:
return false
return true
## 创建会话
func create_session(client_id: String, username: String) -> String:
"""
创建新的用户会话
@param client_id: 客户端ID
@param username: 用户名
@return: 会话令牌
"""
var session_token = generate_session_token()
var session_data = {
"client_id": client_id,
"username": username,
"created_at": Time.get_unix_time_from_system(),
"last_activity": Time.get_unix_time_from_system(),
"is_active": true
}
active_sessions[session_token] = session_data
print("Session created for user: " + username)
return session_token
## 验证会话
func validate_session(session_token: String) -> bool:
"""
验证会话是否有效
@param session_token: 会话令牌
@return: 是否有效
"""
if not active_sessions.has(session_token):
return false
var session = active_sessions[session_token]
var current_time = Time.get_unix_time_from_system()
# 检查会话是否过期
if current_time - session.last_activity > session_timeout:
invalidate_session(session_token)
return false
# 更新最后活动时间
session.last_activity = current_time
return session.is_active
## 使会话无效
func invalidate_session(session_token: String) -> void:
"""
使会话无效
@param session_token: 会话令牌
"""
if active_sessions.has(session_token):
var session = active_sessions[session_token]
print("Session invalidated for user: " + session.get("username", "unknown"))
active_sessions.erase(session_token)
## 生成会话令牌
func generate_session_token() -> String:
"""
生成安全的会话令牌
@return: 会话令牌
"""
var timestamp = Time.get_unix_time_from_system()
var random1 = randi()
var random2 = randi()
var random3 = randi()
# 创建更复杂的令牌
var token_data = str(timestamp) + "_" + str(random1) + "_" + str(random2) + "_" + str(random3)
return token_data.sha256_text()
## 记录失败尝试
func record_failed_attempt(identifier: String) -> bool:
"""
记录失败的认证尝试
@param identifier: 标识符IP地址或客户端ID
@return: 是否应该锁定
"""
var current_time = Time.get_unix_time_from_system()
if not failed_attempts.has(identifier):
failed_attempts[identifier] = {
"count": 0,
"first_attempt": current_time,
"last_attempt": current_time,
"locked_until": 0
}
var attempt_data = failed_attempts[identifier]
# 检查是否仍在锁定期
if current_time < attempt_data.locked_until:
return true # 仍被锁定
# 如果距离第一次尝试超过1小时重置计数
if current_time - attempt_data.first_attempt > 3600:
attempt_data.count = 0
attempt_data.first_attempt = current_time
attempt_data.count += 1
attempt_data.last_attempt = current_time
# 检查是否需要锁定
if attempt_data.count >= max_failed_attempts:
attempt_data.locked_until = current_time + lockout_duration
print("WARNING: Client locked due to too many failed attempts: " + identifier)
return true
return false
## 检查是否被锁定
func is_locked(identifier: String) -> bool:
"""
检查标识符是否被锁定
@param identifier: 标识符
@return: 是否被锁定
"""
if not failed_attempts.has(identifier):
return false
var current_time = Time.get_unix_time_from_system()
var attempt_data = failed_attempts[identifier]
return current_time < attempt_data.locked_until
## 清除失败尝试记录
func clear_failed_attempts(identifier: String) -> void:
"""
清除失败尝试记录(成功认证后调用)
@param identifier: 标识符
"""
if failed_attempts.has(identifier):
failed_attempts.erase(identifier)
## 清理过期会话
func cleanup_expired_sessions() -> void:
"""清理过期的会话"""
var current_time = Time.get_unix_time_from_system()
var expired_tokens = []
for token in active_sessions:
var session = active_sessions[token]
if current_time - session.last_activity > session_timeout:
expired_tokens.append(token)
for token in expired_tokens:
invalidate_session(token)
if expired_tokens.size() > 0:
print("Cleaned up %d expired sessions" % expired_tokens.size())
## 获取安全统计
func get_security_stats() -> Dictionary:
"""
获取安全统计信息
@return: 统计信息
"""
return {
"active_sessions": active_sessions.size(),
"failed_attempts": failed_attempts.size(),
"locked_clients": _count_locked_clients()
}
## 计算被锁定的客户端数量
func _count_locked_clients() -> int:
"""计算当前被锁定的客户端数量"""
var current_time = Time.get_unix_time_from_system()
var locked_count = 0
for identifier in failed_attempts:
var attempt_data = failed_attempts[identifier]
if current_time < attempt_data.locked_until:
locked_count += 1
return locked_count
## 定期清理任务
func _ready():
"""初始化安全管理器"""
# 初始化安全配置
_initialize_config()
# 每5分钟清理一次过期会话
var cleanup_interval = SecurityConfig.get_config("session_management", "cleanup_interval", 300.0)
var cleanup_timer = Timer.new()
cleanup_timer.wait_time = cleanup_interval
cleanup_timer.timeout.connect(cleanup_expired_sessions)
cleanup_timer.autostart = true
add_child(cleanup_timer)
print("SecurityManager initialized with security level: " + SecurityConfig.get_security_level())
## 初始化配置
func _initialize_config():
"""从SecurityConfig初始化配置值"""
session_timeout = SecurityConfig.get_config("session_management", "session_timeout", 1800.0)
max_failed_attempts = SecurityConfig.get_config("session_management", "max_failed_attempts", 5)
lockout_duration = SecurityConfig.get_config("session_management", "lockout_duration", 300.0)