test:添加认证系统测试套件

- 添加UI测试场景,支持模拟各种认证场景
- 实现API测试脚本,覆盖登录、注册、验证码等接口
- 添加测试文档,说明测试用例和预期结果
- 支持自动化测试和手动测试验证
This commit is contained in:
2025-12-24 20:37:33 +08:00
parent c6bcca4e7f
commit 47cfc14f68
7 changed files with 1511 additions and 0 deletions

506
tests/api/api_test.py Normal file
View File

@@ -0,0 +1,506 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API接口测试脚本
根据 docs/api-documentation.md 文档自动测试所有API接口
支持完整的功能测试、边界条件测试和错误处理测试
使用方法:
python tests/api/api_test.py
python tests/api/api_test.py --base-url http://localhost:3000
python tests/api/api_test.py --verbose --test-email custom@example.com
"""
import requests
import json
import time
import argparse
import sys
from datetime import datetime
from typing import Dict, Any, Optional, List
import uuid
class APITester:
"""API接口测试类"""
def __init__(self, base_url: str = "https://whaletownend.xinghangee.icu",
test_email: str = "test@example.com", verbose: bool = False):
self.base_url = base_url.rstrip('/')
self.test_email = test_email
self.verbose = verbose
self.session = requests.Session()
self.session.headers.update({
'Content-Type': 'application/json',
'User-Agent': 'whaleTown-API-Tester/1.0'
})
# 测试数据存储
self.test_data = {
'admin_token': None,
'user_token': None,
'test_user_id': None,
'verification_code': None
}
# 测试结果统计
self.stats = {
'total': 0,
'passed': 0,
'failed': 0,
'errors': []
}
def log(self, message: str, level: str = "INFO"):
"""日志输出"""
timestamp = datetime.now().strftime("%H:%M:%S")
if level == "ERROR" or self.verbose:
print(f"[{timestamp}] {level}: {message}")
def make_request(self, method: str, endpoint: str, data: Dict = None,
headers: Dict = None, expected_status: int = None) -> Dict:
"""发送HTTP请求"""
url = f"{self.base_url}{endpoint}"
request_headers = self.session.headers.copy()
if headers:
request_headers.update(headers)
try:
if method.upper() == 'GET':
response = self.session.get(url, headers=request_headers, timeout=30)
elif method.upper() == 'POST':
response = self.session.post(url, json=data, headers=request_headers, timeout=30)
elif method.upper() == 'PUT':
response = self.session.put(url, json=data, headers=request_headers, timeout=30)
else:
raise ValueError(f"不支持的HTTP方法: {method}")
# 检查状态码
if expected_status and response.status_code != expected_status:
self.log(f"状态码不匹配: 期望 {expected_status}, 实际 {response.status_code}", "ERROR")
# 尝试解析JSON
try:
result = response.json()
except json.JSONDecodeError:
result = {"raw_response": response.text, "status_code": response.status_code}
result['status_code'] = response.status_code
return result
except requests.exceptions.RequestException as e:
self.log(f"请求失败: {str(e)}", "ERROR")
return {"error": str(e), "status_code": 0}
def assert_response(self, response: Dict, expected_success: bool = True,
expected_fields: List[str] = None, test_name: str = ""):
"""验证响应结果"""
self.stats['total'] += 1
try:
# 检查基本结构
if 'error' in response:
raise AssertionError(f"请求错误: {response['error']}")
# 检查success字段
if 'success' in response:
if response['success'] != expected_success:
raise AssertionError(f"success字段不匹配: 期望 {expected_success}, 实际 {response['success']}")
# 检查必需字段
if expected_fields:
for field in expected_fields:
if field not in response:
raise AssertionError(f"缺少必需字段: {field}")
self.stats['passed'] += 1
self.log(f"{test_name} - 通过")
return True
except AssertionError as e:
self.stats['failed'] += 1
error_msg = f"{test_name} - 失败: {str(e)}"
self.log(error_msg, "ERROR")
self.stats['errors'].append(error_msg)
return False
def test_app_status(self):
"""测试应用状态接口"""
self.log("开始测试应用状态接口...")
response = self.make_request('GET', '/')
# 应用状态接口可能返回不同的字段结构
if response.get('status_code') == 200:
# 检查是否有基本的状态信息
has_status_info = any(key in response for key in ['service', 'status', 'version', 'timestamp'])
if has_status_info:
self.stats['total'] += 1
self.stats['passed'] += 1
self.log(f"✅ 获取应用状态 - 通过")
else:
self.stats['total'] += 1
self.stats['failed'] += 1
error_msg = f"❌ 获取应用状态 - 失败: 响应格式不正确"
self.log(error_msg, "ERROR")
self.stats['errors'].append(error_msg)
else:
self.stats['total'] += 1
self.stats['failed'] += 1
error_msg = f"❌ 获取应用状态 - 失败: HTTP状态码 {response.get('status_code')}"
self.log(error_msg, "ERROR")
self.stats['errors'].append(error_msg)
def test_send_verification_code(self) -> Optional[str]:
"""测试发送邮箱验证码"""
self.log("开始测试发送邮箱验证码...")
response = self.make_request('POST', '/auth/send-email-verification', {
'email': self.test_email
})
# 测试模式下返回206状态码success为false但有验证码
if response.get('status_code') == 206:
if 'data' in response and 'verification_code' in response['data']:
verification_code = response['data']['verification_code']
self.test_data['verification_code'] = verification_code
self.log(f"获取到验证码: {verification_code}")
self.stats['total'] += 1
self.stats['passed'] += 1
self.log(f"✅ 发送邮箱验证码(测试模式) - 通过")
return verification_code
elif response.get('success') == True:
# 正常模式
self.assert_response(
response,
expected_success=True,
expected_fields=['data'],
test_name="发送邮箱验证码"
)
if 'data' in response and 'verification_code' in response['data']:
return response['data']['verification_code']
# 测试失败
self.stats['total'] += 1
self.stats['failed'] += 1
error_msg = f"❌ 发送邮箱验证码 - 失败: 无法获取验证码"
self.log(error_msg, "ERROR")
self.stats['errors'].append(error_msg)
return None
def test_user_register(self) -> Optional[str]:
"""测试用户注册"""
self.log("开始测试用户注册...")
# 先获取验证码
verification_code = self.test_send_verification_code()
if not verification_code:
self.log("无法获取验证码,跳过注册测试", "ERROR")
return None
# 生成唯一用户名
username = f"testuser_{int(time.time())}"
response = self.make_request('POST', '/auth/register', {
'username': username,
'password': 'Test123456',
'nickname': '测试用户',
'email': self.test_email
})
if self.assert_response(
response,
expected_success=True,
expected_fields=['data'],
test_name="用户注册"
):
if 'data' in response and 'user' in response['data']:
user_id = response['data']['user'].get('id')
self.test_data['test_user_id'] = user_id
self.log(f"注册成功用户ID: {user_id}")
# 保存用户token
if 'access_token' in response['data']:
self.test_data['user_token'] = response['data']['access_token']
return user_id
return None
def test_user_login(self):
"""测试用户登录"""
self.log("开始测试用户登录...")
# 使用已注册的用户登录
username = f"testuser_{int(time.time() - 1)}" # 使用之前的用户名
response = self.make_request('POST', '/auth/login', {
'identifier': username,
'password': 'Test123456'
})
# 如果用户不存在,尝试注册后登录
if not response.get('success'):
self.log("用户不存在,先注册用户...")
self.test_user_register()
# 重新尝试登录
response = self.make_request('POST', '/auth/login', {
'identifier': username,
'password': 'Test123456'
})
self.assert_response(
response,
expected_success=True,
expected_fields=['data'],
test_name="用户登录"
)
def test_admin_login(self) -> Optional[str]:
"""测试管理员登录"""
self.log("开始测试管理员登录...")
# 尝试不同的管理员登录路径
admin_endpoints = ['/admin/auth/login', '/admin/login']
for endpoint in admin_endpoints:
response = self.make_request('POST', endpoint, {
'identifier': 'admin',
'password': 'Admin123456'
})
if response.get('status_code') != 404:
if self.assert_response(
response,
expected_success=True,
expected_fields=['data'],
test_name=f"管理员登录({endpoint})"
):
if 'data' in response and 'access_token' in response['data']:
admin_token = response['data']['access_token']
self.test_data['admin_token'] = admin_token
self.log("管理员登录成功")
return admin_token
break
# 如果所有端点都失败
self.stats['total'] += 1
self.stats['failed'] += 1
error_msg = f"❌ 管理员登录 - 失败: 所有管理员登录端点都不可用"
self.log(error_msg, "ERROR")
self.stats['errors'].append(error_msg)
return None
def test_admin_get_users(self):
"""测试获取用户列表"""
self.log("开始测试获取用户列表...")
admin_token = self.test_data.get('admin_token')
if not admin_token:
admin_token = self.test_admin_login()
if not admin_token:
self.log("无管理员token跳过用户列表测试", "ERROR")
return
response = self.make_request('GET', '/admin/users?limit=10&offset=0',
headers={'Authorization': f'Bearer {admin_token}'})
self.assert_response(
response,
expected_success=True,
expected_fields=['data'],
test_name="获取用户列表"
)
def test_user_status_management(self):
"""测试用户状态管理"""
self.log("开始测试用户状态管理...")
admin_token = self.test_data.get('admin_token')
user_id = self.test_data.get('test_user_id')
if not admin_token:
admin_token = self.test_admin_login()
if not user_id:
user_id = self.test_user_register()
if not admin_token or not user_id:
self.log("缺少必要数据,跳过用户状态管理测试", "ERROR")
return
# 测试修改用户状态
response = self.make_request('PUT', f'/admin/users/{user_id}/status',
data={
'status': 'locked',
'reason': '测试锁定'
},
headers={'Authorization': f'Bearer {admin_token}'})
self.assert_response(
response,
expected_success=True,
expected_fields=['data'],
test_name="修改用户状态"
)
# 测试用户状态统计
response = self.make_request('GET', '/admin/users/status-stats',
headers={'Authorization': f'Bearer {admin_token}'})
self.assert_response(
response,
expected_success=True,
expected_fields=['data'],
test_name="用户状态统计"
)
def test_error_cases(self):
"""测试错误情况"""
self.log("开始测试错误情况...")
# 测试无效登录
response = self.make_request('POST', '/auth/login', {
'identifier': 'nonexistent',
'password': 'wrongpassword'
})
self.assert_response(
response,
expected_success=False,
expected_fields=['error_code'],
test_name="无效用户登录"
)
# 测试无效验证码
response = self.make_request('POST', '/auth/verify-email', {
'email': self.test_email,
'verification_code': '000000'
})
self.assert_response(
response,
expected_success=False,
test_name="无效验证码验证"
)
# 测试无权限访问管理员接口
response = self.make_request('GET', '/admin/users')
# 应该返回401或403
if response.get('status_code') in [401, 403]:
self.assert_response(
response,
expected_success=False,
test_name="无权限访问管理员接口"
)
def test_rate_limiting(self):
"""测试频率限制"""
self.log("开始测试频率限制...")
# 快速连续发送验证码请求
for i in range(3):
response = self.make_request('POST', '/auth/send-email-verification', {
'email': f'ratelimit{i}@example.com'
})
if response.get('status_code') == 429:
self.assert_response(
response,
expected_success=False,
test_name="频率限制触发"
)
break
time.sleep(0.1) # 短暂延迟
def run_all_tests(self):
"""运行所有测试"""
self.log("🚀 开始API接口测试...")
self.log(f"测试目标: {self.base_url}")
self.log(f"测试邮箱: {self.test_email}")
start_time = time.time()
try:
# 基础功能测试
self.test_app_status()
# 认证相关测试
self.test_send_verification_code()
self.test_user_register()
self.test_user_login()
# 管理员功能测试
self.test_admin_login()
self.test_admin_get_users()
self.test_user_status_management()
# 错误处理测试
self.test_error_cases()
# 安全功能测试
self.test_rate_limiting()
except KeyboardInterrupt:
self.log("测试被用户中断", "ERROR")
except Exception as e:
self.log(f"测试过程中发生异常: {str(e)}", "ERROR")
# 输出测试结果
end_time = time.time()
duration = end_time - start_time
print("\n" + "="*60)
print("📊 测试结果统计")
print("="*60)
print(f"总测试数: {self.stats['total']}")
print(f"通过数量: {self.stats['passed']}")
print(f"失败数量: {self.stats['failed']}")
print(f"成功率: {(self.stats['passed']/self.stats['total']*100):.1f}%" if self.stats['total'] > 0 else "0%")
print(f"测试耗时: {duration:.2f}")
if self.stats['errors']:
print("\n❌ 失败的测试:")
for error in self.stats['errors']:
print(f" {error}")
if self.stats['failed'] == 0:
print("\n🎉 所有测试通过!")
return True
else:
print(f"\n⚠️ 有 {self.stats['failed']} 个测试失败")
return False
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='whaleTown API接口测试工具')
parser.add_argument('--base-url', default='https://whaletownend.xinghangee.icu',
help='API服务器地址 (默认: https://whaletownend.xinghangee.icu)')
parser.add_argument('--test-email', default='test@example.com',
help='测试邮箱地址 (默认: test@example.com)')
parser.add_argument('--verbose', '-v', action='store_true',
help='显示详细日志')
args = parser.parse_args()
# 创建测试器并运行测试
tester = APITester(
base_url=args.base_url,
test_email=args.test_email,
verbose=args.verbose
)
success = tester.run_all_tests()
# 根据测试结果设置退出码
sys.exit(0 if success else 1)
if __name__ == '__main__':
main()