import math
import os
import random
from datetime import timedelta, timezone
from typing import Tuple

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session

from app.api.dependencies import get_db
from app.core.security import (
    create_access_token,
    generate_verification_code,
    hash_password,
    hash_verification_code,
    verify_password,
    verify_verification_code,
)
from app.models.models import AppUser, EmailVerificationCode, VerificationPurpose, utcnow
from app.schemas.auth_schema import (
    AuthTokenResponse,
    LoginCodeSendRequest,
    LoginRequest,
    LoginWithCodeRequest,
    MessageResponse,
    RegisterCodeSendRequest,
    RegisterRequest,
    UserProfileResponse,
)
from app.utils.email_utils import send_html_email

router = APIRouter()

# Fallback values used when the matching environment variable is not set.
DEFAULT_REGISTER_CODE_EXPIRE_MINUTES = 10
DEFAULT_LOGIN_CODE_EXPIRE_MINUTES = 10
DEFAULT_CODE_SEND_COOLDOWN_SECONDS = 60

# Effective settings, read once at import time. A value that int() cannot
# parse raises ValueError while the module is being imported.
REGISTER_CODE_EXPIRE_MINUTES = int(
    os.environ.get(
        "REGISTER_CODE_EXPIRE_MINUTES",
        str(DEFAULT_REGISTER_CODE_EXPIRE_MINUTES),
    )
)
LOGIN_CODE_EXPIRE_MINUTES = int(
    os.environ.get(
        "LOGIN_CODE_EXPIRE_MINUTES",
        str(DEFAULT_LOGIN_CODE_EXPIRE_MINUTES),
    )
)
CODE_SEND_COOLDOWN_SECONDS = int(
    os.environ.get(
        "CODE_SEND_COOLDOWN_SECONDS",
        str(DEFAULT_CODE_SEND_COOLDOWN_SECONDS),
    )
)


def _normalize_email(email: str) -> str:
    """Return *email* with surrounding whitespace removed and lower-cased."""
    trimmed = email.strip()
    return trimmed.lower()


def _build_verification_email(code: str, purpose_text: str, expire_minutes: int) -> str:
    """Render the body of a verification email.

    Args:
        code: The plain verification code to show to the recipient.
        purpose_text: Short label interpolated before the word for
            "verification code" in the template.
        expire_minutes: Validity period printed in the message.

    Returns:
        The message text with all three values interpolated.
    """
    body = f"""

InsightRadar 邮箱验证

您的{purpose_text}验证码是:

{code}

该验证码在 {expire_minutes} 分钟内有效。请勿泄露给他人。

"""
    return body


def _invalidate_unused_codes(db: Session, email: str, purpose: VerificationPurpose) -> None:
    """Mark every still-unused code for *email* and *purpose* as used, then commit."""
    pending_codes = db.query(EmailVerificationCode).filter(
        EmailVerificationCode.email == email,
        EmailVerificationCode.purpose == purpose,
        EmailVerificationCode.is_used.is_(False),
    )
    # Bulk UPDATE; objects already loaded in the session are not synchronised.
    pending_codes.update(
        {EmailVerificationCode.is_used: True},
        synchronize_session=False,
    )
    db.commit()


def _create_code_record(
    db: Session,
    *,
    email: str,
    purpose: VerificationPurpose,
    expire_minutes: int,
) -> Tuple[EmailVerificationCode, str]:
    """Generate a new code, store its hash, and commit the record.

    Args:
        db: Session used to persist the record.
        email: Address the code is issued for.
        purpose: What the code may be used for.
        expire_minutes: Minutes from now until the code expires.

    Returns:
        A ``(record, plain_code)`` pair. Only the hash of the code is stored
        on the record; the plain code is returned for the caller to send.
    """
    plain_code = generate_verification_code()
    expires_at = utcnow() + timedelta(minutes=expire_minutes)
    record = EmailVerificationCode(
        email=email,
        purpose=purpose,
        code_hash=hash_verification_code(plain_code),
        expires_at=expires_at,
    )
    db.add(record)
    db.commit()
    return record, plain_code


