"""
Authentication module: user registration, login, and email verification codes
(supports Redis / database dual storage with automatic fallback).
"""

import json
import logging
import math
import os
from datetime import timedelta, timezone
from typing import Optional, Tuple

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from app.api.dependencies import get_db
from app.core.security import (
    create_access_token,
    generate_verification_code,
    hash_password,
    hash_verification_code,
    verify_password,
    verify_verification_code,
)
from app.core.verification.email.verificationService import (
    CodeExpiredError,
    CodeInvalidError,
    EmailVerificationService,
    TooManyCodeRequestsError,
    get_verification_service,
)
from app.models.models import AppUser, VerificationPurpose, utcnow
from app.schemas.auth_schema import (
    AuthTokenResponse,
    LoginCodeSendRequest,
    LoginRequest,
    LoginWithCodeRequest,
    MessageResponse,
    RegisterCodeSendRequest,
    RegisterRequest,
    UserProfileResponse,
)
from app.utils.email_utils import send_html_email
from app.utils.redis_client import get_redis_client

router = APIRouter()
logger = logging.getLogger(__name__)

DEFAULT_REGISTER_CODE_EXPIRE_MINUTES = 10
DEFAULT_LOGIN_CODE_EXPIRE_MINUTES = 10
DEFAULT_CODE_SEND_COOLDOWN_SECONDS = 60

# Each setting can be overridden through an environment variable of the same name.
REGISTER_CODE_EXPIRE_MINUTES = int(
    os.getenv("REGISTER_CODE_EXPIRE_MINUTES", str(DEFAULT_REGISTER_CODE_EXPIRE_MINUTES))
)
LOGIN_CODE_EXPIRE_MINUTES = int(
    os.getenv("LOGIN_CODE_EXPIRE_MINUTES", str(DEFAULT_LOGIN_CODE_EXPIRE_MINUTES))
)
CODE_SEND_COOLDOWN_SECONDS = int(
    os.getenv("CODE_SEND_COOLDOWN_SECONDS", str(DEFAULT_CODE_SEND_COOLDOWN_SECONDS))
)

# Allowed values: redis_only | redis | db
#   redis_only: verification codes never touch the database
#               (recommended for the current requirements)
#   redis:      Redis first, database as fallback
#   db:         database only
AUTH_CODE_STORE = os.getenv("AUTH_CODE_STORE", "redis_only").strip().lower()
AUTH_CODE_REDIS_PREFIX = os.getenv("AUTH_CODE_REDIS_PREFIX", "insightradar:auth_code").strip()


def _normalize_email(email: str) -> str:
    """Return *email* stripped and lower-cased.

    Keeps Redis keys and database lookups consistent for the same address.
    """
    return email.strip().lower()


def _build_verification_email(code: str, purpose_text: str, expire_minutes: int) -> str:
    """Render the verification email body.

    Args:
        code: The plain verification code shown to the user.
        purpose_text: Human-readable purpose inserted into the message.
        expire_minutes: Validity period stated in the message.

    Returns:
        The rendered message text.
    """
    # NOTE(review): callers pass this as ``html_content`` but no markup is
    # visible in the template here — confirm against the deployed template.
    return f"""

聚势智见邮箱验证

您的{purpose_text}验证码是:

{code}

该验证码在 {expire_minutes} 分钟内有效。请勿泄露给他人。

"""


def _is_redis_only() -> bool:
    """Return True when AUTH_CODE_STORE selects the Redis-only mode."""
    return AUTH_CODE_STORE in {"redis_only", "redis-only"}


def _is_redis_enabled() -> bool:
    """Return True when AUTH_CODE_STORE selects any Redis-backed mode."""
    return _is_redis_only() or AUTH_CODE_STORE == "redis"


def _get_redis_for_codes():
    """Return the Redis client for code storage, or None when Redis is disabled.

    When Redis is enabled the result is whatever ``get_redis_client()`` returns.
    """
    if not _is_redis_enabled():
        return None
    return get_redis_client()


