Files
whale-town-front/tests/api/api_test.py
moyin 47cfc14f68 test:添加认证系统测试套件
- 添加UI测试场景,支持模拟各种认证场景
- 实现API测试脚本,覆盖登录、注册、验证码等接口
- 添加测试文档,说明测试用例和预期结果
- 支持自动化测试和手动测试验证
2025-12-24 20:37:33 +08:00

506 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API接口测试脚本
根据 docs/api-documentation.md 文档自动测试所有API接口
支持完整的功能测试、边界条件测试和错误处理测试
使用方法:
python tests/api/api_test.py
python tests/api/api_test.py --base-url http://localhost:3000
python tests/api/api_test.py --verbose --test-email custom@example.com
"""
import requests
import json
import time
import argparse
import sys
from datetime import datetime
from typing import Dict, Any, Optional, List
import uuid
class APITester:
"""API接口测试类"""
def __init__(self, base_url: str = "https://whaletownend.xinghangee.icu",
test_email: str = "test@example.com", verbose: bool = False):
self.base_url = base_url.rstrip('/')
self.test_email = test_email
self.verbose = verbose
self.session = requests.Session()
self.session.headers.update({
'Content-Type': 'application/json',
'User-Agent': 'whaleTown-API-Tester/1.0'
})
# 测试数据存储
self.test_data = {
'admin_token': None,
'user_token': None,
'test_user_id': None,
'verification_code': None
}
# 测试结果统计
self.stats = {
'total': 0,
'passed': 0,
'failed': 0,
'errors': []
}
def log(self, message: str, level: str = "INFO"):
"""日志输出"""
timestamp = datetime.now().strftime("%H:%M:%S")
if level == "ERROR" or self.verbose:
print(f"[{timestamp}] {level}: {message}")
def make_request(self, method: str, endpoint: str, data: Dict = None,
headers: Dict = None, expected_status: int = None) -> Dict:
"""发送HTTP请求"""
url = f"{self.base_url}{endpoint}"
request_headers = self.session.headers.copy()
if headers:
request_headers.update(headers)
try:
if method.upper() == 'GET':
response = self.session.get(url, headers=request_headers, timeout=30)
elif method.upper() == 'POST':
response = self.session.post(url, json=data, headers=request_headers, timeout=30)
elif method.upper() == 'PUT':
response = self.session.put(url, json=data, headers=request_headers, timeout=30)
else:
raise ValueError(f"不支持的HTTP方法: {method}")
# 检查状态码
if expected_status and response.status_code != expected_status:
self.log(f"状态码不匹配: 期望 {expected_status}, 实际 {response.status_code}", "ERROR")
# 尝试解析JSON
try:
result = response.json()
except json.JSONDecodeError:
result = {"raw_response": response.text, "status_code": response.status_code}
result['status_code'] = response.status_code
return result
except requests.exceptions.RequestException as e:
self.log(f"请求失败: {str(e)}", "ERROR")
return {"error": str(e), "status_code": 0}
def assert_response(self, response: Dict, expected_success: bool = True,
expected_fields: List[str] = None, test_name: str = ""):
"""验证响应结果"""
self.stats['total'] += 1
try:
# 检查基本结构
if 'error' in response:
raise AssertionError(f"请求错误: {response['error']}")
# 检查success字段
if 'success' in response:
if response['success'] != expected_success:
raise AssertionError(f"success字段不匹配: 期望 {expected_success}, 实际 {response['success']}")
# 检查必需字段
if expected_fields:
for field in expected_fields:
if field not in response:
raise AssertionError(f"缺少必需字段: {field}")
self.stats['passed'] += 1
self.log(f"{test_name} - 通过")
return True
except AssertionError as e:
self.stats['failed'] += 1
error_msg = f"{test_name} - 失败: {str(e)}"
self.log(error_msg, "ERROR")
self.stats['errors'].append(error_msg)
return False
def test_app_status(self):
"""测试应用状态接口"""
self.log("开始测试应用状态接口...")
response = self.make_request('GET', '/')
# 应用状态接口可能返回不同的字段结构
if response.get('status_code') == 200:
# 检查是否有基本的状态信息
has_status_info = any(key in response for key in ['service', 'status', 'version', 'timestamp'])
if has_status_info:
self.stats['total'] += 1
self.stats['passed'] += 1
self.log(f"✅ 获取应用状态 - 通过")
else:
self.stats['total'] += 1
self.stats['failed'] += 1
error_msg = f"❌ 获取应用状态 - 失败: 响应格式不正确"
self.log(error_msg, "ERROR")
self.stats['errors'].append(error_msg)
else:
self.stats['total'] += 1
self.stats['failed'] += 1
error_msg = f"❌ 获取应用状态 - 失败: HTTP状态码 {response.get('status_code')}"
self.log(error_msg, "ERROR")
self.stats['errors'].append(error_msg)
def test_send_verification_code(self) -> Optional[str]:
"""测试发送邮箱验证码"""
self.log("开始测试发送邮箱验证码...")
response = self.make_request('POST', '/auth/send-email-verification', {
'email': self.test_email
})
# 测试模式下返回206状态码success为false但有验证码
if response.get('status_code') == 206:
if 'data' in response and 'verification_code' in response['data']:
verification_code = response['data']['verification_code']
self.test_data['verification_code'] = verification_code
self.log(f"获取到验证码: {verification_code}")
self.stats['total'] += 1
self.stats['passed'] += 1
self.log(f"✅ 发送邮箱验证码(测试模式) - 通过")
return verification_code
elif response.get('success') == True:
# 正常模式
self.assert_response(
response,
expected_success=True,
expected_fields=['data'],
test_name="发送邮箱验证码"
)
if 'data' in response and 'verification_code' in response['data']:
return response['data']['verification_code']
# 测试失败
self.stats['total'] += 1
self.stats['failed'] += 1
error_msg = f"❌ 发送邮箱验证码 - 失败: 无法获取验证码"
self.log(error_msg, "ERROR")
self.stats['errors'].append(error_msg)
return None
def test_user_register(self) -> Optional[str]:
"""测试用户注册"""
self.log("开始测试用户注册...")
# 先获取验证码
verification_code = self.test_send_verification_code()
if not verification_code:
self.log("无法获取验证码,跳过注册测试", "ERROR")
return None
# 生成唯一用户名
username = f"testuser_{int(time.time())}"
response = self.make_request('POST', '/auth/register', {
'username': username,
'password': 'Test123456',
'nickname': '测试用户',
'email': self.test_email
})
if self.assert_response(
response,
expected_success=True,
expected_fields=['data'],
test_name="用户注册"
):
if 'data' in response and 'user' in response['data']:
user_id = response['data']['user'].get('id')
self.test_data['test_user_id'] = user_id
self.log(f"注册成功用户ID: {user_id}")
# 保存用户token
if 'access_token' in response['data']:
self.test_data['user_token'] = response['data']['access_token']
return user_id
return None
def test_user_login(self):
"""测试用户登录"""
self.log("开始测试用户登录...")
# 使用已注册的用户登录
username = f"testuser_{int(time.time() - 1)}" # 使用之前的用户名
response = self.make_request('POST', '/auth/login', {
'identifier': username,
'password': 'Test123456'
})
# 如果用户不存在,尝试注册后登录
if not response.get('success'):
self.log("用户不存在,先注册用户...")
self.test_user_register()
# 重新尝试登录
response = self.make_request('POST', '/auth/login', {
'identifier': username,
'password': 'Test123456'
})
self.assert_response(
response,
expected_success=True,
expected_fields=['data'],
test_name="用户登录"
)
def test_admin_login(self) -> Optional[str]:
"""测试管理员登录"""
self.log("开始测试管理员登录...")
# 尝试不同的管理员登录路径
admin_endpoints = ['/admin/auth/login', '/admin/login']
for endpoint in admin_endpoints:
response = self.make_request('POST', endpoint, {
'identifier': 'admin',
'password': 'Admin123456'
})
if response.get('status_code') != 404:
if self.assert_response(
response,
expected_success=True,
expected_fields=['data'],
test_name=f"管理员登录({endpoint})"
):
if 'data' in response and 'access_token' in response['data']:
admin_token = response['data']['access_token']
self.test_data['admin_token'] = admin_token
self.log("管理员登录成功")
return admin_token
break
# 如果所有端点都失败
self.stats['total'] += 1
self.stats['failed'] += 1
error_msg = f"❌ 管理员登录 - 失败: 所有管理员登录端点都不可用"
self.log(error_msg, "ERROR")
self.stats['errors'].append(error_msg)
return None
def test_admin_get_users(self):
"""测试获取用户列表"""
self.log("开始测试获取用户列表...")
admin_token = self.test_data.get('admin_token')
if not admin_token:
admin_token = self.test_admin_login()
if not admin_token:
self.log("无管理员token跳过用户列表测试", "ERROR")
return
response = self.make_request('GET', '/admin/users?limit=10&offset=0',
headers={'Authorization': f'Bearer {admin_token}'})
self.assert_response(
response,
expected_success=True,
expected_fields=['data'],
test_name="获取用户列表"
)
def test_user_status_management(self):
"""测试用户状态管理"""
self.log("开始测试用户状态管理...")
admin_token = self.test_data.get('admin_token')
user_id = self.test_data.get('test_user_id')
if not admin_token:
admin_token = self.test_admin_login()
if not user_id:
user_id = self.test_user_register()
if not admin_token or not user_id:
self.log("缺少必要数据,跳过用户状态管理测试", "ERROR")
return
# 测试修改用户状态
response = self.make_request('PUT', f'/admin/users/{user_id}/status',
data={
'status': 'locked',
'reason': '测试锁定'
},
headers={'Authorization': f'Bearer {admin_token}'})
self.assert_response(
response,
expected_success=True,
expected_fields=['data'],
test_name="修改用户状态"
)
# 测试用户状态统计
response = self.make_request('GET', '/admin/users/status-stats',
headers={'Authorization': f'Bearer {admin_token}'})
self.assert_response(
response,
expected_success=True,
expected_fields=['data'],
test_name="用户状态统计"
)
def test_error_cases(self):
"""测试错误情况"""
self.log("开始测试错误情况...")
# 测试无效登录
response = self.make_request('POST', '/auth/login', {
'identifier': 'nonexistent',
'password': 'wrongpassword'
})
self.assert_response(
response,
expected_success=False,
expected_fields=['error_code'],
test_name="无效用户登录"
)
# 测试无效验证码
response = self.make_request('POST', '/auth/verify-email', {
'email': self.test_email,
'verification_code': '000000'
})
self.assert_response(
response,
expected_success=False,
test_name="无效验证码验证"
)
# 测试无权限访问管理员接口
response = self.make_request('GET', '/admin/users')
# 应该返回401或403
if response.get('status_code') in [401, 403]:
self.assert_response(
response,
expected_success=False,
test_name="无权限访问管理员接口"
)
def test_rate_limiting(self):
"""测试频率限制"""
self.log("开始测试频率限制...")
# 快速连续发送验证码请求
for i in range(3):
response = self.make_request('POST', '/auth/send-email-verification', {
'email': f'ratelimit{i}@example.com'
})
if response.get('status_code') == 429:
self.assert_response(
response,
expected_success=False,
test_name="频率限制触发"
)
break
time.sleep(0.1) # 短暂延迟
def run_all_tests(self):
"""运行所有测试"""
self.log("🚀 开始API接口测试...")
self.log(f"测试目标: {self.base_url}")
self.log(f"测试邮箱: {self.test_email}")
start_time = time.time()
try:
# 基础功能测试
self.test_app_status()
# 认证相关测试
self.test_send_verification_code()
self.test_user_register()
self.test_user_login()
# 管理员功能测试
self.test_admin_login()
self.test_admin_get_users()
self.test_user_status_management()
# 错误处理测试
self.test_error_cases()
# 安全功能测试
self.test_rate_limiting()
except KeyboardInterrupt:
self.log("测试被用户中断", "ERROR")
except Exception as e:
self.log(f"测试过程中发生异常: {str(e)}", "ERROR")
# 输出测试结果
end_time = time.time()
duration = end_time - start_time
print("\n" + "="*60)
print("📊 测试结果统计")
print("="*60)
print(f"总测试数: {self.stats['total']}")
print(f"通过数量: {self.stats['passed']}")
print(f"失败数量: {self.stats['failed']}")
print(f"成功率: {(self.stats['passed']/self.stats['total']*100):.1f}%" if self.stats['total'] > 0 else "0%")
print(f"测试耗时: {duration:.2f}")
if self.stats['errors']:
print("\n❌ 失败的测试:")
for error in self.stats['errors']:
print(f" {error}")
if self.stats['failed'] == 0:
print("\n🎉 所有测试通过!")
return True
else:
print(f"\n⚠️ 有 {self.stats['failed']} 个测试失败")
return False
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='whaleTown API接口测试工具')
parser.add_argument('--base-url', default='https://whaletownend.xinghangee.icu',
help='API服务器地址 (默认: https://whaletownend.xinghangee.icu)')
parser.add_argument('--test-email', default='test@example.com',
help='测试邮箱地址 (默认: test@example.com)')
parser.add_argument('--verbose', '-v', action='store_true',
help='显示详细日志')
args = parser.parse_args()
# 创建测试器并运行测试
tester = APITester(
base_url=args.base_url,
test_email=args.test_email,
verbose=args.verbose
)
success = tester.run_all_tests()
# 根据测试结果设置退出码
sys.exit(0 if success else 1)
if __name__ == '__main__':
main()