def _enforce_code_send_cooldown(db: Session, email: str, purpose: VerificationPurpose) -> None:
    """Debounce verification-code requests.

    Limits how often a code can be sent to the same email for the same
    purpose, so that rapid repeated clicks do not trigger repeated sends.
    The check is skipped entirely when ``CODE_SEND_COOLDOWN_SECONDS`` is
    zero or negative.

    Raises:
        HTTPException: 429 with a ``Retry-After`` header when the newest
            record for this email and purpose is younger than the cooldown.
    """
    cooldown = CODE_SEND_COOLDOWN_SECONDS
    if cooldown <= 0:
        return

    # The newest record counts regardless of whether it was used or not.
    newest = (
        db.query(EmailVerificationCode)
        .filter(
            EmailVerificationCode.email == email,
            EmailVerificationCode.purpose == purpose,
        )
        .order_by(EmailVerificationCode.created_at.desc())
        .first()
    )
    if not newest:
        return

    current_time = utcnow()
    created_at = newest.created_at
    if created_at.tzinfo is None:
        # A naive timestamp is interpreted as UTC before subtracting.
        created_at = created_at.replace(tzinfo=timezone.utc)

    elapsed_seconds = (current_time - created_at).total_seconds()
    if elapsed_seconds >= cooldown:
        return

    # Round up and report at least one second.
    retry_after_seconds = max(1, math.ceil(cooldown - elapsed_seconds))
    raise HTTPException(
        status_code=status.HTTP_429_TOO_MANY_REQUESTS,
        detail=f"Please wait {retry_after_seconds}s before requesting another verification code",
        headers={"Retry-After": str(retry_after_seconds)},
    )


def _build_auth_response(user: AppUser) -> AuthTokenResponse:
    """Create an access token for *user* and wrap it with the user's profile."""
    access_token, ttl = create_access_token(user_id=user.id, email=user.email)
    profile = UserProfileResponse.model_validate(user)
    return AuthTokenResponse(
        access_token=access_token,
        expires_in=ttl,
        user=profile,
    )
def _find_active_code(db: Session, email: str, purpose: VerificationPurpose, now):
    """Return the newest unused, unexpired code record, or ``None``.

    Args:
        db: Session to query.
        email: Normalized email the code was issued for.
        purpose: Purpose the code must have been issued for.
        now: Reference time; records whose ``expires_at`` is earlier are
            excluded.
    """
    return (
        db.query(EmailVerificationCode)
        .filter(
            EmailVerificationCode.email == email,
            EmailVerificationCode.purpose == purpose,
            EmailVerificationCode.is_used.is_(False),
            EmailVerificationCode.expires_at >= now,
        )
        .order_by(EmailVerificationCode.created_at.desc())
        .first()
    )


async def _issue_and_send_code(
    db: Session,
    *,
    email: str,
    purpose: VerificationPurpose,
    purpose_text: str,
    expire_minutes: int,
) -> MessageResponse:
    """Issue a fresh verification code and email it.

    Shared by the register and login send-code endpoints. Steps, in order:
    enforce the send cooldown, invalidate older unused codes, store a new
    code, send the email.

    Args:
        db: Session used for all reads and writes.
        email: Normalized recipient address.
        purpose: Purpose recorded on the new code.
        purpose_text: Label interpolated into the subject and body.
        expire_minutes: Validity period of the new code.

    Returns:
        A ``MessageResponse`` confirming the code was sent.

    Raises:
        HTTPException: 429 from the cooldown check, or 500 when the email
            could not be sent.
    """
    _enforce_code_send_cooldown(db, email, purpose)
    _invalidate_unused_codes(db, email, purpose)
    code_record, code = _create_code_record(
        db,
        email=email,
        purpose=purpose,
        expire_minutes=expire_minutes,
    )

    email_sent = await send_html_email(
        to_email=email,
        subject=f"【{code}】InsightRadar {purpose_text}验证码",
        html_content=_build_verification_email(code, purpose_text, expire_minutes),
    )
    if not email_sent:
        # The recipient never got this code, so make sure it cannot be used.
        code_record.is_used = True
        db.add(code_record)
        db.commit()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to send verification code",
        )
    return MessageResponse(message="Verification code sent")