def _require_redis_for_codes():
    """Return a reachable Redis client or fail with HTTP 503.

    Raises:
        HTTPException: 503 when no client is available or ``ping()`` fails.
    """
    client = _get_redis_for_codes()
    if client is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Verification service is temporarily unavailable",
        )
    # Extra connectivity probe: Redis may be configured but currently down.
    try:
        client.ping()
    except Exception as exc:
        logger.warning("Redis ping failed for verification-code storage", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Verification service is temporarily unavailable",
        ) from exc
    return client


def _redis_code_key(email: str, purpose: VerificationPurpose) -> str:
    """Build the Redis key of a stored code: ``prefix:purpose:email:code``."""
    return f"{AUTH_CODE_REDIS_PREFIX}:{purpose.value.lower()}:{email}:code"


def _redis_cooldown_key(email: str, purpose: VerificationPurpose) -> str:
    """Build the Redis key of the send cooldown marker (anti-abuse throttle)."""
    return f"{AUTH_CODE_REDIS_PREFIX}:{purpose.value.lower()}:{email}:cooldown"


def _cache_code_in_redis(
    *,
    email: str,
    purpose: VerificationPurpose,
    code_hash: str,
    expire_minutes: int,
) -> None:
    """Store the hashed code in Redis as JSON with a TTL.

    Does nothing when no Redis client is available. Errors raised by the
    client's ``set`` call are not caught here.
    """
    client = _get_redis_for_codes()
    if client is None:
        return
    payload = {
        "code_hash": code_hash,
        "created_at": utcnow().isoformat(),
    }
    client.set(
        _redis_code_key(email, purpose),
        json.dumps(payload),
        # Clamp to at least one second so a non-positive setting still yields a valid TTL.
        ex=max(1, expire_minutes * 60),
    )


def _set_send_cooldown_in_redis(email: str, purpose: VerificationPurpose) -> None:
    """Set the send cooldown marker in Redis (best effort).

    Does nothing when no Redis client is available or the cooldown is
    configured as zero or negative. A failing Redis write is logged and
    otherwise ignored so it never breaks the calling flow.
    """
    client = _get_redis_for_codes()
    if client is None or CODE_SEND_COOLDOWN_SECONDS <= 0:
        return
    try:
        client.set(
            _redis_cooldown_key(email, purpose),
            "1",
            ex=CODE_SEND_COOLDOWN_SECONDS,
        )
    except Exception:
        # Best effort in every storage mode: a missing cooldown marker only
        # weakens throttling, it must not fail the request.
        logger.warning(
            "Failed to set %s send cooldown in Redis", purpose.value, exc_info=True
        )


def _clear_code_in_redis(email: str, purpose: VerificationPurpose) -> None:
    """Delete the stored code from Redis (best effort)."""
    client = _get_redis_for_codes()
    if client is None:
        return
    try:
        client.delete(_redis_code_key(email, purpose))
    except Exception:
        # A failed cleanup must not affect the main flow.
        logger.warning(
            "Failed to clear %s verification code in Redis", purpose.value, exc_info=True
        )


def _build_auth_response(user: AppUser) -> AuthTokenResponse:
    """Create an access token for *user* and wrap it in the response schema."""
    token, expires_in = create_access_token(user_id=user.id, email=user.email)
    return AuthTokenResponse(
        access_token=token,
        expires_in=expires_in,
        user=UserProfileResponse.model_validate(user),
    )


async def _issue_and_email_code(
    *,
    service: EmailVerificationService,
    email: str,
    purpose: VerificationPurpose,
    purpose_text: str,
    expire_minutes: int,
) -> None:
    """Issue a verification code through *service* and email it to *email*.

    Raises:
        HTTPException: 429 when the service reports too many requests,
            500 for any other failure while issuing or sending the code.
    """
    try:
        code = service.send_code(email, purpose)
        await send_html_email(
            to_email=email,
            subject=f"【{code}】聚势智见 {purpose_text}验证码",
            html_content=_build_verification_email(code, purpose_text, expire_minutes),
        )
    except TooManyCodeRequestsError as exc:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=str(exc),
        ) from exc
    except Exception as exc:
        # Log the real cause server-side; never echo internal error text
        # (mail server / storage details) back to the client.
        logger.exception("Failed to send %s verification code", purpose.value)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to send verification code",
        ) from exc


