from functools import lru_cache
import logging
import os
from typing import Optional

from app.models.models import VerificationPurpose
from app.core.verification.email.verificationRepository import VerificationRepository
from app.core.security import generate_verification_code, hash_verification_code
from app.core.verification.email.RespositoryImpl.hybirdRepository import get_verification_repository

logger = logging.getLogger(__name__)


def _env_int(name: str, default: int) -> int:
    """Read an integer setting from the environment.

    ``os.getenv`` returns a string whenever the variable is set, so the value
    is converted here once; the module constants are therefore always ``int``
    regardless of whether they come from the environment or from the default.

    Args:
        name: Environment variable name.
        default: Value used when the variable is unset or not a valid integer.

    Returns:
        The parsed integer, or ``default``.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        # An empty or malformed value (e.g. ``VAR=`` in a .env file) would
        # otherwise make every request fail with ValueError at the use site.
        logger.warning(
            "Invalid integer %r for %s; falling back to %d", raw, name, default
        )
        return default


# Lifetime of a registration verification code, in minutes.
REGISTER_CODE_EXPIRE_MINUTES = _env_int("REGISTER_CODE_EXPIRE_MINUTES", 5)
# Lifetime of a login verification code, in minutes.
LOGIN_CODE_EXPIRE_MINUTES = _env_int("LOGIN_CODE_EXPIRE_MINUTES", 5)
# Cooldown between two code requests for the same email, in seconds.
CODE_SEND_COOLDOWN_SECONDS = _env_int("CODE_SEND_COOLDOWN_SECONDS", 60)
# Value passed to the repository counter that tracks verification attempts
# (presumably the counter's expiry in seconds; confirm against the repository).
CODE_VERIFICATE_ATTEMP_SECONDS = _env_int("CODE_VERIFICATE_ATTEMP_SECONDS", 60)
# Maximum number of verification attempts tolerated by that counter.
CODE_VERIFICATE_ATTEMP_COUNT = _env_int("CODE_VERIFICATE_ATTEMP_COUNT", 10)


class CodeExpiredError(Exception):
    """Raised when no stored code is available (expired or never issued)."""


class CodeInvalidError(Exception):
    """Raised when the submitted code does not match the stored one."""


class TooManyCodeRequestsError(Exception):
    """Raised when codes are requested or verified too often."""


def get_ttl(purpose: VerificationPurpose) -> int:
    """Return the lifetime, in seconds, of a code issued for ``purpose``.

    Login codes use ``LOGIN_CODE_EXPIRE_MINUTES``; every other purpose uses
    ``REGISTER_CODE_EXPIRE_MINUTES``.
    """
    if purpose == VerificationPurpose.LOGIN:
        minutes = LOGIN_CODE_EXPIRE_MINUTES
    else:
        minutes = REGISTER_CODE_EXPIRE_MINUTES
    return minutes * 60


class EmailVerificationService:
    """Issue and check email verification codes through a repository."""

    def __init__(self, repo: VerificationRepository) -> None:
        self.repo = repo

    def _cooldown_key(self, email: str, purpose: VerificationPurpose) -> str:
        """Build the counter key that throttles code requests."""
        return f"verification:cooldown:{purpose.value}:{email.lower()}"

    def _attempts_key(self, email: str, purpose: VerificationPurpose) -> str:
        """Build the counter key that throttles verification attempts."""
        return f"verification:attempts:{purpose.value.lower()}:{email}"

    def send_code(self, email: str, purpose: VerificationPurpose) -> str:
        """Generate a new code for ``email`` and store its hash.

        Args:
            email: Recipient address; it is lower-cased before use.
            purpose: What the code is for; selects the code lifetime.

        Returns:
            The plain-text code. Only its hash is handed to the repository.

        Raises:
            TooManyCodeRequestsError: If the cooldown counter for this
                email/purpose was already incremented by an earlier request.
        """
        email = email.lower()
        requests = self.repo.incr(
            self._cooldown_key(email, purpose), CODE_SEND_COOLDOWN_SECONDS
        )
        if requests > 1:
            raise TooManyCodeRequestsError("Please wait before requesting another code")

        code = generate_verification_code()
        self.repo.set_code(
            email, purpose, hash_verification_code(code), get_ttl(purpose)
        )
        return code

    def verify_code(self, email: str, code: str, purpose: VerificationPurpose) -> bool:
        """Check ``code`` against the stored hash for ``email``.

        Args:
            email: Address the code was issued for; lower-cased before use.
            code: Plain-text code submitted by the user.
            purpose: What the code was issued for.

        Returns:
            ``True`` when the repository confirms the code.

        Raises:
            TooManyCodeRequestsError: If the attempts counter exceeds
                ``CODE_VERIFICATE_ATTEMP_COUNT``.
            CodeExpiredError: If the repository reports no stored code.
            CodeInvalidError: If the repository reports a mismatch.
        """
        email = email.lower()
        code_hash = hash_verification_code(code)

        # Every call counts as an attempt, including the one that goes over
        # the limit.
        attempts = self.repo.incr(
            self._attempts_key(email, purpose), CODE_VERIFICATE_ATTEMP_SECONDS
        )
        if attempts > CODE_VERIFICATE_ATTEMP_COUNT:
            raise TooManyCodeRequestsError("Too many attempts")

        matched = self.repo.compare_and_consume(email, purpose, code_hash)
        # The result is treated as tri-state: None means there is nothing to
        # compare against, any other falsy value means the hashes differ.
        if matched is None:
            raise CodeExpiredError("Code expired or not found")
        if not matched:
            raise CodeInvalidError("Invalid code")
        return True


@lru_cache
def get_verification_service() -> EmailVerificationService:
    """Return the shared service instance.

    ``lru_cache`` on a zero-argument function means the repository and the
    service are built on the first call and reused afterwards.
    """
    return EmailVerificationService(get_verification_repository())