# POST /register/send-code
# Sends a registration code to an email that has no account yet.
# Responds 400 when the email is already registered.
@router.post("/register/send-code", response_model=MessageResponse)
async def send_register_code(payload: RegisterCodeSendRequest, db: Session = Depends(get_db)):
    email = _normalize_email(payload.email)
    if db.query(AppUser).filter(AppUser.email == email).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email is already registered",
        )
    return await _issue_and_send_code(
        db,
        email=email,
        purpose=VerificationPurpose.REGISTER,
        purpose_text="注册",
        expire_minutes=REGISTER_CODE_EXPIRE_MINUTES,
    )


# POST /login/send-code
# Sends a login code to an email that already has an account.
# Responds 404 when the email is not registered.
@router.post("/login/send-code", response_model=MessageResponse)
async def send_login_code(payload: LoginCodeSendRequest, db: Session = Depends(get_db)):
    email = _normalize_email(payload.email)
    if not db.query(AppUser).filter(AppUser.email == email).first():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Email is not registered",
        )
    return await _issue_and_send_code(
        db,
        email=email,
        purpose=VerificationPurpose.LOGIN,
        purpose_text="登录",
        expire_minutes=LOGIN_CODE_EXPIRE_MINUTES,
    )


# POST /register
# Creates an account after checking the emailed registration code, then
# returns an access token for the new user.
# Responds 400 when the email is taken, or the code is missing, expired or wrong.
@router.post(
    "/register",
    response_model=AuthTokenResponse,
    status_code=status.HTTP_201_CREATED,
)
async def register(payload: RegisterRequest, db: Session = Depends(get_db)):
    # Imported locally: only this handler needs it.
    from sqlalchemy.exc import IntegrityError

    email = _normalize_email(payload.email)
    if db.query(AppUser).filter(AppUser.email == email).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email is already registered",
        )

    now = utcnow()
    code_record = _find_active_code(db, email, VerificationPurpose.REGISTER, now)
    if not code_record:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Verification code does not exist or expired",
        )
    if not verify_verification_code(payload.verification_code, code_record.code_hash):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid verification code",
        )

    # Default the nickname to the local part of the email address.
    nickname = payload.nickname or email.split("@")[0]
    user = AppUser(
        email=email,
        password_hash=hash_password(payload.password),
        nickname=nickname,
        metadata_={"email_verified_at": now.isoformat()},
    )
    code_record.is_used = True
    db.add(user)
    db.add(code_record)
    try:
        db.commit()
    except IntegrityError:
        # A concurrent request may have inserted the same email between the
        # existence check above and this commit. Roll back so the session is
        # usable again, then report it the same way as the pre-check does.
        # NOTE(review): assumes AppUser.email has a unique constraint - confirm.
        db.rollback()
        if db.query(AppUser).filter(AppUser.email == email).first():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email is already registered",
            ) from None
        # Some other constraint failed; do not mislabel it.
        raise
    db.refresh(user)
    return _build_auth_response(user)


# POST /login
# Password login. Unknown email, an account without a password hash, and a
# wrong password all produce the same 401 response.
@router.post("/login", response_model=AuthTokenResponse)
async def login(payload: LoginRequest, db: Session = Depends(get_db)):
    email = _normalize_email(payload.email)
    user = db.query(AppUser).filter(AppUser.email == email).first()
    if not user or not user.password_hash:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password",
        )
    if not verify_password(payload.password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password",
        )
    return _build_auth_response(user)


# POST /login/code
# Login with an emailed code. Responds 401 for an unknown email or a wrong
# code, and 400 when no usable code exists. A correct code is consumed.
@router.post("/login/code", response_model=AuthTokenResponse)
async def login_with_code(payload: LoginWithCodeRequest, db: Session = Depends(get_db)):
    email = _normalize_email(payload.email)
    user = db.query(AppUser).filter(AppUser.email == email).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or verification code",
        )

    code_record = _find_active_code(db, email, VerificationPurpose.LOGIN, utcnow())
    if not code_record:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Verification code does not exist or expired",
        )
    # NOTE(review): a wrong guess does not consume the code and no attempt
    # limit is visible in this file - confirm guesses are rate-limited elsewhere.
    if not verify_verification_code(payload.verification_code, code_record.code_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or verification code",
        )

    code_record.is_used = True
    db.add(code_record)
    db.commit()
    return _build_auth_response(user)