@router.post("/register/send-code", response_model=MessageResponse)
async def send_register_code(
    payload: RegisterCodeSendRequest,
    db: Session = Depends(get_db),
    service: EmailVerificationService = Depends(get_verification_service),
):
    """Send a registration verification code to an unregistered email."""
    email = _normalize_email(payload.email)
    existing_user = db.query(AppUser).filter(AppUser.email == email).first()
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email is already registered",
        )

    await _issue_and_email_code(
        service=service,
        email=email,
        purpose=VerificationPurpose.REGISTER,
        purpose_text="注册",
        expire_minutes=REGISTER_CODE_EXPIRE_MINUTES,
    )
    return MessageResponse(message="Verification code sent")


@router.post("/login/send-code", response_model=MessageResponse)
async def send_login_code(
    payload: LoginCodeSendRequest,
    db: Session = Depends(get_db),
    service: EmailVerificationService = Depends(get_verification_service),
):
    """Send a login verification code to a registered email."""
    email = _normalize_email(payload.email)
    user = db.query(AppUser).filter(AppUser.email == email).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Email is not registered",
        )

    await _issue_and_email_code(
        service=service,
        email=email,
        purpose=VerificationPurpose.LOGIN,
        purpose_text="登录",
        expire_minutes=LOGIN_CODE_EXPIRE_MINUTES,
    )
    return MessageResponse(message="Verification code sent")


@router.post(
    "/register",
    response_model=AuthTokenResponse,
    status_code=status.HTTP_201_CREATED,
)
async def register(
    payload: RegisterRequest,
    db: Session = Depends(get_db),
    service: EmailVerificationService = Depends(get_verification_service),
):
    """Register a new user after verifying the emailed code."""
    email = _normalize_email(payload.email)
    existing_user = db.query(AppUser).filter(AppUser.email == email).first()
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email is already registered",
        )

    try:
        service.verify_code(
            email=email,
            purpose=VerificationPurpose.REGISTER,
            code=payload.verification_code,
        )
    except CodeExpiredError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Verification code expired",
        ) from exc
    except CodeInvalidError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid verification code",
        ) from exc
    except TooManyCodeRequestsError as exc:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Too many attempts",
        ) from exc

    now = utcnow()
    # Fall back to the local part of the address when no nickname was given.
    nickname = payload.nickname or email.split("@")[0]
    user = AppUser(
        email=email,
        password_hash=hash_password(payload.password),
        nickname=nickname,
        metadata_={"email_verified_at": now.isoformat()},
    )
    db.add(user)
    try:
        db.commit()
    except IntegrityError as exc:
        # A concurrent request may have registered the same email between the
        # existence check above and this commit.
        # NOTE(review): assumes the email column carries a unique constraint —
        # confirm against the AppUser model.
        db.rollback()
        logger.warning("Registration commit failed with an integrity error", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email is already registered",
        ) from exc
    db.refresh(user)

    return _build_auth_response(user)


@router.post("/login", response_model=AuthTokenResponse)
async def login(payload: LoginRequest, db: Session = Depends(get_db)):
    """Log in with email and password."""
    email = _normalize_email(payload.email)
    user = db.query(AppUser).filter(AppUser.email == email).first()
    # Unknown user, user without a password, and wrong password all produce
    # the same response so the caller cannot tell them apart by message.
    if (
        not user
        or not user.password_hash
        or not verify_password(payload.password, user.password_hash)
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password",
        )
    return _build_auth_response(user)


@router.post("/login/code", response_model=AuthTokenResponse)
async def login_with_code(
    payload: LoginWithCodeRequest,
    db: Session = Depends(get_db),
    service: EmailVerificationService = Depends(get_verification_service),
):
    """Log in with an emailed verification code."""
    email = _normalize_email(payload.email)
    user = db.query(AppUser).filter(AppUser.email == email).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or verification code",
        )

    try:
        service.verify_code(
            email=email,
            purpose=VerificationPurpose.LOGIN,
            code=payload.verification_code,
        )
    except (CodeExpiredError, CodeInvalidError) as exc:
        # Expired and invalid codes share one response on purpose.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or verification code",
        ) from exc
    except TooManyCodeRequestsError as exc:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Too many attempts",
        ) from exc

    return _build_auth_response(user)