#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ API接口测试脚本 根据 docs/api-documentation.md 文档自动测试所有API接口 支持完整的功能测试、边界条件测试和错误处理测试 使用方法: python tests/api/api_test.py python tests/api/api_test.py --base-url http://localhost:3000 python tests/api/api_test.py --verbose --test-email custom@example.com """ import requests import json import time import argparse import sys from datetime import datetime from typing import Dict, Any, Optional, List import uuid class APITester: """API接口测试类""" def __init__(self, base_url: str = "https://whaletownend.xinghangee.icu", test_email: str = "test@example.com", verbose: bool = False): self.base_url = base_url.rstrip('/') self.test_email = test_email self.verbose = verbose self.session = requests.Session() self.session.headers.update({ 'Content-Type': 'application/json', 'User-Agent': 'whaleTown-API-Tester/1.0' }) # 测试数据存储 self.test_data = { 'admin_token': None, 'user_token': None, 'test_user_id': None, 'verification_code': None } # 测试结果统计 self.stats = { 'total': 0, 'passed': 0, 'failed': 0, 'errors': [] } def log(self, message: str, level: str = "INFO"): """日志输出""" timestamp = datetime.now().strftime("%H:%M:%S") if level == "ERROR" or self.verbose: print(f"[{timestamp}] {level}: {message}") def make_request(self, method: str, endpoint: str, data: Dict = None, headers: Dict = None, expected_status: int = None) -> Dict: """发送HTTP请求""" url = f"{self.base_url}{endpoint}" request_headers = self.session.headers.copy() if headers: request_headers.update(headers) try: if method.upper() == 'GET': response = self.session.get(url, headers=request_headers, timeout=30) elif method.upper() == 'POST': response = self.session.post(url, json=data, headers=request_headers, timeout=30) elif method.upper() == 'PUT': response = self.session.put(url, json=data, headers=request_headers, timeout=30) else: raise ValueError(f"不支持的HTTP方法: {method}") # 检查状态码 if expected_status and response.status_code != expected_status: self.log(f"状态码不匹配: 期望 {expected_status}, 实际 {response.status_code}", "ERROR") # 尝试解析JSON try: result = response.json() except json.JSONDecodeError: result = {"raw_response": response.text, "status_code": response.status_code} result['status_code'] = response.status_code return result except requests.exceptions.RequestException as e: self.log(f"请求失败: {str(e)}", "ERROR") return {"error": str(e), "status_code": 0} def assert_response(self, response: Dict, expected_success: bool = True, expected_fields: List[str] = None, test_name: str = ""): """验证响应结果""" self.stats['total'] += 1 try: # 检查基本结构 if 'error' in response: raise AssertionError(f"请求错误: {response['error']}") # 检查success字段 if 'success' in response: if response['success'] != expected_success: raise AssertionError(f"success字段不匹配: 期望 {expected_success}, 实际 {response['success']}") # 检查必需字段 if expected_fields: for field in expected_fields: if field not in response: raise AssertionError(f"缺少必需字段: {field}") self.stats['passed'] += 1 self.log(f"✅ {test_name} - 通过") return True except AssertionError as e: self.stats['failed'] += 1 error_msg = f"❌ {test_name} - 失败: {str(e)}" self.log(error_msg, "ERROR") self.stats['errors'].append(error_msg) return False def test_app_status(self): """测试应用状态接口""" self.log("开始测试应用状态接口...") response = self.make_request('GET', '/') # 应用状态接口可能返回不同的字段结构 if response.get('status_code') == 200: # 检查是否有基本的状态信息 has_status_info = any(key in response for key in ['service', 'status', 'version', 'timestamp']) if has_status_info: self.stats['total'] += 1 self.stats['passed'] += 1 self.log(f"✅ 获取应用状态 - 通过") else: self.stats['total'] += 1 self.stats['failed'] += 1 error_msg = f"❌ 获取应用状态 - 失败: 响应格式不正确" self.log(error_msg, "ERROR") self.stats['errors'].append(error_msg) else: self.stats['total'] += 1 self.stats['failed'] += 1 error_msg = f"❌ 获取应用状态 - 失败: HTTP状态码 {response.get('status_code')}" self.log(error_msg, "ERROR") self.stats['errors'].append(error_msg) def test_send_verification_code(self) -> Optional[str]: """测试发送邮箱验证码""" self.log("开始测试发送邮箱验证码...") response = self.make_request('POST', '/auth/send-email-verification', { 'email': self.test_email }) # 测试模式下返回206状态码,success为false但有验证码 if response.get('status_code') == 206: if 'data' in response and 'verification_code' in response['data']: verification_code = response['data']['verification_code'] self.test_data['verification_code'] = verification_code self.log(f"获取到验证码: {verification_code}") self.stats['total'] += 1 self.stats['passed'] += 1 self.log(f"✅ 发送邮箱验证码(测试模式) - 通过") return verification_code elif response.get('success') == True: # 正常模式 self.assert_response( response, expected_success=True, expected_fields=['data'], test_name="发送邮箱验证码" ) if 'data' in response and 'verification_code' in response['data']: return response['data']['verification_code'] # 测试失败 self.stats['total'] += 1 self.stats['failed'] += 1 error_msg = f"❌ 发送邮箱验证码 - 失败: 无法获取验证码" self.log(error_msg, "ERROR") self.stats['errors'].append(error_msg) return None def test_user_register(self) -> Optional[str]: """测试用户注册""" self.log("开始测试用户注册...") # 先获取验证码 verification_code = self.test_send_verification_code() if not verification_code: self.log("无法获取验证码,跳过注册测试", "ERROR") return None # 生成唯一用户名 username = f"testuser_{int(time.time())}" response = self.make_request('POST', '/auth/register', { 'username': username, 'password': 'Test123456', 'nickname': '测试用户', 'email': self.test_email }) if self.assert_response( response, expected_success=True, expected_fields=['data'], test_name="用户注册" ): if 'data' in response and 'user' in response['data']: user_id = response['data']['user'].get('id') self.test_data['test_user_id'] = user_id self.log(f"注册成功,用户ID: {user_id}") # 保存用户token if 'access_token' in response['data']: self.test_data['user_token'] = response['data']['access_token'] return user_id return None def test_user_login(self): """测试用户登录""" self.log("开始测试用户登录...") # 使用已注册的用户登录 username = f"testuser_{int(time.time() - 1)}" # 使用之前的用户名 response = self.make_request('POST', '/auth/login', { 'identifier': username, 'password': 'Test123456' }) # 如果用户不存在,尝试注册后登录 if not response.get('success'): self.log("用户不存在,先注册用户...") self.test_user_register() # 重新尝试登录 response = self.make_request('POST', '/auth/login', { 'identifier': username, 'password': 'Test123456' }) self.assert_response( response, expected_success=True, expected_fields=['data'], test_name="用户登录" ) def test_admin_login(self) -> Optional[str]: """测试管理员登录""" self.log("开始测试管理员登录...") # 尝试不同的管理员登录路径 admin_endpoints = ['/admin/auth/login', '/admin/login'] for endpoint in admin_endpoints: response = self.make_request('POST', endpoint, { 'identifier': 'admin', 'password': 'Admin123456' }) if response.get('status_code') != 404: if self.assert_response( response, expected_success=True, expected_fields=['data'], test_name=f"管理员登录({endpoint})" ): if 'data' in response and 'access_token' in response['data']: admin_token = response['data']['access_token'] self.test_data['admin_token'] = admin_token self.log("管理员登录成功") return admin_token break # 如果所有端点都失败 self.stats['total'] += 1 self.stats['failed'] += 1 error_msg = f"❌ 管理员登录 - 失败: 所有管理员登录端点都不可用" self.log(error_msg, "ERROR") self.stats['errors'].append(error_msg) return None def test_admin_get_users(self): """测试获取用户列表""" self.log("开始测试获取用户列表...") admin_token = self.test_data.get('admin_token') if not admin_token: admin_token = self.test_admin_login() if not admin_token: self.log("无管理员token,跳过用户列表测试", "ERROR") return response = self.make_request('GET', '/admin/users?limit=10&offset=0', headers={'Authorization': f'Bearer {admin_token}'}) self.assert_response( response, expected_success=True, expected_fields=['data'], test_name="获取用户列表" ) def test_user_status_management(self): """测试用户状态管理""" self.log("开始测试用户状态管理...") admin_token = self.test_data.get('admin_token') user_id = self.test_data.get('test_user_id') if not admin_token: admin_token = self.test_admin_login() if not user_id: user_id = self.test_user_register() if not admin_token or not user_id: self.log("缺少必要数据,跳过用户状态管理测试", "ERROR") return # 测试修改用户状态 response = self.make_request('PUT', f'/admin/users/{user_id}/status', data={ 'status': 'locked', 'reason': '测试锁定' }, headers={'Authorization': f'Bearer {admin_token}'}) self.assert_response( response, expected_success=True, expected_fields=['data'], test_name="修改用户状态" ) # 测试用户状态统计 response = self.make_request('GET', '/admin/users/status-stats', headers={'Authorization': f'Bearer {admin_token}'}) self.assert_response( response, expected_success=True, expected_fields=['data'], test_name="用户状态统计" ) def test_error_cases(self): """测试错误情况""" self.log("开始测试错误情况...") # 测试无效登录 response = self.make_request('POST', '/auth/login', { 'identifier': 'nonexistent', 'password': 'wrongpassword' }) self.assert_response( response, expected_success=False, expected_fields=['error_code'], test_name="无效用户登录" ) # 测试无效验证码 response = self.make_request('POST', '/auth/verify-email', { 'email': self.test_email, 'verification_code': '000000' }) self.assert_response( response, expected_success=False, test_name="无效验证码验证" ) # 测试无权限访问管理员接口 response = self.make_request('GET', '/admin/users') # 应该返回401或403 if response.get('status_code') in [401, 403]: self.assert_response( response, expected_success=False, test_name="无权限访问管理员接口" ) def test_rate_limiting(self): """测试频率限制""" self.log("开始测试频率限制...") # 快速连续发送验证码请求 for i in range(3): response = self.make_request('POST', '/auth/send-email-verification', { 'email': f'ratelimit{i}@example.com' }) if response.get('status_code') == 429: self.assert_response( response, expected_success=False, test_name="频率限制触发" ) break time.sleep(0.1) # 短暂延迟 def run_all_tests(self): """运行所有测试""" self.log("🚀 开始API接口测试...") self.log(f"测试目标: {self.base_url}") self.log(f"测试邮箱: {self.test_email}") start_time = time.time() try: # 基础功能测试 self.test_app_status() # 认证相关测试 self.test_send_verification_code() self.test_user_register() self.test_user_login() # 管理员功能测试 self.test_admin_login() self.test_admin_get_users() self.test_user_status_management() # 错误处理测试 self.test_error_cases() # 安全功能测试 self.test_rate_limiting() except KeyboardInterrupt: self.log("测试被用户中断", "ERROR") except Exception as e: self.log(f"测试过程中发生异常: {str(e)}", "ERROR") # 输出测试结果 end_time = time.time() duration = end_time - start_time print("\n" + "="*60) print("📊 测试结果统计") print("="*60) print(f"总测试数: {self.stats['total']}") print(f"通过数量: {self.stats['passed']}") print(f"失败数量: {self.stats['failed']}") print(f"成功率: {(self.stats['passed']/self.stats['total']*100):.1f}%" if self.stats['total'] > 0 else "0%") print(f"测试耗时: {duration:.2f}秒") if self.stats['errors']: print("\n❌ 失败的测试:") for error in self.stats['errors']: print(f" {error}") if self.stats['failed'] == 0: print("\n🎉 所有测试通过!") return True else: print(f"\n⚠️ 有 {self.stats['failed']} 个测试失败") return False def main(): """主函数""" parser = argparse.ArgumentParser(description='whaleTown API接口测试工具') parser.add_argument('--base-url', default='https://whaletownend.xinghangee.icu', help='API服务器地址 (默认: https://whaletownend.xinghangee.icu)') parser.add_argument('--test-email', default='test@example.com', help='测试邮箱地址 (默认: test@example.com)') parser.add_argument('--verbose', '-v', action='store_true', help='显示详细日志') args = parser.parse_args() # 创建测试器并运行测试 tester = APITester( base_url=args.base_url, test_email=args.test_email, verbose=args.verbose ) success = tester.run_all_tests() # 根据测试结果设置退出码 sys.exit(0 if success else 1) if __name__ == '__main__': main()